本文介绍了使用springboot在KafkaConsumer中反序列化kafka消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个springboot应用程序,可监听kafka消息并将其转换为对象
I have a springboot app that listen kafka messages and convert them to object
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
Hostel hostel = objectMapper.readValue(message, Hostel.class);
}
如果可以直接做ti,我会花时间
I woder if it is possible to do ti directly
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(Hostel hostel) throws IOException {
}
推荐答案
您可以使用 spring-kafka
进行操作.但是随后您需要使用自定义反序列化器(或 JsonDeserializer
)在容器工厂中
You can do it using spring-kafka
. But then you need to use a custom deserializer (or a JsonDeserializer
) in the container factory
@KafkaListener(topics = "test", groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
//do Something with myRequest
ack.acknowledge()
}
您的ContainerFactory看起来像
Your ContainerFactory will look something like
@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
return factory
}
您的反序列化器外观如下
Your Deserialiser will look like
public class MyRequestDeserializer implements Deserializer {
private static ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map map, boolean b) {
}
@Override
public MyRequest deserialize(String arg0, byte[] msgBytes) {
try {
return objectMapper.readValue(new String(msgBytes), MyRequest.class);
} catch (IOException ex) {
log.warn("JSON parse/ mapping exception occurred. ", ex);
return new MyRequest();
}
}
@Override
public void close() {
log.debug("MyRequestDeserializer closed");
}
}
或者,您可以使用春季文档
这篇关于使用springboot在KafkaConsumer中反序列化kafka消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!