本文介绍了KafkaConsumer Assignment()返回空的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>

以下代码返回非空分配的分区,但poll(0)已弃用。

val records = kafkaConsumer.poll(0) // <= deprecated
logInfo(s"Dummy call ${records.count()}")

val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")

以下返回分区:

val records = kafkaConsumer.poll(Duration.ofMillis(0)) // <= not working
logInfo(s"Dummy call ${records.count()}")

val partitions = kafkaConsumer.assignment()
logInfo(s"partitions=${partitions}")

为什么?有什么主意吗?谢谢

推荐答案

这两个调用的不同之处在于获取元数据的方式。被弃用的poll会无限期地等待,直到成功检索元数据,而另一个poll只尝试一次,通常在非常短的时间间隔内(对于您的情况是0)无法连接到协调器,并返回任何有用的东西。这就是为什么您在调用poll(Duration.ofMillis(0))一次后会看到一个空的赋值。

这篇关于KafkaConsumer Assignment()返回空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-14 08:05