问题描述
感谢您抽出时间回答问题.我正在将 kafka 与 python 使用者一起使用.当消费者启动并运行并且消息被推送到 kafka 并由消费者读取时,一切都运行良好.
Thanks for taking time to answer the question. I am using kafka with a python consumer. Everything works great when the consumer is up and running and messages get pushed to kafka which are then read by the consumer.
但是,如果消费者由于某种原因宕机,当它恢复时,它只会读取消费者备份后发布到 kafka 的 NEW 消息.shutdown-poweron 之间的消息丢失,即消费者在重新启动后不读取这些消息.
However, if the consumer goes down for whatever reason, when it comes back up, it only reads the NEW messages that are posted to kafka after the consumer is back up. The messages between shutdown-poweron are lost, that is, the consumer does not read these messages after it comes back up.
consumer = KafkaConsumer(..)
是我用来创建消费者的东西.
is what I use to create the consumer.
推荐答案
您使用的是什么客户端?也许有必要为消费者设置起始偏移量.查看seek() 函数和自动提交设置.可能我的代码有帮助,但也许我们使用不同的消费者类(我的:http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):
What client are you using?Maybe it is necessary to set the start offset for the consumer. Have a look at the seek() function and auto-commit setting.May my codes help, but maybe we use different consumer classes (mine:http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):
def connect(self):
'''Initialize Kafka Client and Consumer.'''
try:
print "Try to init KafkaClient:", self.Brokers
self.__kafka_client = KafkaClient( self.Brokers )
print "Try to init Kafka Consumer."
self.__consumer = SimpleConsumer(
self.__kafka_client,
self.GroupID,
self.Topic,
auto_commit = True,
partitions=self.Partitions,
auto_commit_every_n = 100,
auto_commit_every_t=5000,
fetch_size_bytes=4096,
buffer_size=4096,
max_buffer_size=32768,
iter_timeout=None,
auto_offset_reset='largest' )
print "Set the starting offset."
self.__consumer.seek(0, self.OffsetMode)
self.__consumer.seek(0, 0) =>start reading from the beginning of the queue.
self.__consumer.seek(0, 1) =>start reading from current offset.
self.__consumer.seek(0, 2) =>skip all the pending messages and start reading only new messages (** maybeyour case**).
这篇关于Kafka 消费者在关闭后丢失消息状态的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!