本文介绍了Spring Kafka 总是在 5 分钟后重新平衡,即使我暂停消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一个耗时的操作(大约 10 分钟),但是 kafka 在 5 分钟后重新平衡,即使我暂停了消费者.消费者方法:

there is a Time-consuming operation (about 10 min) ,but kafka aways rebalance after 5 min,even i pause the consumer. the consumer method :

    @KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {

    Set<TopicPartition> assignment = consumer.assignment();
    consumer.pause(assignment);

    if (StringUtils.isNotEmpty(payload)) {
        SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
        Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
        taskOptional.ifPresent(schoolStaticsTaskDO -> {
            // handler
        });
    }

    consumer.resume(assignment);
}

这是我的配置:

  kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  retries: 3
  properties:
    max.request.size: 12582912
consumer:
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  group-id: dc-fitness-data-consumer-group
  properties:
    max.partition.fetch.bytes: 12582912
  #enable-auto-commit: false
listener:
  ack-mode: record
  concurrency: 6

日志

  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.220 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#5-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-47, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.225 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] (Re-)joining group

推荐答案

先看一下文档,从 pause() 方法开始

Lets look at the documentation for a while, begin with pause() method

public void pause(Collection partitions) 这里

暂停从请求的分区中提取.以后对 poll(long) 的调用将不会从这些分区返回任何记录,直到使用 resume(Collection) 恢复它们.请注意,此方法不影响分区订阅.特别是在使用自动分配时,它不会导致组重新平衡.

所以根据上面的描述,pause() 方法会暂停分区获取消息,但不会暂停消费者线程,这意味着消费者线程将执行后续的poll() 请求暂停的分区但不会获取任何记录

So from the above description pause() method will suspend the partition from fetching the messages but it will not pause the consumer thread, which means consumer thread will do the subsequent poll() request to paused partitions but will not fetch any records

在您的应用程序中:分区已暂停,但消费者线程忙于执行超过 10 分钟的耗时操作,而没有执行任何轮询请求.

In your application : partitions are paused but consumer thread is busy in executing Time consuming operations for more that 10 minutes without doing any poll request.

检测消费者故障:所以当轮询停止时心跳不会发送到集群

Detecting Consumer Failures : so when poll stops heartbeat will not sent to the cluster

消费者订阅一组主题后,会在调用 poll(long) 时自动加入该组.poll API 旨在确保消费者的活跃度.只要您继续调用 poll,消费者就会留在组中并继续从分配给它的分区接收消息.在幕后,poll API 会定期向服务器发送心跳;当您停止调用 poll(可能是因为抛出了异常)时,将不会发送心跳.如果在服务器收到心跳之前经过了配置的会话超时时间,则消费者将被踢出组,其分区将被重新分配.

解决方案:增加以下属性的超时时间,因为默认时间是 5 分钟,您会看到每 5 分钟重新平衡一次(个人不支持增加超时时间)这里

Solution : increase the timeouts for below properties since default time is 5 minutes you are seeing rebalancing for every 5 minutes (personally will not support increasing timeouts) here

新的 Java Consumer 现在支持来自后台线程的心跳.有一个新的配置 max.poll.interval.ms 控制消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟).配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms 因为这是在消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间,所以我们已将其默认值更改为略高于 5 分钟.最后将session.timeout.ms的默认值调整为10秒,将max.poll.records的默认值改为500.

解决方案2:如果您需要用更少的时间处理大量数据,则增加更多分区并使用每个线程消耗每个分区(这意味着更多的并发),可以在 5 中处理更少的数据分钟

Solution 2: if you need to process huge data with less amount of time increase more partition and consume each partition with each thread (which means more concurrency), less data that can be processed in 5 minutes

这篇关于Spring Kafka 总是在 5 分钟后重新平衡,即使我暂停消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-19 20:25