我有一个密钥流:
KeyedStream<String, Data> keyed =
env.addSource(...)
.map(new Parser())
.keyBy(i -> i.key)
我想在我的流上运行全状态处理功能;但
process
将返回正常的非密钥流,这会导致丢失KeyedStream
,并迫使我再次调用keyBy
:SingleOutputStreamOperator<Data> unkeyed = keyed.process(new Function)
KeyedStream<String, Data> keyedAgain = keyed.keyBy(i -> i.key)
KeyedStream
上的值调用映射函数? ProcessFunction
这样的低级API来运行全状态功能,而这会导致不必要的费用:运行时出现新的子任务,以及重新整理数据? 最佳答案
您可以使用RichMapFunction
或RichFlatmapFunction
并有权访问Flink的托管状态机制。与ProcessFunction
相比,您将缺少的是计时器。
参见示例here。