Problem description
Can anyone tell me how to use the Kafka Consumer API to read messages from the beginning every time I run the consumer?
Recommended answer
This works with the 0.9.x consumer. Basically, when you create a consumer, you need to assign it a consumer group id using the property ConsumerConfig.GROUP_ID_CONFIG. Generate the consumer group id randomly every time you start the consumer, with something like properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties is the java.util.Properties instance that you pass to the constructor new KafkaConsumer(properties)).
Generating the group id randomly means that the new consumer group has no committed offsets in Kafka. So the next step is to tell the consumer what to do in that situation, which is what the auto.offset.reset property controls. As its documentation says:
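What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.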
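To make the policy concrete, here is a toy model of the decision described above for a single partition. It is not Kafka client code: the class OffsetResetModel and its method are hypothetical. It only mirrors the documented rules: use the committed offset if it still exists, otherwise apply the reset policy.

```java
import java.util.OptionalLong;

// Toy model of the documented auto.offset.reset behaviour for one partition.
// Not part of kafka-clients: the class and method names are hypothetical.
final class OffsetResetModel {

    /**
     * Decides where a consumer group would start reading.
     *
     * @param committed the group's committed offset, empty for a brand-new group
     * @param logStart  earliest offset still retained on the broker
     * @param logEnd    offset that the next produced record will get
     * @param policy    value of auto.offset.reset
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        // A committed offset that still exists on the server is used as-is.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No initial offset, or it no longer exists: apply the reset policy.
        switch (policy) {
            case "earliest":
                return logStart; // oldest record still retained
            case "latest":
                return logEnd;   // only records produced from now on
            case "none":
                throw new IllegalStateException("no valid committed offset for this group");
            default:
                throw new IllegalArgumentException("invalid auto.offset.reset: " + policy);
        }
    }
}
```

A group with a fresh random id never has a committed offset, so with earliest it always resolves to logStart. Note that "the beginning" means the oldest record the broker still retains, not necessarily offset 0.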
So from the options listed above, we need to choose the earliest policy, so that the new consumer group starts from the beginning every time.
Your Java code will look something like this:
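// bootstrap.servers and the key/value deserializers must also be set in properties
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // new group on every run
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // no committed offsets, so start at the earliest
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);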
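For a fuller picture, the sketch below builds the complete set of properties such a consumer needs. It assumes String keys and values, so it uses StringDeserializer. The class FreshConsumerProps is hypothetical. It uses the literal config keys behind the ConsumerConfig constants ("group.id", "auto.offset.reset", and so on), so it compiles with only the JDK.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: consumer settings that make every run read from the beginning.
// Keys are the string values of the ConsumerConfig constants noted in the comments.
final class FreshConsumerProps {

    static Properties build(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);    // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        // A new random group id per run, so Kafka has no committed offsets for it.
        props.put("group.id", UUID.randomUUID().toString()); // ConsumerConfig.GROUP_ID_CONFIG
        props.put("client.id", clientId);                    // ConsumerConfig.CLIENT_ID_CONFIG
        // With no committed offsets, start from the earliest retained offset.
        props.put("auto.offset.reset", "earliest");          // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        // KafkaConsumer requires deserializers; String keys and values are assumed here.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The result of, for example, FreshConsumerProps.build("localhost:9092", "your_client_id") can be passed to new KafkaConsumer<String, String>(props). Here localhost:9092 is a placeholder broker address.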
The only thing left to figure out is the distributed case. When several consumer instances run on different machines but must belong to the same consumer group, you need to generate one random id and share it among those instances so they all join the same group.
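One possible approach is sketched below, under an assumed convention: whatever launches the instances (a deployment script, for example) generates a UUID once and passes it to every instance in an environment variable. The variable name CONSUMER_GROUP_ID and the class SharedGroupId are hypothetical. The method takes the environment as a Map so it can be tested without touching the real process environment.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical convention: the launcher generates one id per run and passes it
// to all instances in the CONSUMER_GROUP_ID environment variable.
final class SharedGroupId {

    static String resolve(Map<String, String> env) {
        String shared = env.get("CONSUMER_GROUP_ID");
        if (shared != null && !shared.isEmpty()) {
            // Every instance started with the same value joins the same group.
            return shared;
        }
        // Single-instance fallback: a private random group, as in the answer above.
        return UUID.randomUUID().toString();
    }
}
```

In the consumer, this would be used as properties.put(ConsumerConfig.GROUP_ID_CONFIG, SharedGroupId.resolve(System.getenv())).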
Hope it helps!