ConsumerRebalanceListener

ConsumerRebalanceListener

我正在尝试实现基本方案,从头开始重新阅读主题(至少1条消息),并且我遇到了意外的行为。

假设有1个分区主题正好保存了100万条消息,有1个具有偏移量的使用者已经在中间某个地方提交,而没有活动的生产者。

首先我尝试过

  consumer.subscribe(Collections.singletonList(topic));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify


那是行不通的(没有轮询消息)。我读过seekToBeginning是懒惰的(没关系),但事实证明,seekToBeginning完全没有影响,因为它已经需要分配分区,这只会在第一次轮询时发生。应该在文档中描述它,还是错过了?

然后我尝试了

  consumer.subscribe(Collections.singletonList(topic));
  consumer.poll(Duration.ofMillis(assignTimeout));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify


事实证明,这取决于assignTimeout。它应该足以完成加入过程。时间可能会有所不同,因此无法依靠它。

然后我为ConsumerRebalanceListener提供了

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      consumer.seekToBeginning(partitions);
    }


还有一个poll。它终于似乎起作用了。

所以问题是:


seekToBeginning之后的subscribe没用吗?是否应记录在案?
ConsumerRebalanceListener解决方案是否可靠?是否保证在应用搜索之前不会轮询来自中间(承诺偏移量)的消息?

最佳答案

对于第一个:

您在问题中正确提到了这一点,即seek()seekToXXXX()操作的先决条件是需要分配分区。在加入消费者组之前不会发生这种情况,只有在调用poll()时才会发生这种情况。因此,seek()操作在subscribe()之后立即无法正常工作是预期的行为。

这实际上记录在《卡夫卡权威指南》第4章“卡夫卡消费者”中的“使用特定偏移量消费记录”部分。

对于第二个问题:

是的,根据Kafka的权威指南,使用ConsumerRebalanceListener是可靠的并且是推荐的方法。

这是同一章的声明,该声明确认了相同的内容:


  有很多不同的方法可以实现一次语义
  ...................,但所有人都会
  需要使用ConsumerRebalance Listener和seek()来确保
  偏移量被及时存储,消费者开始阅读
  来自正确位置的邮件。


希望这可以帮助!

09-25 20:23