This article walks through a Kafka consumer ClassCastException in Java and how to resolve it.

Problem Description

My Kafka consumer throws an exception when it tries to process messages in batches (i.e., handle a list of messages).

Error Message

java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.PMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app'); nested exception is java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.PMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app')

Code Snippet

public void receive(List<ConsumerRecord<String, PMessage>> records) {
    List<PMessage> msgList = records.stream().map(message -> message.value()).collect(Collectors.toList());

    PMessage test = records.get(0).value();
    ConsumerRecord<String, PMessage> firstMessage = records.get(0);

All three of the statements above throw a ClassCastException.
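Why all three statements fail only at the point of access can be illustrated with plain Java generics: because of type erasure, a list with the wrong element type can flow into a parameter declared as a list of records without any runtime check, and the ClassCastException only surfaces when an element is read. A minimal self-contained sketch, with hypothetical stub classes standing in for PMessage and ConsumerRecord:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins -- NOT the real Kafka classes -- used only to
// simulate the erasure behaviour without a broker:
class PMessage { }
class ConsumerRecordStub<K, V> { }

public class ErasureDemo {

    @SuppressWarnings("unchecked")
    static boolean triggersCce() {
        // The container actually hands over a List<PMessage> ...
        List<PMessage> delivered = new ArrayList<>();
        delivered.add(new PMessage());

        // ... but erasure lets it flow into a variable typed as a list
        // of records; nothing is checked at this point:
        List<ConsumerRecordStub<String, PMessage>> records =
                (List<ConsumerRecordStub<String, PMessage>>) (List<?>) delivered;

        try {
            // The compiler inserts a checkcast here, so the failure
            // only surfaces when an element is accessed:
            ConsumerRecordStub<String, PMessage> first = records.get(0);
            return first == null; // never reached in practice
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException thrown: " + triggersCce());
    }
}
```

This is exactly why `records.stream().map(...)`, `records.get(0).value()`, and the plain `records.get(0)` all fail: each of them is the first point at which an element of the mistyped list is touched.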

@Override
    protected DefaultKafkaConsumerFactory<String, PMessage> createConsumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.putAll(kafkaProperties.buildConsumerProperties());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceInfo.getBrokersAuthUrl());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.putAll(KafkaSaslUtils.getSaslProperties(serviceInfo));

        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new MessageDeserializer());
    }

The message deserializer:

@Override
    public PMessage deserialize(String topic, byte[] data) {
        if (data == null) {
            throw new SerializationException("Can not deserialize. data is null for topic: '" + topic + "'");
        }

        try {
            SeekableByteArrayInput seekableByteArrayInput = new SeekableByteArrayInput(data);
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
            DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(seekableByteArrayInput, reader);
            GenericRecord genericRecord = extractGenericRecord(dataFileReader);

Any pointers would be appreciated. Thanks!

Recommended Answer

In Short

I ran into the same problem, and it took me an entire evening to find a very simple solution: your Kafka listener container must work in batch mode (by default the mode is Single). You can configure this via the property spring.kafka.listener.type. If you define your own KafkaListenerContainerFactory bean, simply add factory.setBatchListener(true); to the bean definition.
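With Spring Boot's auto-configuration, batch mode can also be enabled from configuration alone. A minimal application.properties fragment (assuming the container factory is the auto-configured one, not a custom bean):

```properties
# Switch the auto-configured listener container from Single to Batch mode
spring.kafka.listener.type=batch
```

If you build the ConcurrentKafkaListenerContainerFactory yourself, this property has no effect on your bean; in that case call setBatchListener(true) on the factory instead, as shown below.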

Details

Steps to reproduce the situation (this is not a bug!):

Create a class representing the Kafka message and a corresponding deserializer. In your case that is kafka.psmessage.PMessage; I will use CommunicationRequest.

@ToString
public class CommunicationRequest {
    public int id;
    public String action;
}

@Component
public class CommunicationRequestDeserializer implements Deserializer<CommunicationRequest> {
    @Autowired
    ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public CommunicationRequest deserialize(String topic, byte[] data) {
        String json = new String(data);
        return objectMapper.readValue(json, CommunicationRequest.class);
    }
}

Configure Kafka:

@Slf4j
@Configuration
public class KafkaConfiguration {
    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, CommunicationRequest> consumerFactory(
        CommunicationRequestDeserializer deserializer
    ) {
        return new DefaultKafkaConsumerFactory<>(
            this.kafkaProperties.buildConsumerProperties(),
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
        ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
        ConsumerFactory<String, CommunicationRequest> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
            new ConcurrentKafkaListenerContainerFactory<>()
        ;
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(false); /* Important line! */
        return factory;
    }

    /* other beans are not shown to make example shorter */
}

Now create a class with KafkaListener-annotated methods to read the messages. You have several options for the method signature (in a real application, delete all of these methods and keep only one!):

@Slf4j
@Component
public class MyListener {
    @KafkaListener(topics = "test-topic")
    public void consume1(Object record) {
        log.info("consume1: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume2(CommunicationRequest record) {
        log.info("consume2: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume3(ConsumerRecord<String, CommunicationRequest> record) {
        log.info("consume3: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume4(List<Object> records) {
        log.info("consume4: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume5(List<CommunicationRequest> records) {
        log.info("consume5: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume6(List<ConsumerRecord<String, CommunicationRequest>> records) {
        log.info("consume6: size = {}, first = {}", records.size(), records.get(0).toString());
    }
}

Put a message into Kafka. With listener type = Single, i.e. with setBatchListener(false); specified, you get something like this:

2021-11-19 01:34:54,029 INFO  MyListener - consume1: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:31:53,141 INFO  MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:36:40,483 INFO  MyListener - consume3: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:41:23,265 INFO  MyListener - consume4: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:42:34,259 INFO  MyListener - consume5: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:44:39,999 ERROR SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(...

As you can see, Spring Kafka tries to guess which type the message should be delivered as. If you ask for a list of messages, Spring Kafka hands you a list of size 1, regardless of how many messages were actually read by the last poll(). Note also that some signatures are invalid and cannot be invoked at all, like the last one.


The above describes the behaviour when the KafkaListenerContainer has type Single rather than Batch (see the important line in the listing). Now let's create a batch listener!

/* ... */

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
    ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
    ConsumerFactory<String, CommunicationRequest> consumerFactory
) {
    ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
        new ConcurrentKafkaListenerContainerFactory<>()
    ;
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); /* Important line! */
    return factory;
}

/* ... */

Let's reuse the same MyListener class and look at the results:

2021-11-19 01:57:10,505 INFO  MyListener - consume1: io.opentracing.contrib.kafka.TracingKafkaConsumer@6a9f8235
2021-11-19 01:58:18,382 INFO  MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:59:43,188 ERROR KafkaMessageListenerContainer$ListenerConsumer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
2021-11-19 02:00:50,468 INFO  MyListener - consume4: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:01:53,214 INFO  MyListener - consume5: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:03:04,642 INFO  MyListener - consume6: size = 2, first = ConsumerRecord(topic = test-topic, ...)
同样,Spring Kafka足够好,可以识别几乎所有的签名。最重要的是,您现在收到了一个列表--上一次poll()调用中使用的所有批处理。在我的情况下,主题中只有2条消息。

Conclusion

You can use different method signatures and combine them with the two listener types, Single and Batch. Not every combination is allowed. Your signature is one of the disallowed ones, and this has nothing to do with Apache Kafka itself or with re-creating the topic: although your signature says you expect a list of ConsumerRecord, you actually receive a list of PMessage. That is simply how Spring Kafka works.
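The conclusion above also implies a second way out: since in Single mode the payload arrives as a list of the value type (consume5 in the log above), declaring the parameter as List&lt;PMessage&gt; avoids the cast without switching the container to batch mode. A minimal sketch of the handler body, with PMessage stubbed out for illustration (the real project would keep kafka.psmessage.PMessage and the @KafkaListener annotation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stub for the project's message class:
class PMessage {
    final String payload;
    PMessage(String payload) { this.payload = payload; }
}

public class BatchReceiver {

    // Declare the parameter as the type Spring Kafka actually delivers:
    // List<PMessage>, not List<ConsumerRecord<String, PMessage>>.
    public List<String> receive(List<PMessage> records) {
        return records.stream()
                .map(m -> m.payload)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PMessage> batch = new ArrayList<>();
        batch.add(new PMessage("a"));
        batch.add(new PMessage("b"));
        System.out.println(new BatchReceiver().receive(batch));
    }
}
```

With batch mode enabled instead, the original List&lt;ConsumerRecord&lt;String, PMessage&gt;&gt; signature becomes valid as well (consume6 in the log above), so pick whichever matches how many messages you want per invocation.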

