问题描述
我试图在Flink作业中使用事件时间,并使用BoundedOutOfOrdernessTimestampExtractor
提取时间戳并生成水印.但是我有一些输入Kafka的流稀疏,它很长时间都没有数据,这使得AggregateFunction
中的getResult
根本没有被调用.我可以看到数据进入了add
函数.
I am trying to use event time in my Flink job, and using BoundedOutOfOrdernessTimestampExtractor
to extract timestamp and generate watermark.But I have some input Kafka having sparse stream, it can have no data for a long time, which makes the getResult
in AggregateFunction
not called at all. I can see data going into add
function.
我设置了getEnv().getConfig().setAutoWatermarkInterval(1000L);
我尝试过
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
.allowedLateness(WINDOW_LATENESS)
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
也是会话窗口
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
所有水印功能均显示No Watermark
如何让Flink忽略没有水印的东西?
All the watermark metics shows No Watermark
How can I let Flink to ignore that no watermark thing?
推荐答案
仅供参考,这通常称为空闲源"问题.发生这种情况的原因是,每当Flink运算符具有两个或多个输入时,其水印就是来自其输入的最小水印.如果这些输入之一停止运行,其水印将不再前进.
FYI, this is commonly referred to as the "idle source" problem. This occurs because whenever a Flink operator has two or more inputs, its watermark is the minimum of the watermarks from its inputs. If one of those inputs stalls, its watermark no longer advances.
请注意,Flink没有每个按键的水印功能-给定的运算符通常在许多按键的事件之间进行多路复用.只要某些事件流过给定任务的输入流,它的水印就会提前,并且空闲键的事件时间计时器仍会触发.为了使此空闲源"问题发生,任务必须具有完全空闲的输入流.
Note that Flink does not have per-key watermarking -- a given operator is typically multiplexed across events for many keys. So long as some events are flowing through a given task's input streams, its watermark will advance, and event time timers for idle keys will still fire. For this "idle source" problem to occur, a task has to have an input stream that has become completely idle.
如果可以安排,最好的解决方案是让您的数据源包含keepalive事件.知道源只是闲置的,而不是例如离线的,这将使您放心地扩展水印.
If you can arrange for it, the best solution is to have your data sources include keepalive events. This will allow you to advance your watermarks with confidence, knowing that the source is simply idle, rather than, for example, offline.
如果这不可能,并且如果您有一些不空闲的源,则可以在BoundedOutOfOrdernessTimestampExtractor
之前(在keyBy之前)放置一个rebalance()
,以便每个实例继续接收一些事件,并可以提前其水印.这是以额外的网络改组为代价的.
If that's not possible, and if you have some sources that aren't idle, then you could put a rebalance()
in front of the BoundedOutOfOrdernessTimestampExtractor
(and before the keyBy), so that every instance continues to receive some events and can advance its watermark. This comes at the expense of an extra network shuffle.
也许最常用的解决方案是使用水印生成器,该生成器检测空闲并基于处理时间计时器人为地使水印前进. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor 就是一个例子.
Perhaps the most commonly used solution is to use a watermark generator that detects idleness and artificially advances the watermark based on a processing time timer. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor is an example of that.
这篇关于Flink窗口函数getResult未触发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!