适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性能可能就越低。
延迟队列顾名思义延迟消费数据,那么先解释下延迟队列涉及的关键概念
1、消息的TTL(Time To Live)
RabbitMQ允许为消息和队列设置TTL(生存时间),若对消息设置了ttl,如果超过了ttl配置则消息死了,称之为死信.请注意,路由到多个队列的消息可能会在其所在的每个队列中的不同时间或根本不会消亡(不同队列分别设置ttl)。故一个队列中的消息死亡对其他队列中相同消息的生命没有影响。对队列设置就是队列没有消费者连着的保留时间。
1.1 对消息设置统一TTL
@Bean
public Queue delayTTLQueue() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
paramMap.put("x-message-ttl",3000);
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
}
1.2 对消息分别设置TTL,对比上一个少了个x-messgae-ttl参数设置
/**
* 延迟队列,超时时间不一致
* @return
*/
@Bean
public Queue delayTTLQueue2() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
}
注意:生产者发送消息时,给他们设置了不同时间的过期
/**
* 延迟时间不一致
* @param message
* @throws InterruptedException
*/
public void sendPerQueueTTL2(Object message) throws InterruptedException {
DelayConsumer.latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
long expiration = i * 1000;
rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
}
DelayConsumer.latch.await();
}
2、死信交换机DLX(Dead Letter Exchanges)
一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。
2.1 消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2.2 上面的消息的TTL到了,消息过期了。
2.3 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机
大致流程图
3、代码示例
3.1 queue配置
/**
* 实际消费队列
* @return
*/
@Bean
public Queue delayQueue() {
return new Queue(RabbitMqConfig.DELAY_PROCESS_QUEUE_NAME,true,false,false);
}
/**
* 延迟队列
* @return
*/
@Bean
public Queue delayTTLQueue() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
paramMap.put("x-message-ttl",3000);
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
}
/**
* 延迟队列,超时时间不一致
* @return
*/
@Bean
public Queue delayTTLQueue2() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
}
3.2 exchange 配置
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange delayExchange(){
DirectExchange directExchange = new DirectExchange(RabbitMqConfig.DELAY_EXCHANGE_NAME,true,false);
return directExchange;
}
/**
* 延迟交换机
* @return
*/
@Bean
public DirectExchange perQueueTTLExchange(){
DirectExchange directExchange = new DirectExchange(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,true,false);
return directExchange;
}
3.3 exchange和queue绑定关系
/**
* 绑定死信队列
* @return
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(queueConfig.delayQueue()).to(exchangeConfig.delayExchange()).with(RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
}
/**
* 绑定延迟队列1
* @return
*/
@Bean
public Binding queueTTLBinding() {
return BindingBuilder.bind(queueConfig.delayTTLQueue()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY);
}
/**
* 绑定延迟队列2
* @return
*/
@Bean
public Binding queueTTLBinding2() {
return BindingBuilder.bind(queueConfig.delayTTLQueue2()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2);
}
3.4 生产者发送消息
public void sendPerQueueTTL(Object message) throws InterruptedException {
DelayConsumer.latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, message);
}
DelayConsumer.latch.await();
}
/**
* 延迟时间不一致
* @param message
* @throws InterruptedException
*/
public void sendPerQueueTTL2(Object message) throws InterruptedException {
DelayConsumer.latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
long expiration = i * 1000;
rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
}
DelayConsumer.latch.await();
}
3.5 消费者消费对应消息
@Component
public class DelayConsumer implements ChannelAwareMessageListener {
public static CountDownLatch latch;
public static final String FAIL_MESSAGE = "This message will fail";
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
processMessage(message);
}
catch (Exception e) {
// 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
channel.basicPublish(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME, RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, null,
"The failed message will auto retry after a certain delay".getBytes());
}
if (latch != null) {
latch.countDown();
}
}
/**
* 模拟消息处理。如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常
*
* @param message
* @throws Exception
*/
public void processMessage(Message message) throws Exception {
String realMessage = new String(message.getBody());
System.out.println("Received <" + realMessage + ">");
if (Objects.equals(realMessage, FAIL_MESSAGE)) {
throw new Exception("Some exception happened");
}
}
}
测试结果
设置消息超时一致的测试结果如图:
设置消息超时时间一致
本文示例代码 详见 GitHub 死信队列+TTL实现延迟队列