Problem Description
I have a pipeline like this:
env.addSource(kafkaConsumer, name_source)
.keyBy { value -> value.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(2)))
.process(MyProcessor())
.addSink(kafkaProducer)
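For context, the body of MyProcessor is not shown in the question; the sketch below is a hypothetical version, assuming the records are Tuple2<String, String> keyed by f0 and that the function keeps no keyed state of its own beyond the window's element buffer:

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical sketch: a stateless ProcessWindowFunction. The only keyed state
// is the window's buffered elements, which Flink clears once the session window
// fires and is purged.
class MyProcessor : ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
    override fun process(
        key: String,
        context: Context,
        elements: Iterable<Tuple2<String, String>>,
        out: Collector<String>
    ) {
        // Emit one summary record per session; nothing is retained afterwards.
        out.collect("$key:${elements.count()}")
    }
}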
The keys are guaranteed to be unique in the data that is currently being processed. Thus I would expect the state size to not grow beyond 2 seconds of data.
However, I notice the state size has been steadily growing over the last day (since the app was deployed).
Is this a bug in Flink?
Using Flink 1.11.2 in AWS Kinesis Data Analytics.
Recommended Answer
Kinesis Data Analytics always uses RocksDB as its state backend. With RocksDB, dead state isn't immediately cleaned up; it's merely marked with a tombstone and is later compacted away. I'm not sure how KDA configures RocksDB compaction, but typically it's done when a level reaches a certain size -- and I suspect your state size is still small enough that compaction hasn't occurred.
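For comparison, when you run Flink yourself (outside KDA, where the state backend is managed for you), the RocksDB backend with incremental checkpoints is typically wired up roughly as below. The checkpoint path, tuning profile, and interval are placeholder assumptions, not anything KDA exposes:

import org.apache.flink.contrib.streaming.state.PredefinedOptions
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// RocksDB state backend; the second argument enables incremental checkpoints,
// i.e. checkpoints upload RocksDB's SST files rather than a full snapshot.
// The URI is a placeholder -- KDA supplies its own checkpoint location.
val backend = RocksDBStateBackend("s3://my-bucket/checkpoints", true)

// Optional predefined tuning profile; finer control over compaction (level
// sizes, file counts, etc.) is possible through RocksDB options, but the
// defaults are usually left alone.
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)

env.setStateBackend(backend)
env.enableCheckpointing(60_000) // placeholder: checkpoint every minute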
With incremental checkpoints (which is what KDA does), checkpointing is done by copying RocksDB's SST files -- which in your case are presumably full of stale data. If you let this run long enough you should eventually see a significant drop in checkpoint size, once compaction has been done.
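As an aside, the compaction-based cleanup above concerns the window's buffered elements, which Flink already clears when a session closes; nothing needs to be done for that. Only if MyProcessor were to register keyed state of its own would you reach for Flink's state TTL with the RocksDB compaction-filter cleanup, so that expired entries are dropped during compaction. A hypothetical sketch, with an assumed retention time and state name:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

// Hypothetical: only relevant for state that MyProcessor registers itself,
// not for the session window's element buffer.
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))          // assumed retention time
    .cleanupInRocksdbCompactFilter(1000)   // drop expired entries during RocksDB compaction
    .build()

val descriptor = ValueStateDescriptor("per-key-state", String::class.java)
descriptor.enableTimeToLive(ttlConfig)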