问题描述
我正在尝试使用 Kafka 制作一个小型 PoC.但是,在 Java 中创建消费者时,此消费者不会收到任何消息.即使当我使用相同的 url/topic 启动 kafka-console-consumer.sh 时,我也会收到消息.有谁知道我可能做错了什么?此代码由 GET API 调用.
I am trying to makea small PoC with Kafka. However, when making the consumer in java, this consumer gets no messages. Even though when I fire up a kafka-console-consumer.sh with the same url/topic, I do get messages. Does anyone know what I might do wrong? This code is called by a GET API.
public List<KafkaTextMessage> receiveMessages() {
log.info("Retrieving messages from kafka");
val props = new Properties();
// See https://kafka.apache.org/documentation/#consumerconfigs
props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
//props.put("client.id", "my-topic consumer");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ImmutableList.Builder<KafkaTextMessage> builder;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TEXT_MESSAGE_TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
builder = ImmutableList.builder();
for (ConsumerRecord<String, String> record : records) {
builder.add(new KafkaTextMessage(record.value()));
log.info("We got at position: {} key:{} value: {}", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
return builder.build();
}
推荐答案
尝试在您的使用者属性中添加 auto.offset.reset=earliest
.默认值设置为 latest
.我建议这样做是因为我看到您的 group.id
设置为 test
,您可能已经在之前的测试中使用过该值.
Try adding auto.offset.reset=earliest
in your consumer properties. The default value is set to latest
. I'm suggesting this because I see that your group.id
is set to test
, value that you may have already use in previous tests.
这篇关于卡夫卡消费者没有返回任何记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!