RabbiMQ消息可靠性
生产者可靠性
生产者重试机制
问题:生产者发送消息时,出现了网络故障,导致与MQ的连接中断
解决:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,不过AMQP提供的重试机制是阻塞式的重试,就是说多次重试等待的过程中,当前线程是被阻塞的。
生产者确认机制
问题:
- MQ内部处理消息的进程发生了异常
- 生产者发送消息到达MQ未找到交换机(Exchage)
- 生产者发送消息到达MQ的交换机但是未找找到合适的队列
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
总结如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
MQ可靠性
数据持久化
- 交换机持久化
- 队列持久化
- 消息持久化:在springAMQP中 默认消息是持久化的
LazyQueue
在默认情况下,RabbitMQ会将接受到的信息保存在内存中已降低消息收发的延迟,但在某些特殊情况下,回导致消息积压
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
一但出现消息堆积问题,RabbitMQ的内存占用就会越来越高,知道出发内存预警上线,此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
PageOut会耗费一段时间,并且阻塞队列进城,因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有需求都会被阻塞。
为了解决这个问题RabbitMQ3.6版本增加的LazyQueue特征:
- 接受到消息直接存入磁盘并非内存
- 消费者需要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条消息存储
接受者可靠性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack
; - 如果是消息处理或校验异常,自动返回
reject
;
- 如果是业务异常,会自动返回
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 消费消息ACK策略
失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。