Problem description
While polling Kafka, I have subscribed to multiple topics using the subscribe() function. Now I want to set the offset from which to read each topic, without resubscribing after every seek() and poll() on a topic. Will calling seek() iteratively over each of the topic names, before polling for data, achieve this? How exactly are the offsets stored in Kafka?
I have one partition per topic and just one consumer to read from all topics.
Recommended answer
How does Kafka store offsets for each topic?
Kafka has moved offset storage from ZooKeeper to the Kafka brokers. Here is how it works:
Kafka stores offset commits in a topic. When a consumer commits an offset, Kafka publishes a commit message to an internal "commit-log" topic (named __consumer_offsets) and keeps an in-memory structure that maps group/topic/partition to the latest committed offset for fast retrieval. More design information can be found in this.
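To make the group/topic/partition mapping concrete, here is a toy Python model of the idea described above, under stated assumptions: every commit is appended to a log (standing in for the internal offsets topic) and a dict keeps only the latest offset per key. The class name OffsetStoreModel is hypothetical, and this is a simplified sketch, not the broker's actual code, which also deals with compaction, retention and coordinator failover.

```python
from typing import Dict, List, Optional, Tuple

# Key identifying one committed position: (consumer group, topic, partition).
OffsetKey = Tuple[str, str, int]


class OffsetStoreModel:
    """Toy model of broker-side offset storage (hypothetical, not Kafka's code).

    Commits are appended to a log that stands in for the internal offsets
    topic, and an in-memory dict keeps the latest offset per key so that
    lookups never have to scan the log.
    """

    def __init__(self) -> None:
        self.log: List[Tuple[OffsetKey, int]] = []  # append-only commit log
        self.cache: Dict[OffsetKey, int] = {}        # latest offset per key

    def commit(self, group: str, topic: str, partition: int, offset: int) -> None:
        key = (group, topic, partition)
        self.log.append((key, offset))  # persist the commit message
        self.cache[key] = offset        # the newest commit wins

    def fetch(self, group: str, topic: str, partition: int) -> Optional[int]:
        # Answered from memory; None means nothing was committed for this key.
        return self.cache.get((group, topic, partition))

    @classmethod
    def recover(cls, log: List[Tuple[OffsetKey, int]]) -> "OffsetStoreModel":
        # Replaying the log in order rebuilds the cache after a restart;
        # later commits for the same key overwrite earlier ones.
        store = cls()
        for (group, topic, partition), offset in log:
            store.commit(group, topic, partition, offset)
        return store


if __name__ == "__main__":
    store = OffsetStoreModel()
    store.commit("g1", "orders", 0, 10)
    store.commit("g1", "orders", 0, 42)    # newer commit for the same key
    store.commit("g1", "payments", 0, 7)
    print(store.fetch("g1", "orders", 0))  # 42
    print(len(store.log))                  # 3 (every commit stays in the log)
    restored = OffsetStoreModel.recover(store.log)
    print(restored.cache == store.cache)   # True
```

Since only the newest commit per key matters when the cache is rebuilt, older entries could be discarded without changing the result, which is roughly why a log-compacted topic is sufficient for holding offsets.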
"Now I want to set the offset from which to read each topic, without resubscribing after every seek() and poll() on a topic."
The Kafka admin tools have a new feature for resetting consumer group offsets (the --reset-offsets option of kafka-consumer-groups.sh, introduced by KIP-122).
For more options, refer to https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
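As for calling seek() right after subscribe(): seek() takes a TopicPartition rather than a topic name, and with subscribe() partitions are only assigned once poll() has joined the consumer group, so a seek() issued before the first poll() is rejected (the Java client throws IllegalStateException). A common pattern is to seek from the rebalance callback instead. The sketch below is written in Python and assumes a consumer that exposes seek(partition, offset), as kafka-python's KafkaConsumer does; seek_to_requested, SeekableConsumer and the recording fake are hypothetical names, and the fake only lets the sketch run without a broker.

```python
from typing import Any, Dict, Iterable, Protocol


class SeekableConsumer(Protocol):
    """The only consumer method this sketch needs (assumed: seek(partition, offset))."""

    def seek(self, partition: Any, offset: int) -> None: ...


def seek_to_requested(consumer: SeekableConsumer,
                      partitions: Iterable[Any],
                      start_offsets: Dict[str, int]) -> int:
    """Seek each already-assigned partition whose topic has a requested offset.

    `partitions` should be the list a rebalance callback receives, or the list
    passed to assign(); each item needs .topic and .partition attributes.
    Returns how many seek() calls were made.
    """
    count = 0
    for tp in partitions:
        offset = start_offsets.get(tp.topic)
        if offset is None:
            continue  # no override: keep the committed / auto.offset.reset position
        consumer.seek(tp, offset)  # changes the in-memory fetch position only
        count += 1
    return count


if __name__ == "__main__":
    from collections import namedtuple

    # Hypothetical stand-ins so the sketch runs without a broker.
    TP = namedtuple("TP", ["topic", "partition"])

    class RecordingConsumer:
        def __init__(self) -> None:
            self.seeks = []

        def seek(self, partition, offset):
            self.seeks.append((partition.topic, partition.partition, offset))

    consumer = RecordingConsumer()
    assigned = [TP("orders", 0), TP("payments", 0)]  # one partition per topic
    n = seek_to_requested(consumer, assigned, {"orders": 100})
    print(n, consumer.seeks)  # 1 [('orders', 0, 100)]
```

With kafka-python, seek_to_requested would be called from the on_partitions_assigned(assigned) method of a ConsumerRebalanceListener subclass passed as consumer.subscribe(topics, listener=...); the Java client's equivalent hook is ConsumerRebalanceListener.onPartitionsAssigned. Because there is one partition per topic and a single consumer, another option is to call assign() with one TopicPartition per topic instead of subscribe(), then pass the same list to seek_to_requested; manual assignment involves no group rebalance. Either way, seek() only moves the consumer's in-memory position, and nothing is stored in Kafka until offsets are committed.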