注意如果您持有消费者线程超过 5 分钟,将发生重新平衡这里新的 Java Consumer 现在支持来自后台线程的心跳.有一个新的配置 max.poll.interval.ms 用于控制在消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟).配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms 因为这是在消费者重新平衡时JoinGroup 请求可以在服务器上阻塞的最长时间,所以我们已将其默认值更改为略高于 5 分钟.最后将session.timeout.ms的默认值调整为10秒,将max.poll.records的默认值改为500.特别说明来自 spring kafka >2.1.5在下一次轮询之前,消费者线程将执行对外部线程的确认感谢@Gary Russell 提供此信息I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.public class KafkaConsumer { @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" ) public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) { cacheService.add( record.value(), acknowledgment ); }}public class CacheService { // concurrency handling has been left out in favor of readability public void add( String record, Acknowledgment acknowledgment ) { this.records.add(record); this.lastAcknowledgment = acknowledgment; } public void saveBatch() { //called by scheduled task if( records.size() == BATCH_SIZE ) { // perform batch insert into database this.lastAcknowledgment.acknowledge(); this.records.clear(); } }}The AckMode has been set as follows:factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );And the Auto Commit is false:config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?I'm using spring-kafka 2.1.7.RELEASE.EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered. 解决方案 Here is the problem, Consumer thread is responsible to commit the offset. At the time of poll consumer thread will submit the previous batch offset.Since in your case AUTO_COMMIT is false and lastAcknowledgment.acknowledge() is not acknowledge the offset is not submit.Only one way to do this, As soon as you get the poll records make Schedule task as Async and hold the consumer thread and submit the offset after completion of Async task, Check this answer for reference answerNote If you hold consumer thread more than 5 minutes rebalance will takes place here he new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.Special Note from spring kafka >2.1.5Acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll Thanks for @Gary Russell for this information 这篇关于成功批量插入后更新Kafka提交偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!