问题描述
通常,当我们定义类级别的 @KafkaListener
和方法级别的 @KafkaHandler
s时,我们可以定义默认的 @KafkaHandler
来处理意外的有效载荷.
Normally, when we define a class-level @KafkaListener
and method level @KafkaHandler
s, we can define a default @KafkaHandler
to handle unexpected payloads.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener
但是,如果没有默认方法,该怎么办?
But, what should we do if we don't have a default method?
推荐答案
在2.6版和更高版本中,您可以配置 SeekToCurrentErrorHandler
,以通过检查异常立即将此类消息发送给死信主题
With version 2.6 and later, you can configure a SeekToCurrentErrorHandler
to immediately send such messages to a dead letter topic, by examining the exception.
这里是一个简单的Spring Boot应用程序,演示了该技术:
Here is a simple Spring Boot application that demonstrates the technique:
@SpringBootApplication
public class So59256214Application {
public static void main(String[] args) {
SpringApplication.run(So59256214Application.class, args);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("so59256214").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so59256214.DLT").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so59256214.DLT", topics = "so59256214.DLT")
void listen(ConsumerRecord<?, ?> in) {
System.out.println("dlt: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
return args -> {
template.send("so59256214", 42);
template.send("so59256214", 42.0);
template.send("so59256214", "No handler for this");
};
}
@Bean
ErrorHandler eh(KafkaOperations<String, Object> template) {
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
BackOff neverRetryOrBackOff = new FixedBackOff(0L, 0);
BackOff normalBackOff = new FixedBackOff(2000L, 3);
eh.setBackOffFunction((rec, ex) -> {
if (ex.getMessage().contains("No method found for class")) {
return neverRetryOrBackOff;
}
else {
return normalBackOff;
}
});
return eh;
}
}
@Component
@KafkaListener(id = "so59256214", topics = "so59256214")
class Listener {
@KafkaHandler
void integerHandler(Integer in) {
System.out.println("int: " + in);
}
@KafkaHandler
void doubleHandler(Double in) {
System.out.println("double: " + in);
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
结果:
int: 42
double: 42.0
dlt: ConsumerRecord(topic = so59256214.DLT, ...
这篇关于如何使用没有@KafkaHandler作为记录值的类级别@KafkaListener处理Kafka记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!