我在运行于Cloudera管理的群集(通过Yarn进行资源分配)上的Flink 1.7.2中进行了设置,该设置可从外部Kafka获取大量数据,并通过一系列运算符将其传输到管道,这些运算符将再次汇总,计算,汇总...我什至使用一个带有过滤器和多个运算符的迭代循环,最后使用一个将结果写入我的Hadoop集群上的rocksDB后端的接收器。所有这些都需要一定的时间(目前大约2-3个小时),然后检查点会卡住。
我使用一次精确的检查点,在检查点之间有30分钟的大超时和10分钟的暂停。 1个并发检查点。只要一切正常,这些检查点将在1分钟内完成。但是几个小时后,一个检查点卡住了,这意味着“检查点-UI”选项卡告诉我一个(或多个)操作员尚未确认所有子任务。到那时,正常流程也将陷入困境。我的输入源上的水印将不会继续,并且将不再产生输出。而且,直到计时器用完,他们才开始运作。然后,下一个检查点立即激活,可能写入所有任务的10%,然后再次卡住。没有恢复的机会。如果我取消作业并以上一个成功的检查点作为起点重新启动它,则下一个检查点将以相同的方式卡住。
从更改检查点频率到超时,我已经尝试了很多不同的方法。由于对齐缓冲有时会变得非常昂贵,所以我甚至从完全一次更改为至少一次。但是即使那样,同样的问题在同样的时间之后还是出现了。
资源分配似乎也不起作用,我目前每个任务管理器使用4个任务槽,并不时更改管理器的数量,但没有任何变化。 JVM堆大小似乎也不是问题,因为我提交了多个GB,但是显然只使用了几百MB。
作业经理或任务经理不会发出任何错误消息,所有日志都告诉我是尝试写入检查点,丢失成功消息以及下一个检查点开始的尝试。
最佳答案
当您说使用“一个内部包含过滤器和多个运算符的迭代循环”时,您是否正在对流作业使用Flink的迭代构造?
不建议这样做。如documentation中所述:
Flink当前仅为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊标志:env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)
。
请注意,在故障期间,循环边缘中正在运行的记录(以及与它们相关的状态更改)将丢失。
就是说,您所描述的听起来像是背压阻止检查点障碍前进的情况。可能是由很多原因引起的,但是this blog post可能会帮助您诊断问题。但是我不确定其中有多少适用于使用迭代的工作。
关于java - 如何修复Apache Flink中卡住的检查点,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57833964/