This article covers the proper way to poll Kafka without consuming any records. It may be a useful reference for anyone facing the same problem.

Problem description

To keep my consumer alive during very long, variable-length processing, I'm implementing an empty poll() call in a background thread, which should keep the broker from rebalancing if I spend too much time between poll() calls. I have set my poll interval to be very long, but I don't want to keep increasing it forever to accommodate longer and longer processing.

What's the proper way to poll for no records? Currently I'm calling poll(), then seeking back to the earliest offset of each partition returned by that poll() call, so that the main thread can read those records properly once it has finished processing the previous messages.

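// Poll only to stay in the group, then rewind so the fetched records are not skipped.
ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method: partition -> earliest offset in msgs
seekToOffsets(partitionToOffsets); // helper method: seek each partition back to that offset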

Recommended answer

The proper way to handle long processing times (and avoid a consumer rebalance) is to use the KafkaConsumer.pause() / KafkaConsumer.resume() methods. While partitions are paused, poll() returns no records from them, so the consumer can keep polling without having to seek back afterwards. You can read more about it here:

  • KafkaConsumer JavaDoc
  • Apache Kafka JIRA
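
The pause/poll/resume pattern can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the answerer's code: PausableSource, LongTaskLoop and pollAndProcess are hypothetical names, and the interface only stands in for the three KafkaConsumer calls involved so that the sketch compiles without the Kafka client library. With a real consumer, the corresponding calls would be consumer.poll(timeout), consumer.pause(consumer.assignment()) and consumer.resume(consumer.paused()).

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical stand-in for the few KafkaConsumer calls this pattern needs. */
interface PausableSource<R> {
    List<R> poll(Duration timeout); // KafkaConsumer: consumer.poll(timeout)
    void pauseAll();                // KafkaConsumer: consumer.pause(consumer.assignment())
    void resumeAll();               // KafkaConsumer: consumer.resume(consumer.paused())
}

final class LongTaskLoop {
    /**
     * Polls one batch, processes it on a worker thread, and keeps polling the
     * paused source until processing finishes. Returns the number of
     * keep-alive polls that were made while the handler was running.
     */
    static <R> int pollAndProcess(PausableSource<R> source,
                                  Consumer<List<R>> handler,
                                  Duration timeout) {
        List<R> batch = source.poll(timeout);
        if (batch.isEmpty()) {
            return 0; // nothing to process, nothing to keep alive
        }
        ExecutorService worker = Executors.newSingleThreadExecutor();
        int keepAlivePolls = 0;
        try {
            // Only the handler runs off-thread; every call on the source stays
            // on the calling thread, since KafkaConsumer is not thread-safe.
            CompletableFuture<Void> done =
                    CompletableFuture.runAsync(() -> handler.accept(batch), worker);
            source.pauseAll(); // from here on, poll() should return no records
            while (!done.isDone()) {
                source.poll(timeout); // keep-alive poll; result is expected to be empty
                keepAlivePolls++;
            }
            done.join(); // rethrows a handler failure as CompletionException
        } finally {
            source.resumeAll(); // fetch again on the next regular poll
            worker.shutdown();
        }
        return keepAlivePolls;
    }
}
```

Compared with the poll-then-seek approach in the question, nothing has to be rewound here, because paused partitions hand out no records in the first place. With a real KafkaConsumer the timeout passed to poll() also paces the loop, since each keep-alive poll blocks for up to that long.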


10-10 08:09