接着上一讲 消息中间件之RabbitMQ初识,这笔我们来讲讲RabbitMQ中消息丢失的问题。已经怎样在核心业务中避免消息丢失。
血泪故事:商品购物流程中的发货环节引入了RabbitMQ,某天由于网络抖动导致了生产者的消息没有发送到RabbitMQ中,由于没有做消息的可靠性传输保证,消息丢失,导致一批客户迟迟没收到货物而引发投诉,给公司造成了不小的损失。
为了避免上述悲剧重演,我们来了解下在RabbitMQ中我们需要怎样保证消息不丢失。
消息丢失会发生在什么时候
消息的传输过程大致如下图
消息丢失可能发生在
- Producer端 发送到RabbitMQ中由于网络异常或者服务异常导致消息发送失败。
- RabbitMQ服务端 异常或者重启导致消息丢失。
- Consumer端 接收到消息后,消息处理失败,消息丢失。
当然上一讲中有提到在RabbitMQ,生产者发送消息是和Exchange交互,Exchange根据路由规则投递到具体的Queue中,如果路由规则设置有问题,也会导致消息丢失,但此条不在本文讨论重点。
Producer 消息可靠性保证
为了避免由于网络抖动或者RabbitMQ服务端异常导致消息发送失败的问题。可以在Producer发送消息的使用引入了一个确认机制(ack),服务端接收到消息之后,会返回给Producer一个成功或者失败的确认消息。
RabbitMQ提供了两种解决方式:
- 事务机制
- 发送方确认机制
事务方式,主要方法有以下几个
- channel.txSelect() 将当前的channel设置成事务模式。
- channel.txCommit()用于提交事务。
- channel.txRollback()用于事务回滚
下面代码是简单示例
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
//发送失败后续处理,重发或者持久化异常消息稍后重试
}
信号的流转过程如下图
图片来源 RabbitMQ实战指南
如果事务能够提交成功,则消息一定到达了RabbitMQ中。
图片来源 RabbitMQ实战指南
事务机制能够解决消息生产者和RabbitMQ之间消息 确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功。但事务机制是同步阻塞进行的,回大大降低RabbitMQ的吞吐量,RabbitMQ提供了一种改进方案,即发送方确认机制。
发送方确认机制:
- channel.confirmSelect(); 将通道设置确认机制
- channel.addConfirmListener() 为通道添加ConfirmListener这个回调接口。
- com.rabbitmq.client.ConfirmListener#handleAck 回调处理正常被RabbitMQ接收的消息。
- com.rabbitmq.client.ConfirmListener#handleNack回调处理没有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
//这里需要添加消息发送失败处理的代码,重新发送或者持久化后补偿。
}
});
//模拟一直发送消息的场景
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
上面例子演示了异步confirm的形式,在保证生产者消息被RabbitMQ正常接收,又没有同步阻塞导致明显降低RabbitMQ吞吐量的问题。
RabbitMQ端
为避免RabbitMQ服务异常或者重启导致的消息丢失,需要对做持久化操作,将相关信息保存到磁盘上。要保证消息不丢失需要持久化主队列、持久化。exchange不持久化,在RabbitMQ服务重启后,相关的exchange元数据会丢失,不过消息不丢失,但消息不能发送到这个exchange中了。
- 队列持久化需要在声明队列的时候将durable参数设置为true。(因为消息是存在与队列中,如果队列不持久化,那RabbitMQ重启后,消息将丢失)
- 消息持久化通过将投递模式设置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable
false,false,null);
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,//具体属性见下面
message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2, //deliveryMode
0, null, null, null,
null, null, null, null, null, null);
Consumer端
为保证Consumer端不因消费处理异常或消费者应用重启导致消息丢失。我们需要如下操作
- 关闭默认的自动确认。设置为手动确认模式。
当设置为手动确认模式,对于RabbitMQ服务端而言队列中的消息分为了两种
- Ready:等待投递给消费者的消息。
- Unacked:已经投递给消费者,但还没有收到消费者确认新号的消息。
对于Unacked消息,会出现下面几种情况:
- RabbitMQ收到持有消息的消费者的ack信号,RabbitMQ服务端将会删除该消息。
- RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为true,RabbitMQ会重新将这条消息存入队列。
- RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为false,如果队列配置了死信队列,则消息进入死信队列,如果没有配置死信队列,则消息被RabbitMQ从队列中删除。
- RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者没有断开连接,则服务端会一直等待,没有超时时间。
- RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者已经断开连接,RabbitMQ会安排该消息重新进入队列。
消息拒绝可以使用Channel类中的basicReject或者basicNack方法,下面我们来看下他们之间的差异。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:64位的长整型值,作为消息的编号。
- requeue:是否重入队列配置项。
- multiple:是否批量处理未被当前消费者确认的消息。
我们来看一个代码示例:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
try{
//消息处理业务逻辑处理
channel.basicAck(deliveryTag, false);
}catch(Exception e){
//处理失败处理逻辑
channel.basicReject(deliveryTag, false);
}
}
});
通过手动确认模式,RabbitMQ只有在收到持有消息的Consumer的应答信号时,才会删除掉消息,保证消息不因Consumer应用异常而导致消息丢失的问题发生。
看了消费端保证消息不丢失的方案,有小伙伴会有疑问,假如RabbitMQ已经把消息投递给了Consumer,Consumer正常的处理了消息,但是由于网络抖动等原因,RabbitMQ没有收到Consumer的ack消息,且认为Consumer已经断开连接,那么RabbitMQ会重新将消息放入队列,并投递给消费者。这样会导致某些消息重复投递给Consumer的问题产生。
在此种方案下RabbitMQ确实有可能产生重复消息的问题,我们将在接下来的文章中去处理这个问题。
该方案只保证消息至少一次投递(At least Once)
死信队列
DLX,全名Dead-Letter-Exchange,死信交换器。当一个消息变为死信(dead message)后,能够被重新DLX上,绑定DLX的队列就是死信队列。
消息变成私信有以下几种可能
- 消息被拒绝(basicNack/basicReject),并且设置requeue参数为false;
- 消息过期。
- 队列超过最大长度。
下面通过一个简化的代码示例来演示下死信队列的使用。详细说明见注释
//声明交换器
channe1.exchangeDeclare("exchange.dlx","direct ",true);
channe1.exchangeDeclare( "exchange.normal "," fanout ",true);
Map<String , Object> args = new HashMap<String, Object>( );
//设置消息超时时间
args.put("x-message-ttl " , 10000);
//通过x-dead-letter-exchange参数来执行DLX
args.put( "x-dead-letter-exchange ","exchange.dlx");
//为DLX指定路由键
args.put( "x-dead-letter-routing-key"," routingkey");
channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args);
channe1.queueBind( "queue.normal ","exchange .normal", "");
channe1.queueDec1are( "queue.d1x ", true , false , false , null) ;
channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey");
channe1.basicPublish( "exchange.normal" , "rk" ,
MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;
消息流程见下图
对于RabbitMQ来说,通过分析死信队列中的消息,可以用于改善和优化系统。
总结:消息丢失可能发生在生产端、服务端、消费端。对于重要业务我们可以通过上面介绍的方式来确保消息不丢失。大家也可以留言讨论下,在使用RabbitMQ过程中遇到过哪些坑。
参考文档