Problem description
I am trying to use event time in my Flink job, using BoundedOutOfOrdernessTimestampExtractor to extract timestamps and generate watermarks. But some of my Kafka inputs are sparse streams that can go without data for a long time, and as a result getResult in my AggregateFunction is never called at all. I can see data going into the add function.
I have already set getEnv().getConfig().setAutoWatermarkInterval(1000L);
I tried:
eventsWithKey
    .keyBy(entry -> (String) entry.get(key))
    .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
    .allowedLateness(WINDOW_LATENESS)
    .aggregate(new CountTask(basicMetricTags, windowInMinutes));
as well as a session window:
eventsWithKey
    .keyBy(entry -> (String) entry.get(key))
    .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
    .aggregate(new CountTask(basicMetricTags, windowInMinutes));
All the watermark metrics show "No Watermark".
How can I make Flink ignore the missing watermark?
Recommended answer
FYI, this is commonly referred to as the "idle source" problem. This occurs because whenever a Flink operator has two or more inputs, its watermark is the minimum of the watermarks from its inputs. If one of those inputs stalls, that input's watermark stops advancing, and so does the operator's.
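To make the "minimum of the inputs" rule concrete, here is a small plain-Java model of it. This is not Flink code: the class name MinWatermarkOperator and its methods are hypothetical, and it only mirrors the rule described above (an operator's watermark is the minimum over its inputs and never moves backwards). Long.MIN_VALUE stands for "no watermark yet".

```java
import java.util.Arrays;

// Hypothetical model of an operator with several input channels.
// It mirrors only the rule: operator watermark = min of the latest watermark per input.
final class MinWatermarkOperator {
    private final long[] inputWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkOperator(int numInputs) {
        inputWatermarks = new long[numInputs];
        // No watermark has been received on any input yet.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // Watermarks never move backwards.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkOperator op = new MinWatermarkOperator(2);
        // Input 0 is busy; input 1 (a sparse Kafka partition) never reports.
        for (long t = 1_000; t <= 5_000; t += 1_000) {
            System.out.println("input 0 at " + t + " -> operator watermark " + op.onWatermark(0, t));
        }
        // Only once input 1 reports does the operator's watermark move.
        System.out.println("input 1 at 4000 -> operator watermark " + op.onWatermark(1, 4_000));
    }
}
```

Every line of the loop prints Long.MIN_VALUE, because the idle input holds the minimum down; windows downstream therefore never see their end time pass, which is why getResult is never called.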
Note that Flink does not have per-key watermarking; a given operator is typically multiplexed across events for many keys. So long as some events are flowing through a given task's input streams, its watermark will advance, and event time timers for idle keys will still fire. For this "idle source" problem to occur, a task has to have an input stream that has become completely idle.
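The following plain-Java sketch illustrates why idle keys are not the problem. It models one keyed task with a single shared watermark and per-key event-time timers set at the end of a tumbling window (window max timestamp = end - 1, the point at which an event-time window fires). The class KeyedTaskModel, the 1-minute window size, and the non-negative timestamps are all assumptions for illustration; in a real job the watermark passed to onWatermark would be driven by whichever keys are still producing events.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of one keyed task: one watermark shared by all keys,
// plus per-key event-time timers at the end of each tumbling window.
final class KeyedTaskModel {
    // Assumed 1-minute tumbling windows, timestamps in milliseconds.
    private static final long WINDOW_SIZE = 60_000L;

    // Event-time timers: timestamp -> keys waiting on that timestamp.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    // A single watermark for the whole task, not one per key.
    private long watermark = Long.MIN_VALUE;

    /** Registers a timer for the event's key at its window's max timestamp. */
    void onEvent(String key, long timestamp) {
        long windowStart = timestamp - (timestamp % WINDOW_SIZE); // assumes timestamp >= 0
        long windowMaxTimestamp = windowStart + WINDOW_SIZE - 1;
        timers.computeIfAbsent(windowMaxTimestamp, t -> new LinkedHashSet<>()).add(key);
    }

    /** Advances the shared watermark and returns the keys whose timers fired. */
    List<String> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            fired.addAll(due.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        KeyedTaskModel task = new KeyedTaskModel();
        task.onEvent("sparse-key", 10_000L); // the only event this key ever sends
        // "busy-key" keeps producing events, which keeps the task's watermark moving.
        task.onEvent("busy-key", 20_000L);
        task.onEvent("busy-key", 65_000L);
        System.out.println(task.onWatermark(30_000L)); // nothing due yet
        System.out.println(task.onWatermark(60_000L)); // window [0, 60000) fires for both keys
    }
}
```

The idle key's window still fires, because the timer check only looks at the task's watermark, not at which key produced recent events.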
If you can arrange for it, the best solution is to have your data sources include keepalive events. This will allow you to advance your watermarks with confidence, knowing that the source is simply idle, rather than, for example, offline.
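As a rough illustration of the keepalive idea, the sketch below assumes the producer occasionally writes a timestamp-only keepalive record into the otherwise sparse topic. Both the Event type (with its keepalive flag) and KeepaliveAwareSource are hypothetical; the point is that keepalives advance the bounded-out-of-orderness watermark (largest timestamp seen minus a fixed bound) but are never forwarded to the aggregation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event type: a keepalive carries only a timestamp, no payload.
final class Event {
    final long timestamp;
    final boolean keepalive;
    final String payload;

    Event(long timestamp, boolean keepalive, String payload) {
        this.timestamp = timestamp;
        this.keepalive = keepalive;
        this.payload = payload;
    }
}

// Hypothetical source-side logic: keepalives advance event time but are dropped.
final class KeepaliveAwareSource {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    KeepaliveAwareSource(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the first watermark computation does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Returns only the real events; every event, keepalive or not, advances time. */
    List<Event> process(List<Event> batch) {
        List<Event> forwarded = new ArrayList<>();
        for (Event e : batch) {
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
            if (!e.keepalive) {
                forwarded.add(e);
            }
        }
        return forwarded;
    }

    /** Bounded out-of-orderness: trail the largest timestamp seen by a fixed bound. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        KeepaliveAwareSource source = new KeepaliveAwareSource(5_000L);
        List<Event> out = source.process(Arrays.asList(
                new Event(10_000L, false, "order-1"),
                new Event(70_000L, true, null))); // producer: "still alive at t=70s"
        System.out.println("forwarded " + out.size() + " event(s), watermark = " + source.currentWatermark());
    }
}
```

Because the keepalive is an explicit statement from the producer, the watermark can move to 65000 without guessing, which is what makes this approach safer than inferring idleness from a timeout.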
If that's not possible, and if you have some sources that aren't idle, then you could put a rebalance() in front of the BoundedOutOfOrdernessTimestampExtractor (and before the keyBy), so that every instance continues to receive some events and can advance its watermark. This comes at the expense of an extra network shuffle.
Perhaps the most commonly used solution is to use a watermark generator that detects idleness and artificially advances the watermark based on a processing time timer. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor is an example of that.
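Here is a minimal sketch of that idea in plain Java, not the ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor code itself. IdleAwareWatermarkGenerator, its parameters, and the injected clock are hypothetical. While events arrive it behaves like a bounded-out-of-orderness generator; once no event has arrived for longer than idleTimeout (in processing time), it assumes event time keeps pace with wall-clock time and lets the watermark creep forward by the extra idle time.

```java
import java.util.function.LongSupplier;

// Hypothetical idleness-aware watermark generator driven by a processing-time clock.
final class IdleAwareWatermarkGenerator {
    private final long maxOutOfOrderness;
    private final long idleTimeout;
    private final LongSupplier processingClock;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkGenerator(long maxOutOfOrderness, long idleTimeout, LongSupplier processingClock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
        this.processingClock = processingClock;
        this.lastEventProcessingTime = processingClock.getAsLong();
    }

    /** Called for every event: remember the largest timestamp and when we saw it. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = processingClock.getAsLong();
    }

    /** Called periodically (like the auto-watermark interval) to compute the watermark. */
    long onPeriodicEmit() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no events yet, nothing to base a watermark on
        }
        long candidate = maxTimestamp - maxOutOfOrderness;
        long idleFor = processingClock.getAsLong() - lastEventProcessingTime;
        if (idleFor > idleTimeout) {
            // Assumption: while idle, event time advances at the rate of processing time.
            candidate += idleFor - idleTimeout;
        }
        lastEmitted = Math.max(lastEmitted, candidate); // never move backwards
        return lastEmitted;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake processing-time clock for the demo
        IdleAwareWatermarkGenerator gen = new IdleAwareWatermarkGenerator(5_000L, 10_000L, () -> now[0]);
        gen.onEvent(100_000L);
        now[0] = 1_000L;
        System.out.println(gen.onPeriodicEmit()); // still event-driven: 95000
        now[0] = 30_000L;
        System.out.println(gen.onPeriodicEmit()); // idle 30 s: 95000 + (30000 - 10000) = 115000
    }
}
```

The trade-off is that records arriving after a long idle period may already be behind the artificially advanced watermark and be treated as late. If you are on Flink 1.11 or later, WatermarkStrategy.forBoundedOutOfOrderness(...).withIdleness(...) is also worth a look: it marks an idle input as idle so it is left out of the minimum, rather than advancing its watermark.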