问题描述
我正在为Kafka使用Python高级消费者,并且想知道一个主题的每个分区的最新偏移量.但是我无法使它正常工作.
I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
但是我得到的输出是
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
我有一种使用assign
的替代方法,但结果是相同的
I have an alternate approach using assign
but the result is the same
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
从某些文档看来,如果未发布fetch
,则可能会出现这种情况.但是我找不到强迫这种方法的方法.我在做什么错了?
It seems from some of the documentation that I might get this behaviour if a fetch
has not been issued. But I cannot find a way to force that. What am I doing wrong?
或者是否有其他不同/简单的方法来获取主题的最新偏移量?
Or is there a different/simpler way to get the latest offsets for a topic?
推荐答案
最后,在花了一天时间和一些错误的开始之后,我找到了一个解决方案并使它工作.将其发布给她,以便其他人参考.
Finally after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it her so that others may refer to it.
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload
client = SimpleClient(brokers)
partitions = client.topic_partitions[topic]
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)
for r in offsets_responses:
print "partition = %s, offset = %s"%(r.partition, r.offsets[0])
这篇关于如何获取kafka主题分区的最新偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!