本文介绍了Kafka - 在 Java 中聆听主题的比 poll() 更好的替代方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试用 Java 创建一个消费者客户端.我意识到 poll() 函数已贬值.听卡夫卡的话题有哪些选择?

I'm trying to create a consumer client in Java. I realized the poll() function is depreciated. What are the alternatives to listen to a topic of Kafka?

我的代码:

KafkaConsumer< String, UserSegmentPayload > kc = new KafkaConsumer<>(props2);
kc.subscribe(Collections.singletonList(topicName));
while (true) {
    ConsumerRecords<String, UserSegmentPayload> records = kc.poll(100);
    for (ConsumerRecord<String, UserSegmentPayload> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
        record.offset(), record.key(), record.value());
    }
}

推荐答案

不推荐使用 poll()poll(long) 的原因是它们可能会无限期地阻塞(即使在第二种情况下,指定了超时).此行为的根本原因是这些方法中的初始元数据更新可能会永远阻塞(请参阅 此处).相反,您应该使用 KafkaConsumerpoll(Duration) 方法.因此,在您的代码中,您所要做的就是将 kc.poll(100) 替换为 kc.poll(Duration.ofMillis(100)).

The reason poll() and poll(long) are deprecated is that they may block indefinitely (even in the second case, where a timeout is specified). The root cause for this behaviour is that an initial metadata update in those methods may block forever (see here). Instead, you should use the poll(Duration)-method of the KafkaConsumer. So, in your code, all you have to do is to replace kc.poll(100) with kc.poll(Duration.ofMillis(100)).

这篇关于Kafka - 在 Java 中聆听主题的比 poll() 更好的替代方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-19 20:19