Problem description
I am setting up a Flink pipeline that reads from Kafka and sinks to HDFS. I want to process the elements after the addSink() step, because I want to set up trigger files indicating that writing data (to the sink) for a certain partition/hour is complete. How can this be achieved? Currently I am using the Bucketing sink.
DataStream messageStream = env.addSource(flinkKafkaConsumer011);
// some aggregations to convert message stream to keyedStream
keyedStream.addSink(sink);
// How to process elements after step 3 (the addSink() call)?
Recommended answer
The Flink APIs do not support extending the job graph beyond the sink(s). (You can, however, fork the stream and do additional processing in parallel with writing to the sink.)
With the Streaming File Sink you can observe the part files transition to the finished state when they complete. See the JavaDoc for more information.
State lives within a single operator: only that operator (e.g., a ProcessFunction) can modify it. If you want to modify keyed value state after the sink has completed, there is no straightforward way to do that. One idea is to register a processing-time timer in the ProcessFunction that holds the keyed state; the timer wakes up periodically, checks for newly finished part files, and modifies the state based on their existence. Or, if that is the wrong granularity, write a custom source that does something similar and streams or broadcasts the information into the ProcessFunction (which then has to be a CoProcessFunction or a KeyedBroadcastProcessFunction), where it can be used to make the necessary state updates.
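Whichever component does the periodic check (a timer callback or a custom source), it needs some way to decide that a bucket is finished. The following is only a sketch of that check in plain Java, not part of any Flink API. It assumes the bucket directory is reachable through java.nio.file (for HDFS you would use the Hadoop FileSystem API instead), that finished part files start with "part-", and that unfinished ones contain ".inprogress" in their name. The class name BucketTrigger and the trigger file name "_SUCCESS" are hypothetical; adjust all of these to your sink configuration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch: decide whether a bucket directory is complete and mark it with a trigger file. */
final class BucketTrigger {

    // Assumed naming conventions (not taken from the question); adjust to your sink.
    static final String TRIGGER_NAME = "_SUCCESS";           // hypothetical trigger file name
    static final String IN_PROGRESS_MARKER = ".inprogress";  // assumed marker of unfinished part files
    static final String PART_PREFIX = "part-";               // assumed prefix of finished part files

    /** Returns true if the bucket holds at least one finished part file and no in-progress file. */
    static boolean isBucketComplete(Path bucketDir) throws IOException {
        boolean sawFinishedPart = false;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(bucketDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (name.contains(IN_PROGRESS_MARKER)) {
                    return false; // something is still being written to this bucket
                }
                if (name.startsWith(PART_PREFIX)) {
                    sawFinishedPart = true;
                }
            }
        }
        return sawFinishedPart;
    }

    /** Creates the trigger file once the bucket is complete; returns true if the trigger exists afterwards. */
    static boolean writeTriggerIfComplete(Path bucketDir) throws IOException {
        Path trigger = bucketDir.resolve(TRIGGER_NAME);
        if (Files.exists(trigger)) {
            return true; // already marked on an earlier check
        }
        if (!isBucketComplete(bucketDir)) {
            return false;
        }
        Files.createFile(trigger);
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: BucketTrigger <bucket-directory>");
            return;
        }
        System.out.println(writeTriggerIfComplete(Paths.get(args[0])));
    }
}
```

Note that this check alone cannot tell whether late records will open a new part file in the same bucket, so it is probably best combined with a time condition, for example only checking a bucket after its hour has passed.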