How to poll Kafka in a transactional way from Camel?

Problem description

I'm currently working on a message bus based on Kafka and managed by Camel and Spring. I have an XML route definition that polls events and retrieves the corresponding complete business objects from an external API; it looks like this:

<route id="station-event-enrich-route" autoStartup="true" >
        <from
            uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{events.topic.name}}&amp;autoCommitEnable=false&amp;allowManualCommit=true&amp;maxPollRecords={{station.brocker.bulk.limit}}&amp;groupId={{kafka.groupId}}" />

        <!-- SNIP logic to aggregate several events -->

        <pollEnrich strategyRef="keepHeadersAggregationStrategy">
            <simple>{{api.url}}?view=full&amp;id=$simple{in.headers.BUSINESS_ID}</simple>
        </pollEnrich>

        <!-- SNIP logic to split the retrieved events according to their ids -->

        <to uri="velocity:velocity/resource-object.vm"/>

        <removeHeaders pattern="*" excludePattern="MANUAL_COMMIT"/>

        <to uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{objects.topic.name}}&amp;groupId={{kafka.groupId}}&amp;requestRequiredAcks=all" />

        <transform>
            <simple>${headers.MANUAL_COMMIT.commitSync()}</simple>
        </transform>
</route>

My problem is the following: when the Kafka event topic is polled and the api.url in my pollEnrich is not available, no business object is retrieved and the event is lost. So I need to implement transactional logic to be able to roll back the initial Kafka poll in my route, so that the same event can be polled several times until the api.url sends me the awaited business object.

I tried several approaches, starting with updating my version of org.apache.camel:camel-kafka to 2.22.0 to be able to use manual commits. Then I tried to implement a basic error handler (configured with maximumRedeliveries=-1 for infinite retries) so that when the pollEnrich triggers an onException, I can set a header to skip the final manual commit. It apparently works, but my event is never re-polled.

I also tried to use the transacted tag with an org.springframework.kafka.transaction.KafkaTransactionManager instance from spring-kafka, but that is not the right approach, as only the producers are transactional.

What am I missing, and what is the correct approach?

I use Java 8, Camel 2.22.0 and Spring 4.3.18.RELEASE (not recommended, but it should work).

Answer

Manual Kafka commits look like a relatively new feature in Camel, and the documentation isn't particularly clear. I'm using Camel 2.22.1.

From the description of your problem, you are looking for "at least once" semantics. That is, you want to be able to re-process a message when there was an issue. Of course, the consequence of this approach is that no other messages in the partition with the failing message can be processed (or seen) until the application successfully processes it. If a service fails, this would likely block all partitions of a given topic until the service is back up.

The Kafka URI to get this to work would look like this:

kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup&maxPollRecords=3&consumersCount=1&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true

Breaking that down a bit:

  • kafka:TestLog : specifies the Kafka topic to consume from
  • brokers=localhost:9092 : specifies the bootstrap servers for the Kafka cluster
  • groupId=kafkaGroup : specifies the Kafka consumer group
  • consumersCount=1 : specifies the number of Kafka consumers for that Camel route

The last two configuration settings are important when consuming from a Kafka topic that has multiple partitions. They need to be tuned/configured to take into account the number of Camel instances you plan to run.

The more interesting configuration options for getting "at least once" semantics:

  • autoCommitEnable=false : turns off auto-committing of offsets so we can use manual commits.
  • allowManualCommit=true : turns on manual commits, giving us access to the KafkaManualCommit capability (see the code below).
  • breakOnFirstError=true : when this is true, the route will stop processing the rest of the messages in the batch received by the last poll of the topic.
  • maxPollRecords=3 : specifies the number of messages consumed during a single poll of the Kafka topic. It is probably a good idea to keep this set to a low number, since an issue with one message in the batch causes all of the messages in the batch to be re-processed.
  • autoOffsetReset=earliest : causes the consumer to read from the earliest offset when there is a difference between the current offset and the offset marking the end of the partition (more on that in a bit).
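
The interplay of these settings can be sketched in plain Java, with no Camel or Kafka dependencies. This is only an illustration of the "at least once" contract (names and the list-index-as-offset simplification are mine, not Camel's): the committed offset advances only past records that were processed successfully, and processing stops at the first failure, mirroring breakOnFirstError=true, so the failing record is seen again on the next poll.

```java
import java.util.List;
import java.util.function.Predicate;

/** Minimal sketch of "at least once" consumption with break-on-first-error. */
public class AtLeastOnceSketch {

    /**
     * Processes records starting at committedOffset and returns the new
     * committed offset. Stops at the first failure, leaving the failed
     * record (and everything after it) to be re-polled later.
     */
    static long processBatch(List<String> records, long committedOffset,
                             Predicate<String> processor) {
        long offset = committedOffset;
        for (int i = (int) committedOffset; i < records.size(); i++) {
            if (!processor.test(records.get(i))) {
                return offset; // commit only what succeeded so far
            }
            offset = i + 1; // record processed: offset may advance past it
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("e0", "e1", "bad", "e3");
        // The external API "fails" on the record called "bad".
        long offset = processBatch(partition, 0, r -> !r.equals("bad"));
        System.out.println(offset); // stuck at 2 until "bad" succeeds
        // Next poll starts again at the failed record; the API is back up:
        offset = processBatch(partition, offset, r -> true);
        System.out.println(offset); // 4: the whole partition is processed
    }
}
```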

The Camel route would look something like this:

      from(kafkaUrl)
        .routeId("consumeFromKafka")
        .process(exchange -> {
            LOGGER.info(this.dumpKafkaDetails(exchange));
        })
        .process(exchange -> {
            // do something
        })
        .process(exchange -> {
            // do something else
        })
        .process(exchange -> {
            exchange.setProperty(Exchange.FILE_NAME, UUID.randomUUID().toString() + ".txt");
        })
        .to("file://files")
        // at the end of the route
        // manage the manual commit
        .process(exchange -> {
            // manually commit offset if it is last message in batch
            Boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);

            // the header may be absent; Boolean.TRUE.equals avoids an NPE on unboxing
            if (Boolean.TRUE.equals(lastOne)) {
                KafkaManualCommit manual =
                        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                if (manual != null) {
                    LOGGER.info("manually committing the offset for batch");
                    manual.commitSync();
                }
            } else {
                LOGGER.info("NOT time to commit the offset yet");
            }
        });
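
Since the question uses the XML DSL, the commit step could presumably be adapted along these lines. This is an untested sketch: the kafka.LAST_RECORD_BEFORE_COMMIT and kafka.MANUAL_COMMIT header names and the bracketed simple-language syntax should be checked against the KafkaConstants of your Camel version.

```xml
<route id="consumeFromKafka">
    <from uri="kafka:TestLog?brokers=localhost:9092&amp;groupId=kafkaGroup&amp;maxPollRecords=3&amp;consumersCount=1&amp;autoOffsetReset=earliest&amp;autoCommitEnable=false&amp;allowManualCommit=true&amp;breakOnFirstError=true" />

    <!-- SNIP the same processing steps as in the Java route above -->

    <!-- manually commit the offset, but only for the last message of the batch -->
    <filter>
        <simple>${headers[kafka.LAST_RECORD_BEFORE_COMMIT]} == true</simple>
        <transform>
            <simple>${headers[kafka.MANUAL_COMMIT].commitSync()}</simple>
        </transform>
    </filter>
</route>
```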

After running this route and hitting an error, you can see the state of the consumer group with this command:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafkaGroup --describe

that might yield this result:

TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
TestLog   0          92              95              3
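
The LAG value reported by the tool is just the distance between the consumer group's committed offset and the log-end offset. A tiny helper makes the arithmetic explicit (the names here are illustrative, not part of any Kafka API):

```java
/** Illustrates how consumer lag is derived: lag = log-end-offset - current-offset. */
public class ConsumerLag {
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // The stuck consumer from the example: committed at 92, log end at 95.
        System.out.println(lag(95, 92)); // prints 3, matching the LAG column
    }
}
```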

This is where the autoOffsetReset setting comes into play. The current offset is where the consumer group wants to consume from next. If the message at that offset (92) is the failing one, then the group will fall behind as more messages (in this case two more) are added. With the given settings, the route will cause Camel to continually process the message at offset 92 until it succeeds. If the Camel route is stopped and started, the application will, based on autoOffsetReset, resume consuming from the earliest offset (92) and not the latest (which would be 95). Using latest would result in "lost" messages, because a restart of Camel would start processing from the latest offset.

A sample application is available here
