本文介绍了Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试用Spring Kafka实现一个非阻塞的Retries。根据文档here,在完成在@KafkaListnener中设置的所有尝试之后,我们可以设置一个处理程序方法来处理来自DLT主题的消息。我打算捕获DLT处理程序方法上的一些标头,如以下代码所示:
@DltHandler
fun processaDlt(
@Payload mensagem: String,
@Header("event") eventName: String,
@Header(KafkaHeaders.ORIGINAL_OFFSET) offset: String,
@Header(KafkaHeaders.EXCEPTION_FQCN) descException: String,
@Header(KafkaHeaders.EXCEPTION_STACKTRACE) stacktrace: String,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) errorMessage: String
) {
但是,有些标头没有正确发送,或者根本没有发送。我已经尝试了一些标头的值,比如KafkaHeaders.DLT_ORIGING_OFFSET、KafkaHeaders.OFFSET等。我在Spring Kafka代码中看到,一些标头以字符串";kafka_";为前缀,并且我在重试失败后转发到xpto-DLT主题的消息上看到了这些值,但一些标头的值被截断,如:
我的Listen方法使用原始消息,代码如下:
@KafkaListener(topics = xpto-topic, groupId = my-group-to-xpto)
fun listen(@Payload mensagem: String,
@Header("event") event: String,
@Header(KafkaHeaders.OFFSET) offset: Long,
@Header(KafkaHeaders.CONSUMER) consumer: KafkaConsumer<String, String>,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long
) {
毕竟,使用@DltHandler注释的方法可以接受哪些标头?为什么某些值的值被截断?
操作:
- 上面的代码是用带有Spring Boot的Kotlin编写的
- 我使用的是Spring Boot 2.5.4-Release和:
推荐答案
这些标头由byte[]
转换而来。
偏移量似乎存在转换问题-当我将参数声明为long
时,它返回零。
这对我来说很好...
@RetryableTopic(attempts = "1")
@KafkaListener(id = "so69229529", topics = "so69229529")
void listen(String in) {
throw new RuntimeException();
}
@DltHandler
void handler(Message<?> msg,
@Header(KafkaHeaders.ORIGINAL_OFFSET) byte[] offset,
@Header(KafkaHeaders.EXCEPTION_FQCN) String descException,
@Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stacktrace,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
System.out.println(msg);
System.out.println(ByteBuffer.wrap(offset).getLong());
System.out.println(descException);
System.out.println(stacktrace);
System.out.println(errorMessage);
}
4
org.springframework.kafka.listener.ListenerExecutionFailedException
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.TimestampedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException
...
Caused by: java.lang.RuntimeException
...
Listener failed; nested exception is org.springframework.kafka.listener.TimestampedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'void com.example.demo.So69229529Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException
https://github.com/spring-projects/spring-kafka/issues/1951
这篇关于Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!