问题描述
我正在尝试访问某些标头,同时以批处理方式使用消息.如果将侦听器设置为处理 Message<?>
,则可以手动提取标头
I am trying to access some headers whilst consuming messages in batch mode.If I setup the listener to handle Message<?>
I can manually extract the headers
@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
for (int i = 0; i < data.size(); ++) {
Object message = data.get(i).getPayload();
MessageHeaders mh = data.get(i).getHeaders();
Object value = mh.get("test");
我想为我做一些这样的事情,但是当我尝试
I would like to have some of this done for me but when I try
@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
@Header (KafkaHeaders.OFFSET) List<Integer> offsets,
@Header ("test") List<String> testHeaders,
Acknowledgment ack) throws SQLException {
我得到 MessageHandlingException:方法参数类型[interface.java.util.list]缺少标头'test'
但是,此方法对于offset标头可以正常工作.
I get MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
This method however works fine for the offset header.
这是因为存在内置代码来处理标准标头,并且该方法不能用于自定义标头,还是我错过了一些使该方法起作用的东西?
Is this because there is inbuilt code to handle the standard headers and this approach cannot be used for custom ones or have I missed something that would make this approach work?
推荐答案
是的,该框架仅映射它知道的标头.它将所有其他映射的标头放入
Yes, the framework only maps the headers it knows about; it puts all other mapped headers into
/**
* The header for a list of Maps of converted native Kafka headers. Used for batch
* listeners; the map at a particular list position corresponds to the data in the
* payload list position.
*/
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";
如果您想要该标头的离散映射,则需要创建一个自定义的 BatchMessageConverter
-可能是 BatchMessagingMessageConverter
的子类.
If you want a discrete mapping for that header, you would need to create a custom BatchMessageConverter
- probably a subclass of BatchMessagingMessageConverter
.
最简单的方法可能是覆盖此方法,调用 super.toMessage()
然后添加标题.
Probably the easiest would be to override this method, call super.toMessage()
then add your headers.
return MessageBuilder.fromMessage(super.toMessage(...))
.setHeader("test", ...)
.build();
如果您使用的是Spring Boot,只需将转换器添加为Bean,然后Boot会将其插入.
If you are using Spring Boot, just add the converter as a bean and boot will wire it in.
否则将转换器添加到容器工厂.
Otherwise add the converter to the container factory.
编辑
如果消息转换器没有头映射器;所有标头都放在标头 KafkaHeaders.NATIVE_HEADERS
中,该标头是 List< Headers>
.
If the message converter has no header mapper; all headers are put in the header KafkaHeaders.NATIVE_HEADERS
which is a List<Headers>
.
这篇关于Kafka标头以批处理方式显示为列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!