本文介绍了Kafka + Spring Batch 监听器刷新批处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 Kafka 代理:1.0.1spring-kafka: 2.1.6.RELEASE

我正在使用具有以下设置的批量消费者:

//其他设置不显示..props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

我以如下方式使用 spring 监听器:

 @KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}")public void receive(final List data,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List分区,@Header(KafkaHeaders.RECEIVED_TOPIC) 设置话题,@Header(KafkaHeaders.OFFSET) 最终列表offsets) {//......代码...... }

我总是发现有几条消息保留在批处理中,而在我的侦听器中没有收到.似乎如果剩余的消息小于批处理大小,则不会消耗它(可能在内存中并发布给我的听众).有什么方法可以设置在一段时间间隔后自动刷新批处理,以避免消息不被刷新?用批量消费者处理这种情况的最佳方法是什么?

解决方案

我刚刚运行了一个测试,没有任何问题...

@SpringBootApplication公共类 So50370851Application {公共静态无效主(字符串 [] args){SpringApplication.run(So50370851Application.class, args);}@豆角,扁豆公共 ApplicationRunner 运行程序(KafkaTemplate 模板){返回参数 ->{for (int i = 0; i 

spring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.max-poll-records=100spring.kafka.listener.type=batch

10010030

此外,调试日志在一段时间后显示它正在轮询和获取 0 条记录(并且一遍又一遍地重复).

这意味着问题出在发送方.

Using Kafka Broker: 1.0.1spring-kafka: 2.1.6.RELEASE

I'm using a batched consumer with the following settings:

// Other settings are not shown..
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

I use spring listener in the following way:

 @KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}")
    public void receive(final List<String> data,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitions,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) Set<String> topics,
                        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) { // ......code... }

I always find the a few messages remain in the batch and not received in my listener. It appears to be that if the remaining messages are less than a batch size, it isn't consumed (may be in memory and published to my listener). Is there any way to have a setting to auto-flush the batch after a time interval so as to avoid the messages not being flushed?What's the best way to deal with such kind of situation with a batch consumer?

解决方案

I just ran a test without any problems...

@SpringBootApplication
public class So50370851Application {

    public static void main(String[] args) {
        SpringApplication.run(So50370851Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 230; i++) {
                template.send("so50370851", "foo" + i);
            }
        };
    }

    @KafkaListener(id = "foo", topics = "so50370851")
    public void listen(List<String> in) {
        System.out.println(in.size());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50370851", 1, (short) 1);
    }

}

and

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.type=batch

and

100
100
30

That implies the problem is on the sending side.

这篇关于Kafka + Spring Batch 监听器刷新批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 02:43
查看更多