This article discusses how to handle the output data of a Flink data stream, and may be a useful reference for anyone facing the same problem.

Problem Description

Below is the pseudocode of my stream processing.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream stream = env.addSource(...)
    .map(...)    // map to java object
    .filter(...) // filter for specific type of events
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)) { ... });

// collect all records of each window
DataStream windowedStream = stream
    .timeWindowAll(Time.seconds(10))   // returns an AllWindowedStream, not a DataStream
    .apply(new AllWindowFunction(...));

DataStream processedStream = windowedStream.keyBy(...).reduce(...);

String outputPath = "";

final StreamingFileSink sink = StreamingFileSink.forRowFormat(...).build();

processedStream.addSink(sink);

The above code flow is creating multiple files, and each file seems to contain records from several different windows. For example, the records in each file have timestamps ranging over 30-40 seconds, whereas the window size is only 10 seconds. My expected output pattern is to write each window's data to a separate file. Any references or input on this would be of great help.

Recommended Answer

Take a look at the BucketAssigner interface. It should be flexible enough to meet your needs; you just need to make sure that your stream events contain enough information to determine the path you want them written to.
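A minimal sketch of that idea, assuming Flink 1.7+ (where the StreamingFileSink row-format builder exposes withBucketAssigner) and a hypothetical WindowedEvent POJO that carries the start timestamp of the window each record was assigned to:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Hypothetical record type: the payload plus the start of the window it belongs to.
class WindowedEvent {
    String payload;
    long windowStart;
    long getWindowStart() { return windowStart; }
    @Override public String toString() { return payload; }
}

// Routes every record to a bucket named after its window, so all records of
// one 10-second window land under the same directory.
class WindowBucketAssigner implements BucketAssigner<WindowedEvent, String> {

    @Override
    public String getBucketId(WindowedEvent element, Context context) {
        // e.g. "window-1565856000000" for the window starting at that epoch millisecond
        return "window-" + element.getWindowStart();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Wiring it into the existing sink:

final StreamingFileSink<WindowedEvent> sink = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<WindowedEvent>("UTF-8"))
    .withBucketAssigner(new WindowBucketAssigner())
    .build();

The window start is already available in the AllWindowFunction from the pseudocode above: its apply(TimeWindow window, Iterable<...> values, Collector<...> out) method receives the TimeWindow, so window.getStart() can be copied onto each outgoing record before it reaches the sink. Note that within a bucket the sink may still roll several part files (one per parallel subtask, plus whatever the rolling policy produces), so strictly speaking this yields one directory per window rather than a single file.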

