Question
I'm running a job using the Beam KafkaIO source on Google Dataflow and cannot find an easy way to persist offsets across job restarts (the job update option is not enough; I need to restart the job).
Comparing Beam's KafkaIO against PubsubIO (or, to be precise, comparing PubsubCheckpoint with KafkaCheckpointMark), I can see that checkpoint persistence is not implemented in KafkaIO (the KafkaCheckpointMark.finalizeCheckpoint method is empty), whereas it is implemented in PubsubCheckpoint.finalizeCheckpoint, which sends acknowledgements back to PubSub.
Does this mean I have no way of reliably managing Kafka offsets across job restarts with minimal effort?
Options I considered so far:
- Implement my own logic for persisting offsets. This sounds complicated; I'm using Beam through Scio in Scala.
- Do nothing, but that would result in many duplicates on job restarts (the topic has a 30-day retention period).
- Enable auto-commit, but that would result in lost messages, which is even worse (a minimal sketch of this configuration follows the list).
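For concreteness, here is a minimal sketch of what enabling auto-commit could look like with KafkaIO in Scala. The broker, topic, and consumer-group names are placeholders, and String keys and values are assumed:

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder consumer properties; group.id identifies the consumer group
// whose offsets Kafka tracks.
val consumerProps = new java.util.HashMap[String, AnyRef]()
consumerProps.put("group.id", "my-consumer-group")
consumerProps.put("enable.auto.commit", java.lang.Boolean.TRUE)

val readWithAutoCommit =
  KafkaIO.read[String, String]()
    .withBootstrapServers("broker-1:9092")
    .withTopic("my-topic")
    .withKeyDeserializer(classOf[StringDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    // The embedded Kafka consumer commits offsets on its own schedule,
    // independent of what Beam has actually processed -- hence the risk
    // of losing messages on restart.
    .withConsumerConfigUpdates(consumerProps)
```

If you are on Scio, a raw Beam transform like this can typically be plugged in via ScioContext.customInput.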
Recommended answer
There are two options: enable commitOffsetsInFinalize() in KafkaIO, or alternatively enable auto-commit in the Kafka consumer configuration. Note that while commitOffsetsInFinalize() is more in sync with what has been processed in Beam than Kafka's auto-commit, it still does not provide strong exactly-once guarantees. Imagine a two-stage pipeline: Dataflow finalizes the Kafka reader after the first stage completes, without waiting for the second stage. If you restart the pipeline from scratch at that point, you would not process the records that completed the first stage but have not yet been processed by the second. The issue is no different for PubsubIO.
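As an illustration of the first option, here is a minimal Scala sketch using commitOffsetsInFinalize(); the broker, topic, and group names are again placeholders. A group.id must be set so that KafkaIO has a consumer group to commit offsets against:

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder group; offsets are committed on behalf of this consumer group.
val consumerProps = new java.util.HashMap[String, AnyRef]()
consumerProps.put("group.id", "my-consumer-group")

val readWithFinalize =
  KafkaIO.read[String, String]()
    .withBootstrapServers("broker-1:9092")
    .withTopic("my-topic")
    .withKeyDeserializer(classOf[StringDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    .withConsumerConfigUpdates(consumerProps)
    // Commit offsets back to Kafka when Beam finalizes a checkpoint,
    // i.e. after the reader's output has been durably processed by the
    // first fused stage.
    .commitOffsetsInFinalize()
```

A restarted job that uses the same group.id then resumes from the last committed offsets instead of rereading the full 30-day retention window.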
Regarding option (2): you can configure KafkaIO to start reading from a specific timestamp (assuming the Kafka server supports it, i.e. version 0.10+). But that does not look any better than enabling auto-commit.
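For reference, here is a rough sketch of option (2). withStartReadTime takes a Joda-Time Instant; the timestamp below is an arbitrary example and the broker/topic names are placeholders:

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer
import org.joda.time.Instant

val readFromTimestamp =
  KafkaIO.read[String, String]()
    .withBootstrapServers("broker-1:9092")
    .withTopic("my-topic")
    .withKeyDeserializer(classOf[StringDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    // Start from a wall-clock point (e.g. shortly before the previous job
    // was stopped) instead of from committed offsets. Requires brokers
    // that support offset lookup by timestamp (Kafka 0.10+).
    .withStartReadTime(Instant.parse("2018-01-15T00:00:00Z"))
```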
That said, KafkaIO should support committing offsets on finalize. It might be simpler to use than enabling auto-commit (which requires thinking about commit frequency, etc.). We haven't had many users asking for it. Please mention it on [email protected] if you can.
[Update: I added support for this in PR 4481.]