
Problem Description

Assume I have a timer task that runs indefinitely. It iterates over all the consumer groups in the Kafka cluster and outputs the lag, committed offset, and end offset for every partition of each group. This is similar to how the Kafka console consumer group script works, except that it covers all groups.
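For reference, the lag such a task reports for a partition is the end offset minus the committed offset. The following is a minimal sketch of that arithmetic in plain Java. The class name `LagReport`, the method `computeLag`, and the use of strings such as `orders-0` as partition keys are illustrative assumptions, not part of the Kafka API. A partition with no end offset is reported as unknown rather than as zero, which matters for the behavior described in this question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

public class LagReport {

    /**
     * Computes lag per partition as (end offset - committed offset).
     * A partition that is absent from endOffsets maps to an empty value,
     * so a missing end offset is not mistaken for "no lag".
     */
    public static Map<String, OptionalLong> computeLag(Map<String, Long> committed,
                                                       Map<String, Long> endOffsets) {
        Map<String, OptionalLong> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end == null) {
                lag.put(e.getKey(), OptionalLong.empty());
            } else {
                // Clamp at zero in case the committed offset is read after the end offset.
                lag.put(e.getKey(), OptionalLong.of(Math.max(0L, end - e.getValue())));
            }
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 90L);
        committed.put("orders-1", 40L);

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("orders-0", 100L);
        // "orders-1" is deliberately absent, as if its end offset was not returned.

        computeLag(committed, endOffsets).forEach((partition, lag) ->
            System.out.println(partition + " lag="
                + (lag.isPresent() ? String.valueOf(lag.getAsLong()) : "unknown")));
    }
}
```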

Something like this:

Single consumer (not working): endOffsets does not return offsets for some of the provided topic partitions (for example, 10 partitions provided, 5 offsets returned).

// A single consumer shared across all consumer groups
static final Consumer<?, ?> consumer = createConsumer();

void run() {
    List<String> groupIds = getConsumerGroups();
    for (String groupId : groupIds) {
        List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
        // Not working: offsets are missing for some partitions of some groups (10 in, 5 out)
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
    }
}

Multiple consumers (working):

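void run() {
    List<String> groupIds = getConsumerGroups();
    for (String groupId : groupIds) {
        List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
        // A fresh consumer per group, closed once its offsets have been read
        try (Consumer<?, ?> consumer = createConsumer()) {
            // This works: an offset is returned for every partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        }
    }
}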

Versions: Kafka-Client 2.0.0

Am I using the consumer API incorrectly? Ideally, I would like to use a single consumer.

Let me know if you need more details.

Recommended Answer

This is a bug in Fetcher.fetchOffsetsByTimes(), specifically in the groupListOffsetRequests method: partitions whose leader was unknown or unavailable at the time of the offset request were not added to the set of partitions to retry.

The bug is more noticeable when a single consumer is used across the partitions of all consumer groups. When endOffsets is requested, the consumer already has leader information for some topic partitions. The partitions whose leader is unknown or unavailable are left out of the result because of the bug.

Later, I realized that pulling topic partitions from each consumer group was not a good idea. I changed the task to read the topic partitions from AdminClient.listTopics and AdminClient.describeTopics and to pass them all at once to Consumer.endOffsets.
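The approach described above could look roughly like the sketch below. It assumes the kafka-clients library is on the classpath and that a broker is reachable at `localhost:9092`. The class name `AllEndOffsets`, the bootstrap address, and the byte-array deserializers are illustrative choices, not taken from the original task. The sketch uses `DescribeTopicsResult.all()` as found in the 2.x client; newer client versions also offer `allTopicNames()` for the same purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AllEndOffsets {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                 props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {

            // List every topic name, then describe the topics to learn their partitions.
            Set<String> topics = admin.listTopics().names().get();
            Map<String, TopicDescription> descriptions =
                admin.describeTopics(topics).all().get();

            // Flatten the descriptions into one list of topic partitions.
            List<TopicPartition> partitions = new ArrayList<>();
            for (TopicDescription description : descriptions.values()) {
                for (TopicPartitionInfo info : description.partitions()) {
                    partitions.add(new TopicPartition(description.name(), info.partition()));
                }
            }

            // A single call for all partitions, using one consumer.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) ->
                System.out.println(tp + " end offset=" + offset));

            // On an affected client, some partitions may be absent from the result.
            System.out.println("requested=" + partitions.size()
                + " returned=" + endOffsets.size());
        }
    }
}
```

Comparing the requested and returned counts is a cheap way to see whether a given run was affected by the bug.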

This does not completely resolve the issue, though, because topics or partitions may still be unknown or unavailable between runs.
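One possible client-side mitigation, which the original answer does not describe, is to compare the partitions that were requested with the partitions that came back and to ask again for the missing ones. The sketch below shows that idea in plain Java. The class `MissingOffsetRetry`, the method `fetchAll`, and the `maxAttempts` parameter are hypothetical names introduced for illustration. The lookup is passed in as a function so that the logic does not depend on a running cluster.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class MissingOffsetRetry {

    /**
     * Calls fetcher for the requested partitions and re-requests any partition
     * that is missing from the response, up to maxAttempts calls in total.
     * Partitions that never get an offset are simply absent from the result.
     */
    public static <P> Map<P, Long> fetchAll(Collection<P> requested,
                                            Function<Collection<P>, Map<P, Long>> fetcher,
                                            int maxAttempts) {
        Map<P, Long> result = new HashMap<>();
        Set<P> missing = new HashSet<>(requested);

        for (int attempt = 0; attempt < maxAttempts && !missing.isEmpty(); attempt++) {
            // Ask only for the partitions that have no offset yet.
            Map<P, Long> batch = fetcher.apply(new HashSet<>(missing));
            for (Map.Entry<P, Long> entry : batch.entrySet()) {
                if (entry.getValue() != null && missing.remove(entry.getKey())) {
                    result.put(entry.getKey(), entry.getValue());
                }
            }
        }
        return result;
    }
}
```

With a Kafka consumer, the fetcher could be the method reference `consumer::endOffsets`, with `P` being `TopicPartition`. Whether a second call actually succeeds depends on the client having refreshed its metadata in the meantime, so upgrading to a client that contains the fix remains the more reliable option.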

More information can be found in KAFKA-7044 and its pull request. The bug has been fixed, and the fix is scheduled for the 2.1.0 release.

