我正在尝试实现基本方案,从头开始重新阅读主题(至少1条消息),并且我遇到了意外的行为。
假设有1个分区主题正好保存了100万条消息,有1个具有偏移量的使用者已经在中间某个地方提交,而没有活动的生产者。
首先我尝试过
consumer.subscribe(Collections.singletonList(topic));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
那是行不通的(没有轮询消息)。我读过
seekToBeginning
是懒惰的(没关系),但事实证明,seekToBeginning
完全没有影响,因为它已经需要分配分区,这只会在第一次轮询时发生。应该在文档中描述它,还是错过了?然后我尝试了
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(assignTimeout));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
事实证明,这取决于
assignTimeout
。它应该足以完成加入过程。时间可能会有所不同,因此无法依靠它。然后我为
ConsumerRebalanceListener
提供了 @Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
还有一个
poll
。它终于似乎起作用了。所以问题是:
seekToBeginning
之后的subscribe
没用吗?是否应记录在案?ConsumerRebalanceListener
解决方案是否可靠?是否保证在应用搜索之前不会轮询来自中间(承诺偏移量)的消息? 最佳答案
对于第一个:
您在问题中正确提到了这一点,即seek()
或seekToXXXX()
操作的先决条件是需要分配分区。在加入消费者组之前不会发生这种情况,只有在调用poll()
时才会发生这种情况。因此,seek()
操作在subscribe()
之后立即无法正常工作是预期的行为。
这实际上记录在《卡夫卡权威指南》第4章“卡夫卡消费者”中的“使用特定偏移量消费记录”部分。
对于第二个问题:
是的,根据Kafka的权威指南,使用ConsumerRebalanceListener
是可靠的并且是推荐的方法。
这是同一章的声明,该声明确认了相同的内容:
有很多不同的方法可以实现一次语义
...................,但所有人都会
需要使用ConsumerRebalance Listener和seek()来确保
偏移量被及时存储,消费者开始阅读
来自正确位置的邮件。
希望这可以帮助!