问题描述
我的Kafka使用者在尝试批量处理消息(即处理消息列表)时抛出异常
错误消息
is java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.pMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app'); nested exception is java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.PMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app')
代码片段
public void receive(List<ConsumerRecord<String, PMessage>> records) {
List<PMessage> msgList = records.stream().map(message -> message.value()).collect(Collectors.toList());
PMessage test = records.get(0).value();
ConsumerRecord<String, PMessage> firstMessage = records.get(0);
上面的所有3个语句都给出ClassCastException
@Override
protected DefaultKafkaConsumerFactory<String, PMessage> createConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.putAll(kafkaProperties.buildConsumerProperties());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceInfo.getBrokersAuthUrl());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.putAll(KafkaSaslUtils.getSaslProperties(serviceInfo));
return new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new MessageDeserializer());
}
消息反序列化程序:
@Override
public PMessage deserialize(String topic, byte[] data) {
if (data == null) {
throw new SerializationException("Can not deserialize. data is null for topic: '" + topic + "'");
}
try {
SeekableByteArrayInput seekableByteArrayInput = new SeekableByteArrayInput(data);
GenericDatumReader reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader = new
DataFileReader(seekableByteArrayInput, reader);
GenericRecord genericRecord = extractGenericRecord(dataFileReader);
任何指针都将受到赞赏。谢谢!
推荐答案
简而言之
我遇到了同样的问题,我花了一晚时间才找到了一个非常简单的解决方案:您的Kafka侦听器容器必须以批处理模式工作(默认情况下,该模式为Single)。您可以通过属性spring.kafka.listener.type
进行配置。如果您使用自己的KafkaListenerContainerFactory
Bean,则只需将factory.setBatchListener(true);
添加到Bean定义中。
详细信息
重现情况的步骤(这不是错误!)。
创建表示Kafka消息的类和相应的反序列化程序。在您的情况下,它是kafka.psmessage.PMessage,我将使用CommunicationRequest.@ToString
public class CommunicationRequest {
public int id;
public String action;
}
@Component
public class CommunicationRequestDeserializer implements Deserializer<CommunicationRequest> {
@Autowired
ObjectMapper objectMapper;
@SneakyThrows
@Override
public CommunicationRequest deserialize(String topic, byte[] data) {
String json = new String(data);
return objectMapper.readValue(json, CommunicationRequest.class);
}
}
配置Kafka:
@Slf4j
@Configuration
public class KafkaConfiguration {
@Autowired
private final KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, CommunicationRequest> consumerFactory(
CommunicationRequestDeserializer deserializer
) {
return new DefaultKafkaConsumerFactory<>(
this.kafkaProperties.buildConsumerProperties(),
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
ConsumerFactory<String, CommunicationRequest> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>()
;
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(false); /* Important line! */
return factory;
}
/* other beans are not shown to make example shorter */
}
现在您可以使用KafkaListener注释创建一个类来读取消息。您在方法签名上有几个选择(删除所有方法,只保留一个方法!)。
@Slf4j
@Component
public class MyListener {
@KafkaListener(topics = "test-topic")
public void consume1(Object record) {
log.info("consume1: {}", record.toString());
}
@KafkaListener(topics = "test-topic")
public void consume2(CommunicationRequest record) {
log.info("consume2: {}", record.toString());
}
@KafkaListener(topics = "test-topic")
public void consume3(ConsumerRecord<String, CommunicationRequest> record) {
log.info("consume3: {}", record.toString());
}
@KafkaListener(topics = "test-topic")
public void consume4(List<Object> records) {
log.info("consume4: size = {}, first = {}", records.size(), records.get(0).toString());
}
@KafkaListener(topics = "test-topic")
public void consume5(List<CommunicationRequest> records) {
log.info("consume5: size = {}, first = {}", records.size(), records.get(0).toString());
}
@KafkaListener(topics = "test-topic")
public void consume6(List<ConsumerRecord<String, CommunicationRequest>> records) {
log.info("consume6: size = {}, first = {}", records.size(), records.get(0).toString());
}
}
将消息放入Kafka,当Listener type=Single,即指定了setBatchListener(false);
时,会得到如下内容:
2021-11-19 01:34:54,029 INFO MyListener - consume1: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:31:53,141 INFO MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:36:40,483 INFO MyListener - consume3: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:41:23,265 INFO MyListener - consume4: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:42:34,259 INFO MyListener - consume5: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:44:39,999 ERROR SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(...
如您所见,Spring Kafka尝试猜测需要向其传递消息的类。如果您请求消息列表,Spring Kafka将返回一个大小为1的列表,而不管在最后一次轮询中实际阅读了多少消息()。顺便说一句,某些签名无效,无法像上一个签名那样被调用。
上面描述了当KafkaListenerContainer具有类型Single而不是Batch时的行为(请查看清单中的重要行)。现在让我们创建批处理监听器!
/* ... */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
ConsumerFactory<String, CommunicationRequest> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>()
;
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true); /* Important line! */
return factory;
}
/* ... */
让我们使用相同的MyListener类并查看结果:
2021-11-19 01:57:10,505 INFO MyListener - consume1: io.opentracing.contrib.kafka.TracingKafkaConsumer@6a9f8235
2021-11-19 01:58:18,382 INFO MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:59:43,188 ERROR KafkaMessageListenerContainer$ListenerConsumer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
2021-11-19 02:00:50,468 INFO MyListener - consume4: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:01:53,214 INFO MyListener - consume5: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:03:04,642 INFO MyListener - consume6: size = 2, first = ConsumerRecord(topic = test-topic, ...)
同样,Spring Kafka足够好,可以识别几乎所有的签名。最重要的是,您现在收到了一个列表--上一次poll()调用中使用的所有批处理。在我的情况下,主题中只有2条消息。结论
您可以使用不同的方法签名,并将它们与监听器类型组合在一起,Single和Batch。并不是所有组合都被允许。你的签名是不被允许的,与阿帕奇·卡夫卡本身和话题再创造无关:尽管你的签名说你期待一份Consumer erRecord列表,但你会收到一份PMessage列表。这就是春天·卡夫卡的工作方式。
这篇关于Kafka Consumer-ClassCastException Java的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!