问题描述
使用Kafka经纪人:1.0.1spring-kafka:2.1.6.发布
Using Kafka Broker: 1.0.1spring-kafka: 2.1.6.RELEASE
我正在使用具有以下设置的批处理使用者:
I'm using a batched consumer with the following settings:
// Other settings are not shown..
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
我通过以下方式使用spring侦听器:
I use spring listener in the following way:
@KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}")
public void receive(final List<String> data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) Set<String> topics,
@Header(KafkaHeaders.OFFSET) final List<Long> offsets) { // ......code... }
我总是发现一些消息保留在批处理中,而在我的侦听器中未收到.看来,如果其余消息小于批处理大小,则不会消耗该消息(可能在内存中并发布给我的侦听器).有什么方法可以设置一个时间间隔后自动刷新批处理,从而避免不刷新消息?用批处理者来处理这种情况的最佳方法是什么?
I always find the a few messages remain in the batch and not received in my listener. It appears to be that if the remaining messages are less than a batch size, it isn't consumed (may be in memory and published to my listener). Is there any way to have a setting to auto-flush the batch after a time interval so as to avoid the messages not being flushed?What's the best way to deal with such kind of situation with a batch consumer?
推荐答案
我刚刚进行了测试,没有任何问题...
I just ran a test without any problems...
@SpringBootApplication
public class So50370851Application {
public static void main(String[] args) {
SpringApplication.run(So50370851Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
for (int i = 0; i < 230; i++) {
template.send("so50370851", "foo" + i);
}
};
}
@KafkaListener(id = "foo", topics = "so50370851")
public void listen(List<String> in) {
System.out.println(in.size());
}
@Bean
public NewTopic topic() {
return new NewTopic("so50370851", 1, (short) 1);
}
}
和
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.type=batch
和
100
100
30
这意味着问题出在发送方.
That implies the problem is on the sending side.
这篇关于Kafka + Spring Batch Listener刷新批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!