本文介绍了如何使用没有@KafkaHandler作为记录值的类级别@KafkaListener处理Kafka记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

通常,当我们定义类级别的 @KafkaListener 和方法级别的 @KafkaHandler s时,我们可以定义默认的 @KafkaHandler 来处理意外的有效载荷.

Normally, when we define a class-level @KafkaListener and method level @KafkaHandlers, we can define a default @KafkaHandler to handle unexpected payloads.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener

但是,如果没有默认方法,该怎么办?

But, what should we do if we don't have a default method?

推荐答案

在2.6版和更高版本中,您可以配置 SeekToCurrentErrorHandler ,以通过检查异常立即将此类消息发送给死信主题

With version 2.6 and later, you can configure a SeekToCurrentErrorHandler to immediately send such messages to a dead letter topic, by examining the exception.

这里是一个简单的Spring Boot应用程序,演示了该技术:

Here is a simple Spring Boot application that demonstrates the technique:

@SpringBootApplication
public class So59256214Application {

    public static void main(String[] args) {
        SpringApplication.run(So59256214Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so59256214").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so59256214.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so59256214.DLT", topics = "so59256214.DLT")
    void listen(ConsumerRecord<?, ?> in) {
        System.out.println("dlt: " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so59256214", 42);
            template.send("so59256214", 42.0);
            template.send("so59256214", "No handler for this");
        };
    }

    @Bean
    ErrorHandler eh(KafkaOperations<String, Object> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
        BackOff neverRetryOrBackOff = new FixedBackOff(0L, 0);
        BackOff normalBackOff = new FixedBackOff(2000L, 3);
        eh.setBackOffFunction((rec, ex) -> {
            if (ex.getMessage().contains("No method found for class")) {
                return neverRetryOrBackOff;
            }
            else {
                return normalBackOff;
            }
        });
        return eh;
    }

}

@Component
@KafkaListener(id = "so59256214", topics = "so59256214")
class Listener {

    @KafkaHandler
    void integerHandler(Integer in) {
        System.out.println("int: " + in);
    }

    @KafkaHandler
    void doubleHandler(Double in) {
        System.out.println("double: " + in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

结果:

int: 42
double: 42.0
dlt: ConsumerRecord(topic = so59256214.DLT, ...

这篇关于如何使用没有@KafkaHandler作为记录值的类级别@KafkaListener处理Kafka记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-19 04:27