Error handling in transactional Kafka listeners


Problem description

This question follows my post on Request/Reply and Retry Policy for Kafka Listeners, but in the context of transactional Kafka listeners (the current implementation is therefore similar to the solution proposed there). Basically, the idea is to support complete error management: depending on the type of exception raised inside a Kafka listener annotated with @Transactional, either retry the record X times or send it to a dead letter topic.

When I specify the errorHandler parameter on my @KafkaListener, I can see that it goes through my logic the first time. But then, after sending to the dead letter topic (and returning my custom response in the case of @SendTo), it rolls back the transaction and retries processing my record, as defined by the BackOff period of the DefaultAfterRollbackProcessor. Is there any way to prevent these retries when the exception has been properly handled, and just carry on with the next transaction?
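To make the intended policy concrete, here is a minimal plain-Java sketch of "dead-letter some exception types immediately, retry the others up to a limit". It is only a model of the decision logic, not Spring Kafka code: the class `RetryOrDeadLetterDemo`, the `process` method, the `Outcome` enum, and the in-memory dead-letter list are all hypothetical names introduced for illustration, and a real setup would back off between attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class RetryOrDeadLetterDemo {

    /** Possible results of processing a single record (hypothetical type). */
    public enum Outcome { PROCESSED, DEAD_LETTERED, RETRIES_EXHAUSTED }

    /**
     * Invokes the listener until it succeeds, a fatal exception occurs,
     * or maxAttempts invocations have failed.
     */
    public static <T> Outcome process(T record,
                                      Consumer<T> listener,
                                      Predicate<Exception> fatal,
                                      int maxAttempts,
                                      List<T> deadLetters) {
        for (int attempt = 1; ; attempt++) {
            try {
                listener.accept(record);
                return Outcome.PROCESSED;
            } catch (RuntimeException e) {
                if (fatal.test(e)) {
                    // Fatal exception type: no retry, park the record.
                    deadLetters.add(record);
                    return Outcome.DEAD_LETTERED;
                }
                if (attempt >= maxAttempts) {
                    // Retryable, but the attempt budget is used up.
                    return Outcome.RETRIES_EXHAUSTED;
                }
                // Otherwise loop and try again.
            }
        }
    }

    /** Stand-in for the listener in the question: both names always fail. */
    public static void sampleListener(String name) {
        if ("GOTODLT".equals(name)) {
            throw new ArithmeticException("/ by zero");
        }
        if ("SHOULDRETRY".equals(name)) {
            throw new IllegalStateException("Should be retried 10 times");
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        Predicate<Exception> fatal = e -> e instanceof ArithmeticException;

        System.out.println(process("GOTODLT",
                RetryOrDeadLetterDemo::sampleListener, fatal, 10, deadLetters));
        System.out.println(process("SHOULDRETRY",
                RetryOrDeadLetterDemo::sampleListener, fatal, 10, deadLetters));
        System.out.println(deadLetters);
    }
}
```

The point of the question is that, once the first branch has parked the record, the surrounding transaction should commit and move on rather than roll back and redeliver.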

Here are my various handlers, defined as suggested by the solution in the above link:

    @Bean
    public ErrorHandler errorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler) {
        //set with retry policy higher than KafkaListenerErrorHandler
        return new SeekToCurrentErrorHandler((data, thrownException) -> {
                deadLetterQueueHandler.send(data, thrownException);
        }, new FixedBackOff(15000, 20));
    }

    @Bean
    public AfterRollbackProcessor<?, ?> afterRollbackProcessor(MyDeadLetterQueueHandler deadLetterQueueHandler) {
        //set with retry policy higher than KafkaListenerErrorHandler
        final var afterRollbackProcessor = new DefaultAfterRollbackProcessor<Object, Object>((data, thrownException) -> {
                deadLetterQueueHandler.send(data, thrownException);
        }, new FixedBackOff(15000, 20));
        afterRollbackProcessor.setCommitRecovered(true);
        return afterRollbackProcessor;
    }


    @Bean
    @Primary
    KafkaListenerErrorHandler kafkaListenerErrorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler,
                                                        MyExceptionHandler exceptionHandler) {
        return (message, exception) -> {
            final var cause = (Exception) exception.getCause();
            final var consumerRecord = message.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
            if (shouldGoToDLT(cause)) {
                sendToDeadLetterTopic(deadLetterQueueHandler, consumerRecord, cause);
                return new CustomResponse(cause.getMessage());
                // should end transaction rollback and go to next transaction
            } else {
                // retry 10 times before killing the app
                var deliveryAttempt = message.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
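                // the header is absent unless delivery-attempt headers are enabled on the container
                if (deliveryAttempt != null && deliveryAttempt > 10) {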
                    exceptionHandler.handle(cause);
                }
            }
            throw exception;
        };
    }

And here are the logs I get from my test, which uses EmbeddedKafkaBroker and throws an exception in a @Transactional Kafka listener:

2021-07-01 17:12:34.791  INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.812 ERROR [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

2021-07-01 17:12:34.918  INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:34.929  INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.931 ERROR [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

2021-07-01 17:12:35.034  INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:35.445  INFO [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:35.448 ERROR [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

…

Thanks for your help.

Edit: here is my listener:

    @KafkaListener(topics = "user-topic", groupId = "consumer-group-1", errorHandler = "errorHandler")
    @Transactional
    public void onReceive(User command) {
        // update database
        userRepository.save(command);

        switch (command.getName()) {
            case "GOTODLT":
                var volunteerArithmeticException = 7 / 0;
                break;
            case "SHOULDRETRY":
                throw new IllegalStateException("Should be retried 10 times");
        }
    }

Recommended answer

It is not clear why you are using @Transactional, since you are already using a ChainedKafkaTransactionManager (which is deprecated, by the way; see the parent class Javadocs). It is OK to use it, as long as you are aware of its limitations.

The transactions have already been started by the transaction managers, so the annotation is not needed.

Since your listener method is wrapped in a transaction interceptor, the transaction is rolled back before your listener error handler is invoked.

Hence the "Transaction silently rolled back because it has been marked as rollback-only" message.
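The ordering described above can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Spring's implementation: `TxStatus`, `transactionalInterceptor`, `invokeWithErrorHandler`, and `runInContainerTransaction` are hypothetical stand-ins for the transaction status, the interceptor added by @Transactional, the listener error handler, and the container's own transaction. It shows why handling the exception in the error handler comes too late to save the transaction.

```java
public class RollbackOnlyDemo {

    /** Toy stand-in for a transaction status; not a Spring class. */
    static final class TxStatus {
        private boolean rollbackOnly;

        void setRollbackOnly() {
            rollbackOnly = true;
        }

        boolean isRollbackOnly() {
            return rollbackOnly;
        }
    }

    /** Innermost layer: models the interceptor added by @Transactional. */
    static void transactionalInterceptor(TxStatus tx, Runnable listener) {
        try {
            listener.run();
        } catch (RuntimeException e) {
            // The rollback decision is taken here, before any error handler runs.
            tx.setRollbackOnly();
            throw e;
        }
    }

    /** Middle layer: models a listener error handler that swallows the exception. */
    static String invokeWithErrorHandler(TxStatus tx, Runnable listener) {
        try {
            transactionalInterceptor(tx, listener);
            return "ok";
        } catch (RuntimeException e) {
            return "handled: " + e.getMessage();
        }
    }

    /** Outermost layer: models the container's transaction trying to commit. */
    public static String runInContainerTransaction(Runnable listener) {
        TxStatus tx = new TxStatus();
        String result = invokeWithErrorHandler(tx, listener);
        // Even though the exception was handled, the flag is already set.
        return (tx.isRollbackOnly() ? "rolled back" : "committed") + " (" + result + ")";
    }

    public static void main(String[] args) {
        // prints: rolled back (handled: boom)
        System.out.println(runInContainerTransaction(() -> {
            throw new IllegalStateException("boom");
        }));
        // prints: committed (ok)
        System.out.println(runInContainerTransaction(() -> { }));
    }
}
```

In this model, removing the innermost layer (the analogue of dropping @Transactional) leaves the flag unset, so the handled failure no longer forces a rollback.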

Remove the annotation and it should work as you expect.

You should not configure both a SeekToCurrentErrorHandler (STCEH) and an AfterRollbackProcessor (ARP). The former runs inside the transaction, the latter after a rollback.

