问题描述

配置

Kafka-client 2.x, Spring-Kafka

默认配置

Kafka 三个partition, 使用KafkaListener按group消费。

现象

某天突然发现两个partition出现了Lag堆积,并且一直没有下降。看业务日志有相同消息在不断重复消费。

分析

看日志发现一直在刷,某个partition的任务超过了max.poll.intervals.ms, reassign other partition.

 OffsetAndMetadata{offset=3107, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

思考应该是消息后续的任务执行时间太长了,导致超过了默认时间,kafka server认为任务失败,不断指定新的partition去做,而之前在规定时间外做完的已经不可以提交offset了。

修复

做异步,使用线程池,将consumer收到的消息打到线程池去做,放进去之后即可立刻提交offset。

hotfix之后lag很快消费掉了,恢复正常。

优化

在consumer中最好加入partition, offset 信息到日志中,有助排查问题。

可以在消费时计算Lag信息,提供异常报警。

使用线程池异步在重启时有风险,可能会丢失数据。最好还是加长max.poll.intervals.ms。

对于关键业务统计耗时,并且设置max.poll.intervals.ms时留有余力。

KafkaListener默认一次消费一条 也可以指定 max.poll.records。

在producer端send时指定key,可以确定消费端唯一性。

Demo

Producer

   @Resource
private KafkaTemplate kafkaTemplate; @SuppressWarnings("unchecked")
public void sendMessage(String topic, final String message) {
LogUtil.info(LogTypeEnum.APPLICATION, "sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic, message);
}

Consumer

  @KafkaListener(topics = "${topic}", groupId = "groupId",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "${kafka.consumer.listen.auto.start}",
properties = "max.poll.records=1")
public void businessConsumer(ConsumerRecord<Integer, String> consumerRecord,
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer) {
LogUtil.info(LogTypeEnum.APPLICATION,
"business log, offset is {}, partition id is {}, received message key is {}, param is {}",
consumerRecord.offset(), consumerRecord.partition(), consumerRecord.key(),
consumerRecord.value());
checkLag(consumer);
 } private void checkLag(KafkaConsumer<String, String> consumer) {
final Set<TopicPartition> topicPartitions = consumer.assignment();
Map<TopicPartition, Long> tailOffsetMap = consumer.endOffsets(topicPartitions);
for (final TopicPartition topicPartition : topicPartitions) {
final long tailOffset = tailOffsetMap.get(topicPartition);
final long currentOffset = consumer.position(topicPartition);
if (tailOffset < currentOffset || (tailOffset - currentOffset) > 30) {
LogUtil.error(LogTypeEnum.APPLICATION,
"kafka consumer lag exception, current offset is {}, max offset is {}",
currentOffset, tailOffset);
}
}
}
05-28 23:28