我已经使用以下命令手动创建了主题test
:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
并使用以下命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
我插入了这些记录:
This is a message
This is another message
This is a message2
首先,我通过命令行使用消息,如下所示:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
并成功显示所有记录。然后,我尝试使用以下代码在Java中实现使用者:
public class KafkaSubscriber {
public void consume() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
// also with this command
// consumer.subscribe(Arrays.asList("test"));
System.out.println("Starting to read data...");
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("Number of records found: " + records.count());
for (ConsumerRecord rec : records) {
System.out.println(rec.value());
}
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
但是输出是:
Starting to read data...
0
0
0
0
0
....
这意味着它在主题
test
中找不到任何记录。在Java使用者启动后,我还尝试发布一些记录,但是还是一样。任何想法可能出什么问题吗?编辑:添加以下行后:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
现在,消费者仅在我向该主题写入新记录时读取。它不会从开始读取所有记录。
最佳答案
默认情况下,如果以前没有为该组提交任何偏移量,则使用者从结束主题开始。
因此,如果您在生成记录后运行它,它将不会收到它们。
注意,在您的kafka-console-consumer.sh
中,您具有--from-beginning
标志,该标志迫使使用者改为从主题的开头开始。
如注释中所建议的,一种解决方法是将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为earliest
。但是,我会谨慎地使用该设置,因为您的消费者将从主题开始就开始消费,在实际用例中这可能是很多数据。
最简单的解决方案是,您现在已经运行了使用者一次并创建了一个组,则只需重新运行生产者即可。之后,当您再次运行使用者时,它将从其上一个新生产者消息之前的位置开始拾取。
另一方面,如果您要始终重新使用所有消息,则有两种选择:
当消费者开始将其位置移至主题开头时,显式使用seekToBeginning()
将auto.offset.reset
设置为earliest
并通过将enable.auto.commit
设置为false
禁用自动偏移提交