Integrating Flink into a Spring Boot project to learn Flink by writing code. The integration steps are as follows:

1. Add the Maven dependencies (the ${flink.version} and ${scala.binary.version} placeholders are assumed to be defined in the POM's <properties> section):

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

2. Configure the relevant parameters in the configuration file (e.g. application.properties):

# Flink configuration
flink.jobmanager.host=localhost
flink.jobmanager.port=6123
flink.parallelism=1
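
The Demo class in step 3 runs standalone and does not read these properties. In an actual Spring Boot integration, you would typically inject them into a configuration class and build the execution environment from them. Below is a minimal sketch under that assumption (FlinkEnvConfig is an illustrative class name; createRemoteEnvironment submits jobs to a running cluster at the configured host/port, while getExecutionEnvironment() can be used for local testing):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkEnvConfig {

    @Value("${flink.jobmanager.host}")
    private String jobManagerHost;

    @Value("${flink.jobmanager.port}")
    private int jobManagerPort;

    @Value("${flink.parallelism}")
    private int parallelism;

    // Expose a StreamExecutionEnvironment built from the configured values,
    // so that job classes can have it injected instead of creating their own.
    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(jobManagerHost, jobManagerPort);
        env.setParallelism(parallelism);
        return env;
    }
}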

3. Write a test class; the code is as follows:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.util.Random;



public class Demo {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a custom data source that emits "key<n>,<timestamp>" strings with simulated event times
        DataStream<String> stream = env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Thread.sleep(10);
                    // Event time lags the wall clock by 0-4 seconds to simulate slightly out-of-order data
                    long timestamp = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    String str = "key" + random.nextInt(10) + "," + timestamp;
                    // Emit the record with its event timestamp, then advance the watermark
                    ctx.collectWithTimestamp(str, timestamp);
                    ctx.emitWatermark(new Watermark(timestamp));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });


        // Parse each source line into a (key, timestamp) tuple
        DataStream<Tuple2<String, Long>> parsedStream = stream.map((String line) -> {
            String[] parts = line.split(",");
            return new Tuple2<>(parts[0], Long.parseLong(parts[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Assign event timestamps and watermarks (this overrides the timestamps and watermarks
        // already emitted by the source). Note: AscendingTimestampExtractor assumes monotonically
        // increasing timestamps, which the randomly lagged source above can violate.
        DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                });

        // Key the stream by the first tuple field (the key string)
        KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);

        // Aggregate each key over 1-minute tumbling event-time windows
        // (to emit a result every 5 seconds covering the last minute, a sliding window is needed; see the sketch after the class)
        WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));

        // Reduce within each window: keep the key and sum the timestamp values (f1)
        DataStream<Tuple2<String, Long>> resultStream = windowedStream
                .reduce((Tuple2<String, Long> v1, Tuple2<String, Long> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));

        // Print the windowed results to stdout
        resultStream.print();

        // Submit and run the job
        env.execute("Demo");
    }
}
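
Two follow-up notes on the Demo class. First, a 1-minute tumbling window emits one result per key per minute; to get a result every 5 seconds covering the last minute, a sliding event-time window is needed. Second, on Flink 1.12 and later, AscendingTimestampExtractor and keyBy(int) are deprecated in favor of WatermarkStrategy and key-selector lambdas. Below is a sketch of both changes, reusing parsedStream from the class above (the 5-second out-of-orderness bound is an assumption matching the source's simulated lag):

// Additional imports for this variant:
// import java.time.Duration;
// import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;

        DataStream<Tuple2<String, Long>> withWatermarks = parsedStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1));

        DataStream<Tuple2<String, Long>> slidingResult = withWatermarks
                .keyBy(value -> value.f0)  // key selector instead of the deprecated keyBy(0)
                .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(5)))  // 1-minute window, sliding every 5 seconds
                .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));

        slidingResult.print();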