当我使用producer.flush()时,它可以工作,但根据kafka confluent issue,性能却很差,但是如建议的那样,我使用producer.poll(0),但不会对主题产生任何消息,是否需要任何配置,或者我在这里缺少任何内容?

self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)

self.producer.poll(0)  # -> doesn't work
self.producer.flush()  # -> works

最佳答案

问题是poll()不会使生产者同步,只是调用事件并触发您注册的任何callback()

您在上一行中发送的消息肯定仍在生产者缓冲区中,也就是说,它甚至没有从您的网卡中发出,因此代理人不认可它,并且您注册的回调也不是。被触发。这些内部缓冲区由内部kafka线程清空,这些线程等待达到一定的时间/大小,然后将消息发送给代理。 poll()调用在那里使零作业(只要生产者以前没有发送过任何消息),因为消息(最确定的)没有到达kafka经纪人(它甚至没有从制片人的主持人)。

当您调用flush()时,您正在使生产者成为同步者:它将使客户端等待任何未完成的消息传递到代理。这就是为什么您只能在调用flush()之后产生的原因。

Edenhill很好地解释了in this snippet

10-08 11:53