Messages lost between consumer stop and start

This post describes how to handle messages that are lost between stopping and restarting an Apache Kafka consumer.

Problem description

I am new to Kafka and am using an Apache Kafka consumer to read messages from a producer. But when I stop the consumer and restart it after some time, all the messages produced in between are lost. How do I handle this scenario? I am using the properties "auto.offset.reset" = "latest" and "enable.auto.commit" = "false".

This is the code I am using. Any help is appreciated.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // broker address must be a quoted String
props.put("group.id", "service");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
props.put("value.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);

@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicname));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        JSONObject jsonObj = new JSONObject(record.value());
        JdbcUtilToUdm.insertdataintodb(args, jsonObj);
    }
    // Note: no offsets are ever committed in this loop.
}

Recommended answer

Since you disabled auto commit, you have to explicitly call consumer.commitSync() or consumer.commitAsync(); this is how the consumer group's position in the log gets persisted. Because your loop never commits, the group has no stored offset when the consumer restarts, so "auto.offset.reset" = "latest" sends it straight to the end of the log and everything produced while it was down is skipped. You can commit synchronously or asynchronously, depending on which method you need or prefer. You should call commit after the records are processed, so in your case probably after you have finished all the inserts but before you poll again.
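
Here is a minimal sketch of the same poll loop with a synchronous commit added, reusing the topicname, args, and JdbcUtilToUdm.insertdataintodb names from the question's code:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        JSONObject jsonObj = new JSONObject(record.value());
        JdbcUtilToUdm.insertdataintodb(args, jsonObj);
    }
    if (!records.isEmpty()) {
        // Persist the group's offsets only after every record in this batch
        // has been inserted; on restart the consumer resumes from here
        // instead of falling back to auto.offset.reset.
        consumer.commitSync();
    }
}

commitSync() blocks until the broker acknowledges the commit, which is simpler to reason about; commitAsync() avoids that pause at the cost of handling commit failures in a callback.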
