前言
一、设置mandotory参数、AE备份交换器
针对前言中的第(4)个问题,我们可以通过设置mandotory参数与AE备份交换器来解决
1、mandotory参数
1)当为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,此时RabbitMQ会调用Basic.Return命令将消息返回给生产者,消息将不会丢失
2)当为false时,消息将会被直接丢弃。
3)RabbitMQ通过addReturnListener添加ReturnLisener监听器监听获取没有被正确路由到合适队列的消息。
channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); channel.addReturnListener(new ReturnListener(){ public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.Return 返回的结果是: " + message);
}
});
2、AE备份交换器
Alternate Exchange,简称AE,不设置mandatory参数,那么消息将会被丢失,设置mandatory参数的话,需要添加ReturnListner监听器,增加复杂代码,如果既不想增加代码又不想消息丢失,则使用AE,将没有被路由的消息存储于RabbitMQ中。当mandatory参数用AE一起使用时,mandatory将失效。在介绍AE之前,也认识RabbitMQ对于消息的过期时间TTL设置以及队列的过期时间TTL设置
2.1 TTL过期时间设置
可以对队列设置TTL与消息设置TTL,其中消息设置TTL经常用于死信队列、延迟队列等高级应用中。
1)设置消息TTL
设置TTL过期时间一般有两种当时:一是通过队列属性,对队列中所有消息设置相同的TTL。二就是对消息本身单独设置,每条消息TTL不同。如果一起使用时候,TTL小的为准,当一旦超过设置的TTL时间时,就会变成“死信”。
方式一:针对每条消息设置TTL是通过增加expiration的属性参数实现的,不可能像方式二一样扫描整个队列再判断是否过期,只有当该消息即将被消费时再判定是否过期即可删除,也就是消息即使已经过期,但不一定立马被删除!
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 持久化消息 builder deliveryMode(2); // 设置 TTL=60000ms builder expiration( 60000 ); AMQP.BasicProperties properties = builder. build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
方式二:通过队列属性设置消息TTL是增加x-message-ttl参数实现的,只需要扫描整个队列头部即可立即删除,也就是消息一旦过期就会被删除!
Map<String, Object> argss = new HashMap<String , Object>(); argss.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;
2)设置队列TTL
通过在队列中添加参数x-message-ttl参数实现,设置队列被自动删除前处于未被使用状态的时间,注意是队列的使用状态,并不是消息是否被消费的状态
设置ttl=30min的队列,时间一到RabbitMQ会保证队列被删除,但是不会保证删除的速度有多快。
Map<String, Object> args = new HashMap<String, Object>{); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args);
2.2 AE备份交换器的使用
声明交换器的时候,添加alternate-exchange参数实现,或通过策略实现。前者优先级高。从代码角度需要以下三个步骤,具体代码如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("a1ternate-exchange", "myAe"); channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args); channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ; channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11); channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key"); channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);
1)声明normalExchange类型为direct的交换器、类型为fanout的myAe备份交换器;并且normalExchange的备份交换器为myAe(备份交换器建议使用fanout类型交换器)
2)声明normalQueue队列,声明unrouteQueue队列;
3)通过路由键normalKey绑定normalExchange与normalQueue,不适用路由键绑定unrouteQueue与myAe
二、消费者手动确认
针对前言中第(3)个问题,我们需要在消费者消费完消息后手动进行确认,保证消息数据不丢失!
1、autoAck参数设置
1) 当autoAck参数为false时,手动确认:
RabbitMQ会等待消费者显式地回复确认信号后从内存中移去消息(实际上是先标示删除标记,之后再删除),这是一般推荐使用的方式,因为使用手动确认有足够的时间处理消息,不需要担心消费者进程挂掉之后消息丢失问题。此时的消息就会分为两个部分:一是等待投递给消费者的消息;二是已经投递给消费者但还没有收到消费者确认信号的消息。
2) 当autoAck为true时,自动确认:
RabbitMQ会自动隐式地回复确认信号后从内存中移去消息, RabbitMQ不需要管消费者是否真正消费了这些消息,RabbitMQ会自动把发送出去的消息置为确认,然后直接从内存中删除。
2、重新投递
问:如果选择手动确认,即autoAck为false时,消费者由于某些原因断开了,那么消息的确认会受到影响,那么此时的消息会丢失吗?
3、消费者拒绝消息
1)使用channel.basicReject方法,但只能拒绝一条。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:消息的唯一标识
requeue:表示是否可以拒绝的消息重新存入队列
2)使用channel.basicNack。不同于前者,此方法可以批量拒绝。
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
multiple:设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
3)问:关键在于,消费者拒绝消费消息后怎么处理?是丢弃,还是重新回到队列呢?
三、生产者确认机制
针对前言的第(1)个问题,我们可以通过生产者的确认消息机制来解决,主要分为两种:第一是事务机制、第二是发送方确认机制
1、事务机制
与事务机制相关的有三种方法,分别是channel.txSelect设置当前信道为事务模式、channel.txCommit提交事务和channel.txRollback事务回滚。如果事务提交成功,则消息一定是到达了RabbitMQ中,如果事务提交之前由于发送异常或者其他原因,捕获后可以进行channel.txRollback回滚。
// 将信道设置为事务模式,开启事务 channel.txSelect(); // 发送持久化消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); // 事务提交 channel.txCommit();
发生异常之后事务回滚
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); channel.txCommit(); } catch (Exception e){ e.printStackTrace(); channel.txRollback(); }
2、确认机制
确认机制相对来说,相比较代码来说比较复杂。主要有单条确认、批量确认、异步批量确认
-----------------------------------------未完待续,写不动了,休息会再补充下一篇-------------------------------------------