问题描述
我有一个 kafka 消费者,我正在从该消费者中消费来自特定主题的数据,但我看到以下异常.我使用的是 0.10.0.0
kafka 版本.
I have a kafka consumer from which I am consuming data from a particular topic and I am seeing below exception. I am using 0.10.0.0
kafka version.
LoggingCommitCallback.onComplete: Commit failed for offsets= {....}, eventType= some_type, time taken= 19ms, error= org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
我添加了这两个额外的消费者属性,但仍然没有帮助:
I added these two extra consumer properties but still it didn't helped:
session.timeout.ms=20000
max.poll.records=500
我正在不同的后台线程中提交偏移量,如下所示:
I am committing offsets in a different background thread as shown below:
kafkaConsumer.commitAsync(new LoggingCommitCallback(consumerType.name()));
该错误是什么意思,我该如何解决?我是否需要添加一些其他消费者属性?
What does that error mean and how can I resolve it? Do I need to add some other consumer properties?
推荐答案
是的,降低 max.poll.records.您将获得较小批量的数据,但会更频繁地调用 poll 将有助于保持会话活跃.
Yes, lower max.poll.records. You'll get smaller batches of data but there more frequent calls to poll that will result will help keep the session alive.
这篇关于“提交失败的偏移量"异步提交偏移量时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!