本文介绍了Kafka标头以批处理方式显示为列表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试访问某些标头,同时以批处理方式使用消息.如果将侦听器设置为处理 Message<?> ,则可以手动提取标头

I am trying to access some headers whilst consuming messages in batch mode.If I setup the listener to handle Message<?> I can manually extract the headers

@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
  for (int i = 0; i < data.size(); ++) {
    Object message = data.get(i).getPayload();
    MessageHeaders mh = data.get(i).getHeaders();
    Object value = mh.get("test");

我想为我做一些这样的事情,但是当我尝试

I would like to have some of this done for me but when I try

@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
  @Header (KafkaHeaders.OFFSET) List<Integer> offsets,
  @Header ("test") List<String> testHeaders,
  Acknowledgment ack) throws SQLException {

我得到 MessageHandlingException:方法参数类型[interface.java.util.list]缺少标头'test'但是,此方法对于offset标头可以正常工作.

I get MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]This method however works fine for the offset header.

这是因为存在内置代码来处理标准标头,并且该方法不能用于自定义标头,还是我错过了一些使该方法起作用的东西?

Is this because there is inbuilt code to handle the standard headers and this approach cannot be used for custom ones or have I missed something that would make this approach work?

推荐答案

是的,该框架仅映射它知道的标头.它将所有其他映射的标头放入

Yes, the framework only maps the headers it knows about; it puts all other mapped headers into

/**
 * The header for a list of Maps of converted native Kafka headers. Used for batch
 * listeners; the map at a particular list position corresponds to the data in the
 * payload list position.
 */
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";

如果您想要该标头的离散映射,则需要创建一个自定义的 BatchMessageConverter -可能是 BatchMessagingMessageConverter 的子类.

If you want a discrete mapping for that header, you would need to create a custom BatchMessageConverter - probably a subclass of BatchMessagingMessageConverter.

最简单的方法可能是覆盖此方法,调用 super.toMessage()然后添加标题.

Probably the easiest would be to override this method, call super.toMessage() then add your headers.

return MessageBuilder.fromMessage(super.toMessage(...))
    .setHeader("test", ...)
    .build();

如果您使用的是Spring Boot,只需将转换器添加为Bean,然后Boot会将其插入.

If you are using Spring Boot, just add the converter as a bean and boot will wire it in.

否则将转换器添加到容器工厂.

Otherwise add the converter to the container factory.

编辑

如果消息转换器没有头映射器;所有标头都放在标头 KafkaHeaders.NATIVE_HEADERS 中,该标头是 List< Headers> .

If the message converter has no header mapper; all headers are put in the header KafkaHeaders.NATIVE_HEADERS which is a List<Headers>.

这篇关于Kafka标头以批处理方式显示为列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 11:09