本文介绍了如何使用 Header 从一条 Kafka 消息中获取多个对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想从一条消息(来自同一主题)中反序列化不同的对象,并根据对象类型将其保存/更新到数据库中的相应表.正如加里提到的这里我可以使用标题来提供我的对象必须反序列化为哪种类型的信息.您能否提供有关如何实现这一目标的示例?

解决方案

参见 sample-02 此处.

示例 2

这个示例演示了一个简单的生产者和一个多方法的消费者;生产者发送 Foo1Bar1 类型的对象,消费者接收 Foo2Bar2 类型的对象(对象具有相同的字段,foo).

生产者使用一个JsonSerializer;消费者使用 ByteArrayDeserializerByteArrayJsonMessageConverter 一起转换为侦听器方法参数的所需类型.在这种情况下我们无法推断类型(因为类型用于选择要调用的方法).因此,我们在生产者和消费者端配置类型映射.请参阅生产者端的 application.yml 和消费者端的 converter bean.

MultiMethods @KafkaListener 有 3 个方法;一个用于每个已知对象,一个用于其他对象的后备默认方法.

运行应用程序并使用 curl 发送一些数据:

$ curl -X POST http://localhost:8080/send/foo/bar

$ curl -X POST http://localhost:8080/send/bar/baz

$ curl -X POST http://localhost:8080/send/unknown/xxx

控制台:

收到:Foo2 [foo=bar]

收到:Bar2 [bar=baz]

收到未知:xxx

I want to deserialize different objects from one message (from the same topic), and according to object type save/update it to the appropriate table in DB. As Gary mentioned here I can use headers to provide information to which type my object must be deserialized. Can you help with examples of how to achieve this?

解决方案

See sample-02 here.

这篇关于如何使用 Header 从一条 Kafka 消息中获取多个对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 10:40