# Flink零基础学习教程map算子原理

map是对每个元素处理,一进一出。

image-20260518212859090参考逻辑代码。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink02_Transform {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //直接从数据中读取数据
        DataStreamSource<Integer> envStream = env.fromElements(1, 2, 3, 4, 5, 6);

        //算子学习 map
        //1
        SingleOutputStreamOperator<Integer> map = envStream.map(data -> {
            return data * 2;
        });
        
        
        //2
        SingleOutputStreamOperator<Integer> map2 = envStream.map(data -> data * 2);
        // 两种方案都是可行
        //map函数内部是要求实现MapFunction<IN, OUT>   in输入类型 out输出类型
        SingleOutputStreamOperator<String> map3 = envStream.map(
                new MapFunction<Integer, String>() {
                    @Override
                    public String map(Integer value) throws Exception {
                        //这里可以做更加复杂的处理
                        return value.toString();
                    }
                });


		// 3
        //更加复杂的情况是需要map对开不数据处理这时候可以使用RichMapFunction
        SingleOutputStreamOperator<String> map4 = envStream.map(
                new RichMapFunction<Integer, String>() {
                    //open方法做数据初始化

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //在这里可以做外部信息获取比如链接
                        super.open(parameters);
                        System.out.println("这里的代码会在map之前处理");
                    }

                    @Override
                    public String map(Integer value) throws Exception {
                        //这里可以做更加复杂的处理
                        System.out.println("正常处理逻辑代码");
                        return value.toString();
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println("程序关闭时候");
                    }
                });



        envStream.print();
        env.execute();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

总结map应用场景

  • 数据类型转换
  • 数值运算
  • 属性映射
Last Updated: 6/12/2026, 4:20:22 AM