问题描述
我已经使用spring-kafka在Spring引导应用程序中用SeekToCurrentErrorHandler配置了一个kafka使用者.我的使用者配置是:
I've configured a kafka consumer with SeekToCurrentErrorHandler in Spring boot application using spring-kafka. My consumer configuration is :
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaserver");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, StringDeserializer.class.getName());
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "java.lang.String");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.String");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(5);
seekToCurrentErrorHandler.setCommitRecovered(true);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(seekToCurrentErrorHandler);
return factory;
}
为了测试SeekToCurrentErrorHandler配置,我以不正确的格式在kafka中推送了一条记录,因此该记录因反序列化异常而失败.根据我的理解,错误处理程序应尝试处理失败的记录5次,之后应登录并继续下一条记录.但是它会无限次读取失败的记录.
To test SeekToCurrentErrorHandler config, I pushed a record in kafka with incorrect format so that it fails with deserialization exception. As per my understanding the error handler should try to handle the failed record 5 times and after that it should log and move on to the next record.But it keeps on reading the failed record infinite number of times.
请告诉我我要去哪里错了.
Please tell me where am I going wrong.
推荐答案
我完全是,我要做的唯一修复是确保并发级别与该主题的分区数相同.否则它将无限重试.
I have exactly the same problem and the only fix I do is make sure the concurrency level is same as the number of partition for the topic. Otherwise it will keeps retrying infinitely.
听起来像春天卡夫卡的虫子.
Sounds like a bug of spring kafka.
这篇关于在Kafka使用者中使用SeekToCurrentErrorHandler进行无限重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!