This article looks at a case where the Kafka command-line consumer can read a topic but a Java consumer cannot, and how to resolve it.

Problem description

I have manually created topic test with this command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

and, using this command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

I inserted these records:

This is a message
This is another message
This is a message2

First, I consume messages through the command line like this:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

and all the records are successfully shown. Then, I try to implement a consumer in Java using this code:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscriber {

    public void consume() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        // also tried with this call:
        // consumer.subscribe(Arrays.asList("test"));

        System.out.println("Starting to read data...");

        try {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    System.out.println("Number of records found: " + records.count());
                    for (ConsumerRecord<String, String> rec : records) {
                        System.out.println(rec.value());
                    }
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } finally {
            consumer.close();
        }
    }
}

But the output is:

Starting to read data...
0
0
0
0
0
....

Which means that it does not find any records in topic test. I also tried to publish some records after the Java consumer had started, but the result was the same. Any ideas what might be going wrong?

EDIT: After adding the following line:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

the consumer now reads only when I write new records to the topic. It does not read all the records from the beginning.

Recommended answer

By default, if no offsets have previously been committed for the group, the consumer starts at the end of the topics.

Hence, if you run it after having produced records, it won't receive them.
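You can verify this yourself: Kafka ships a tool for inspecting a group's committed offsets and lag. A quick check might look like this (assuming the same broker address and group id as above, and that you run it from the Kafka installation directory):

```shell
# Describe the consumer group: for each partition this prints the
# committed offset (CURRENT-OFFSET), the end of the log (LOG-END-OFFSET),
# and the difference between them (LAG).
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group test-consumer-group
```

If LAG is 0 even though you produced messages before starting the consumer, the group's position is already at the end of the log, which matches the behaviour described here.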

Notice that in your kafka-console-consumer.sh command you have the --from-beginning flag, which forces the consumer to instead start from the beginning of the topic.

One workaround, as suggested in a comment, is to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. However, I'd be careful with that setting, as your consumer will consume from the start of the topics, and in a real use case that could be a lot of data. Also note that it only applies when the group has no committed offsets, which is why adding it after the group already existed (as in your EDIT) did not replay the old records.

The easiest solution now is this: since you've run your consumer once and it has created a group, simply rerun the producer. When you run the consumer again after that, it will pick up from its last position, which is before the new producer messages.

On the other hand, if you mean to always re-consume all messages, then you have two options:

  • explicitly call seekToBeginning() when your consumer starts, to move its position to the start of the topics

  • set auto.offset.reset to earliest and disable automatic offset commits by setting enable.auto.commit to false
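The first option can be sketched like this (a minimal illustration, assuming the same test topic, broker, and group as above; the class name RewindingConsumer is made up for the example). The important detail is that seekToBeginning() only works once partitions have actually been assigned, which is why the call goes inside a ConsumerRebalanceListener rather than directly after subscribe():

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Rewind to the beginning every time partitions are (re)assigned,
        // so each run sees the full topic regardless of committed offsets.
        consumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do when partitions are taken away
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                consumer.seekToBeginning(partitions);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

With the second option, the consumer never commits offsets, so every restart falls back to auto.offset.reset and replays the topic from the start.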

