本文介绍了卡夫卡消费者未返回任何记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试与Kafka建立小型PoC.但是,当使用Java制作使用者时,该使用者不会收到任何消息.即使当我用相同的URL/主题启动kafka-console-consumer.sh时,我也得到了消息.有人知道我可能做错了吗?此代码由GET API调用.

I am trying to makea small PoC with Kafka. However, when making the consumer in java, this consumer gets no messages. Even though when I fire up a kafka-console-consumer.sh with the same url/topic, I do get messages. Does anyone know what I might do wrong? This code is called by a GET API.

public List<KafkaTextMessage> receiveMessages() {
    log.info("Retrieving messages from kafka");
    val props = new Properties();
    // See https://kafka.apache.org/documentation/#consumerconfigs
    props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
    //props.put("client.id", "my-topic consumer");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    ImmutableList.Builder<KafkaTextMessage> builder;
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(TEXT_MESSAGE_TOPIC));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        builder = ImmutableList.builder();
        for (ConsumerRecord<String, String> record : records) {
            builder.add(new KafkaTextMessage(record.value()));
            log.info("We got at position: {} key:{} value: {}", record.offset(), record.key(), record.value());
            consumer.commitSync();
        }
    }
    return builder.build();
}

推荐答案

尝试在使用者属性中添加 auto.offset.reset =最早的.默认值设置为 latest .我建议这样做是因为我发现您的 group.id 设置为 test ,这是您先前在测试中可能已经使用的值.

Try adding auto.offset.reset=earliest in your consumer properties. The default value is set to latest. I'm suggesting this because I see that your group.id is set to test, value that you may have already use in previous tests.

这篇关于卡夫卡消费者未返回任何记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 17:19