Problem Description
For Kafka Streams, if we use the lower-level Processor API, we can control whether to commit or not. So if a problem happens in our code and we don't want to commit the current message, Kafka will redeliver this message multiple times until the problem gets fixed.
But how can we control whether to commit the message when using the higher-level Streams DSL API?
Resource:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
Recommended Answer
Your statement is not completely true. You cannot "control to commit or not" -- at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request additional commits. Thus, after a call to #commit(), Streams tries to commit as soon as possible, but it is not an immediate commit. Furthermore, Streams will commit automatically even if you never call #commit(). You can control the Streams commit interval via the configuration parameter commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application).
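As a sketch of what "request additional commits" looks like (assuming the 0.10-era Processor API; the class name CommitRequestingProcessor is made up for illustration):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;

// Illustrative processor showing ProcessorContext#commit().
// Note: context().commit() only *requests* a commit -- Streams performs it
// as soon as possible, and it also commits automatically every
// commit.interval.ms regardless of whether this method is ever called.
public class CommitRequestingProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // ... process the record ...

        // Request an additional commit; this is NOT an immediate commit.
        context().commit();
    }
}
```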
In case of a "problem", how to respond depends on the type of problem you have:
- If you detect a problem you cannot recover from, you can only throw an exception and "stop the world" (cf. below).
- If you have a recoverable error, you need to "loop" within your own code (e.g., within Processor#process() or KeyValueMapper#apply()) until the problem got resolved and you can successfully process the current message (note that you might run into a timeout, i.e., an exception, using this strategy -- cf. the consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62]).
- An alternative would be to put records that cannot be processed right now into a StateStore and process them later on. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It is not recommended, and if you do use it, you must be very careful about the implications.
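The "loop within your own code" strategy from the second bullet can be sketched in plain Java (BoundedRetry and all names here are illustrative, not Kafka Streams API): retry the recoverable operation, but bound the total wait so you give up before the consumer session times out, and rethrow to "stop the world" if the problem persists.

```java
import java.util.function.Supplier;

// Sketch of a bounded retry loop, as one might use inside
// Processor#process() or KeyValueMapper#apply() for recoverable errors.
public class BoundedRetry {

    public static <T> T retryUntilSuccess(Supplier<T> operation,
                                          long maxWaitMs,
                                          long backoffMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        RuntimeException last = null;
        while (System.currentTimeMillis() < deadline) {
            try {
                // Success: the caller may now safely return, because Streams
                // treats returning as "message processed successfully".
                return operation.get();
            } catch (RuntimeException e) {
                last = e; // recoverable failure: back off and retry
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
        // Still failing near the session timeout: rethrow and "stop the world".
        throw last != null ? last : new IllegalStateException("no attempt made");
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = retryUntilSuccess(() -> {
            if (++attempts[0] < 3) throw new RuntimeException("transient");
            return "processed";
        }, 5_000, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
        // prints "processed after 3 attempts"
    }
}
```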
If there is an uncaught exception, the StreamThread will die and no commit happens (you can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code). If all your StreamThreads died, you will need to create a new instance of KafkaStreams to restart your application.
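A minimal sketch of registering such a handler before starting the application (topology construction is omitted; builder and props are assumed to be set up as in the developer guide):

```java
import org.apache.kafka.streams.KafkaStreams;

// 'builder' and 'props' are assumed to be configured elsewhere.
KafkaStreams streams = new KafkaStreams(builder, props);

// Called whenever a StreamThread dies from an uncaught exception;
// no commit happens for that thread's in-flight records.
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    System.err.println("StreamThread " + thread.getName() + " died: " + throwable);
    // If all threads died, close this instance and create a new
    // KafkaStreams instance to restart the application.
});

streams.start();
```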
You must not return from user code before a message got processed successfully, because if you return, Streams assumes the message got processed successfully (and thus might commit the corresponding offset). With regard to point (3), putting a record into a special StateStore for later processing counts as a "successfully" processed record.