背景

在kafka的消费者中,如果消费某条消息出错,会导致该条消息不会被ack,该消息会被不断的重试,阻塞该分区的其他消息的消费,因此,为了保证消息队列不被阻塞,在出现异常的情况下,我们一般还是会ack该条消息,再另外对失败的情况进行重试

目标

实现一个完善的重试逻辑,一般需要考虑一下几个因素:

  1. 重试的时间间隔
  2. 最大重试次数
  3. 是否会漏掉消息

实现

扔回队尾

在消息出错时,将消息扔回队尾

优点:

  1. 实现简单,没有别的依赖项

缺点:

  1. 无法控制重试时间间隔

基于数据库任务表的扫描方案

在数据库中增加一个任务的状态表,然后用一个定时任务去扫描任务表中,失败的任务,然后进行重试,其中记录下重试的次数即可

优点:

  1. 实现简单,一般这种离线任务,根据统计的需求,都会有一个任务状态表的,所以仅仅是增加一个定时任务去扫表

缺点:

  1. 性能较差,定时任务,一般都在无意义的扫描,浪费性能

新增重试队列的方案

新增一个重试队列,消费消息出错时,将时间戳和消息发送到重试队列,然后在重试队列中,根据时间,来判断阻塞时间,代码如下:

func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) {
    defer common.Recover(ctx, &err)
    log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent")
    retryEvent := &MergeRetryEvent{}
    err = json.Unmarshal(data, retryEvent)
    if err != nil {
        log.WithError(err).Error("failed to unmarshal data")
        return nil
    }
    log.WithField("contact_id", retryEvent.ContactId).Info("receive message")
    delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix()
    if delaySecond <= 0 {
        log.Info("send message to account merge event")
        err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
        return err
    } else {
        log.Infof("sleep %d seconds", delaySecond)
        time.Sleep(time.Duration(delaySecond) * time.Second)
        err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
        return err
    }
}

优点:

相对于扫表的方案,改方案没有无意义的扫表操作,性能更好

注意:之前在网上看到一个重试队列的实现,因为害怕开过多的线程(协程),作者用了一个channel来缓存重试消息,然后在一个协程池中去消费消息,消费的逻辑和上面的实例代码差不多,这样做是有风险的,因为channel是在本机的内存中,没有本地存储的,是存在丢消息的风险的(服务重启等情况)

参考链接:

https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

02-10 22:03