This article explains how to handle errors and control message commits when using the Kafka Streams DSL. It may serve as a useful reference for anyone facing the same problem.

Problem Description

For Kafka Streams, if we use the lower-level Processor API, we can control whether to commit or not. So if a problem happens in our code and we don't want to commit the current message, Kafka will redeliver this message multiple times until the problem gets fixed.

But how can we control whether a message is committed when using the higher-level Streams DSL API?

Resources:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

Recommended Answer

Your statement is not completely true. You cannot "control whether to commit or not" -- at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request an additional commit. Thus, after a call to #commit(), Streams tries to commit as soon as possible, but it is not an immediate commit. Furthermore, Streams commits automatically even if you never call #commit(). You can control the commit interval via the Streams configuration commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application).
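The configuration side of this can be illustrated with a minimal sketch. The property keys are the standard Kafka Streams config strings; the application id, bootstrap server, and interval value below are placeholder assumptions, not values from the original question:

```java
import java.util.Properties;

public class CommitConfigSketch {

    /** Builds a minimal Streams configuration with an explicit commit interval. */
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // Placeholder values; adjust for your environment.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Streams commits offsets automatically at this interval (milliseconds).
        // You cannot disable commits entirely -- you can only make them
        // less frequent, or request extra commits via ProcessorContext#commit().
        props.put("commit.interval.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("commit.interval.ms"));
    }
}
```

In a real application these properties would be passed to the `KafkaStreams` constructor; a larger interval only delays commits, it does not prevent them.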

In case of a "problem", how to respond depends on the type of problem you have:

  • If you detect a problem you cannot recover from, you can only throw an exception and "stop the world" (cf. below).
  • If you have a recoverable error, you need to "loop" within your own code (e.g., within Processor#process() or KeyValueMapper#apply()) until the problem is resolved and you can successfully process the current message. Note that you might run into a timeout, i.e., an exception, using this strategy (cf. consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62]).
  • An alternative would be to put records that cannot be processed right now into a StateStore and process them later on. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It is not recommended, and if you do use it, you must consider the implications very carefully.
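The "loop within your own code" idea from the second point can be sketched as a generic bounded-retry helper, independent of Kafka. The retry limits and the flaky action below are hypothetical stand-ins for your own processing logic:

```java
import java.util.function.Supplier;

public class RetryLoop {

    /**
     * Retries the given action until it succeeds or maxAttempts is reached.
     * Inside a real Processor#process() you would keep the total retry time
     * well below the consumer session/poll timeouts, or the thread will be
     * kicked out of the consumer group (the timeout mentioned above).
     */
    public static <T> T retry(Supplier<T> action, int maxAttempts, long backoffMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and back off before retrying
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted during retry", ie);
                }
            }
        }
        throw last; // still failing after maxAttempts -> "stop the world"
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Hypothetical flaky action: fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient failure");
            return "processed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Rethrowing after the attempts are exhausted turns a recoverable error back into the unrecoverable case from the first point.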

If there is an uncaught exception, the StreamThread dies and no commit happens (you can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code). If all your StreamThreads have died, you will need to create a new instance of KafkaStreams to restart your application.
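The notification mechanism is the standard Java uncaught-exception handler, which KafkaStreams exposes for its stream threads. A minimal Kafka-free sketch of the same mechanism, with a plain Thread standing in for a StreamThread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class StreamDeathNotifier {

    /**
     * Runs the given work on a separate thread (a stand-in for a StreamThread)
     * and blocks until it dies from an uncaught exception, returning the cause.
     */
    public static Throwable runAndCaptureFailure(Runnable work) throws InterruptedException {
        CountDownLatch died = new CountDownLatch(1);
        AtomicReference<Throwable> cause = new AtomicReference<>();

        Thread streamThread = new Thread(work);
        // Same mechanism KafkaStreams' handler registration builds on: get
        // notified when the thread dies; no commit happens for in-flight work.
        streamThread.setUncaughtExceptionHandler((t, e) -> {
            cause.set(e);
            died.countDown();
            // In a real app you might log, alert, and create a fresh
            // KafkaStreams instance to restart processing.
        });

        streamThread.start();
        died.await();
        return cause.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable t = runAndCaptureFailure(() -> {
            throw new IllegalStateException("cannot process record");
        });
        System.out.println("stream thread died: " + t.getMessage());
    }
}
```

The handler only observes the failure; restarting requires constructing and starting a new KafkaStreams instance, as the answer notes.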

You must not return from user code before a message is processed successfully, because if you return, Streams assumes the message was processed successfully (and might therefore commit the corresponding offset). With regard to point (3), putting a record into a special StateStore for later processing counts as a "successfully" processed record.

