在使用spring-kafka 2.1.x的Spring Boot 2.0应用程序中实现死信队列(DLQ)概念的最佳方法是,将某些Bean的 @KafkaListener无法处理的所有消息发送给某些bean预定义的Kafka DLQ主题并且不会丢失单个消息?

因此,消耗的卡夫卡记录是:

  • 成功处理,
  • 无法处理,并被发送到DLQ主题
  • 无法处理,未发送到DLQ主题(由于意外问题),因此将被监听器再次使用。

  • 我试图使用 ErrorHandler 的自定义实现创建监听器容器,但使用KafkaTemplate无法将发送记录处理为DLQ主题。使用禁用的自动提交和 RECORD AckMode。
    spring.kafka.enable-auto-ack=false
    spring.kafka.listener.ack-mode=RECORD
    
    @Configuration
    public class KafkaConfig {
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
            ...
            factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
            return factory;
        }
    }
    
    @Component
    public class DlqErrorHandler implements ErrorHandler {
    
        @Autowired
        private KafkaTemplate<Object, Object> kafkaTemplate;
    
        @Value("${dlqTopic}")
        private String dlqTopic;
    
        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
            log.error("Error, sending to DLQ...");
            kafkaTemplate.send(dlqTopic, record.key(), record.value());
        }
    }
    

    似乎此实现不能保证#3 项目。如果将在DlqErrorHandler中引发异常,则记录将不再被监听器使用。

    使用事务监听器容器会有所帮助吗?
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    

    是否有使用Spring Kafka实现DLQ概念的便捷方法?

    更新2018年3月28日

    多亏了加里·罗素(Gary Russell)的回答,我能够通过实现DlqErrorHandler如下实现预期的行为
    @Configuration
    public class KafkaConfig {
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
            ...
            factory.getContainerProperties().setAckOnError(false);
            factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
            return factory;
        }
    }
    
    @Component
    public class DlqErrorHandler implements ContainerAwareErrorHandler {
        ...
        @Override
        public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            Consumerrecord<?, ? record = records.get(0);
            try {
                kafkaTemplate.send("dlqTopic", record.key, record.value());
                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
                // Other records may be from other partitions, so seek to current offset for other partitions too
                // ...
            } catch (Exception e) {
                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                // Other records may be from other partitions, so seek to current offset for other partitions too
                // ...
                throw new KafkaException("Seek to current after exception", thrownException);
            }
        }
    }
    

    这样,如果消费者调查返回3条记录(1、2、3),而第二条记录将无法处理:
  • 1将被处理
  • 2将无法处理并发送到DLQ
  • 3感谢消费者寻求record.offset()+ 1,它将被传递给监听器

  • 如果发送给DLQ失败,则消费者寻求到record.offset(),并且记录将重新传递给监听器(发送给DLQ的消息可能会被淘汰)。

    最佳答案

    参见 SeekToCurrentErrorHandler

    发生异常时,它将寻找使用者,以便所有未处理的记录都在下一次轮询时重新发送。

    您可以使用相同的技术(例如,子类)写入DLQ,如果DLQ写入失败,则查找当前偏移量(以及其他未处理的偏移量);如果DLQ写入成功,则仅查找其余记录。

    10-08 18:59