Question
I am a new student studying Kafka and I've run into some fundamental issues with understanding multiple consumers that articles, documentation, etc. have not been much help with so far.
One thing I have tried to do is write my own high-level Kafka producer and consumer and run them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. I have managed to do this successfully, but when I try to introduce a second consumer to consume from the same topic that the messages were just published to, it receives no messages.
It was my understanding that for each topic, you could have consumers from separate consumer groups, and each of these consumer groups would get a full copy of the messages produced to that topic. Is this correct? If not, what would be the proper way for me to set up multiple consumers? This is the consumer class that I have written so far:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        // The Java consumer expects the assignor's class name here, not "roundrobin"
        properties.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(properties);
        // subscribe() takes a collection of topic names, not a bare String
        consumer.subscribe(Collections.singletonList(topic));
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            // Block for up to 100 ms rather than busy-spinning with poll(0)
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value()
                        + " from topic: " + record.topic());
            }
        }
    }
}
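To exercise the separate-consumer-group behaviour described above, a small driver along these lines could start two consumers in distinct groups. This is a sketch: the topic name and group names are placeholders, and it assumes the `AlternateConsumer` class above plus a broker running on localhost:9092.

```java
// Hypothetical driver: starts two consumers with *different* group.id values.
// Because each consumer group tracks its own offsets, each consumer should
// independently receive every message published to the "test" topic.
public class MultiGroupDemo {
    public static void main(String[] args) {
        new AlternateConsumer("test", "groupA").start();
        new AlternateConsumer("test", "groupB").start();
        // Both threads print the full message stream for the topic.
        // If both consumers shared one group.id instead, the topic's
        // partitions would be split between them.
    }
}
```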
Furthermore, I noticed that originally I was testing the above consumption for a topic 'test' with only a single partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance which slowed down the latency of my consumption by a significant amount, on the order of seconds. I thought that this was an issue with rebalancing since I only had a single partition, but when I created a new topic 'multiplepartitions' with, say, 6 partitions, similar issues arose where adding more consumers to the same consumer group caused latency problems. I have looked around and people are telling me I should be using a multi-threaded consumer -- can anyone shed light on that?
Answer
I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there is no previously committed offset, the auto.offset.reset property is used to decide what the starting offset should be. If you set it to "largest" (the default) you start reading at the latest (last) message. If you set it to "smallest" you get the first available message.
So add:
properties.put("auto.offset.reset", "smallest");
and try again.
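One caveat worth noting: "smallest" and "largest" are the values accepted by the original Scala ("high-level") consumer's config. The class shown in the question uses the newer Java `KafkaConsumer`, whose accepted values for this property are "earliest" and "latest" instead, so for that client the equivalent line would be:

```java
// New Java consumer (org.apache.kafka.clients.consumer.KafkaConsumer):
// read from the beginning of the partition when no committed offset exists
properties.put("auto.offset.reset", "earliest");
```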