问题描述
我正在使用 kafka-python 来使用来自 kafka 队列的消息(kafka 0.10 版).2.0).特别是我使用 KafkaConsumer 类型.如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息.我怎样才能做到这一点?
i am using kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer type.If the consumer stops and after a while it is restarted i would like to restart from the latest produced message, that is drop all the messages produced during the time the consumer was down.How can i achieve this?
谢谢
推荐答案
你不会seekToEnd()
到日志的末尾.
You will not to seekToEnd()
to the end of the log.
请记住,您首先需要订阅主题,然后才能搜索.此外,订阅是懒惰的.因此,您还需要先添加虚拟投票",然后才能进行搜索.
Keep in mind, that you first need to subscribe to a topic before you can seek. Also, subscribing is lazy. Thus, you will need to add a "dummy poll" before you can seek, too.
consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()
// now enter your regular poll-loop
这篇关于消费者重新启动后,kafka-python 从最后生成的消息中读取的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!