我正在使用Zookeeper从kafka获取数据。在这里,我总是从最后一个偏移点获取数据。有什么办法指定获取旧数据的偏移时间?

有一个选项autooffset.reset。它接受最小或最大。有人可以解释什么是最小和最大的。 autooffset.reset可以帮助从旧的偏移点而不是最新的偏移点获取数据吗?

最佳答案

使用者始终属于一个组,对于每个分区,Zookeeper都会跟踪该使用者组在分区中的进度。

要从头开始获取,您可以删除侯赛因所引用的与进度相关的所有数据

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

您还可以指定所需分区的偏移量,如core / src / main / scala / kafka / tools / UpdateOffsetsInZK.scala中所指定
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

但是,偏移量不是按时间索引的,但是您知道每个分区都是一个序列。

如果您的消息包含时间戳(请注意,该时间戳与Kafka收到消息的时刻无关),则可以尝试建立索引器,尝试通过将偏移量增加N来逐步检索一个条目,并存储元组(主题X,第2部分,偏移量100,时间戳)在某处。

当您想从指定的时间点检索条目时,可以对您的粗略索引进行二进制搜索,直到找到所需的条目并从那里获取。

关于apache-kafka - 如何从Kafka中的旧偏移点获取数据?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/14935755/

10-16 16:33