本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
参考文档
csdn博客:
基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.yml
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: JCcccHost
#确认消息已发送到交换机(Exchange)
#publisher-confirms: true
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
完善更多信息
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
retry:
#发布重试,默认false
enabled: true
#重试时间 默认1000ms
initial-interval: 1000
#重试最大次数 最大3
max-attempts: 3
#重试最大间隔时间
max-interval: 10000
#重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 1
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
#消费者调用程序线程的最小数量
concurrency: 10
#消费者最大数量
max-concurrency: 10
#限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
#启动时是否默认启动容器
auto-startup: true
#被拒绝时重新进入队列
default-requeue-rejected: true
相关注解说明
@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。
@Component
public class PointConsumer {
//监听的队列名
@RabbitListener(queues = "point.to.point")
public void processOne(String name) {
System.out.println("point.to.point:" + name);
}
}
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
@Payload
可以获取消息中的 body 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body) {
System.out.println("body:"+body);
}
@Header,@Headers
可以获得消息中的 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
快速使用
配置xml文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置exchange、queue
注解快速创建版本
@Configuration
public class RabbitmqConfig {
//创建交换机
//通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange("zx_topic_exchange").durable(true).build();
}
//创建队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable("zx_queue").build();
}
/**
* 将队列与交换机绑定
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
Direct
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
Fanout
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class FanoutRabbitConfig {
/**
* 创建三个队列 :fanout.A fanout.B fanout.C
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
Topic
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
生产者发送消息
直接发送给队列
//指定消息队列的名字,直接发送消息到消息队列中
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
发送给交换机,然后走不同的模式
////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息
@Test
public void testSendDirectExchange() {
// 交换机名称,有三种类型
String exchangeName = "itcast.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息,red为队列的key,因此此队列会得到消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
也可以将发送的消息封装到HashMap中然后发送给交换机
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
消费者接收消息
//使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
@Component
public class MessageListener {
@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
}
}
参数用Map接收也可以
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
}
高级特性
消息可靠性传递
有confirm和return两种
在application.yml中添加以下配置项:
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: JCcccHost
#确认消息已发送到交换机(Exchange)
#publisher-confirms: true
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
有两种配置方法:
写到配置类中
写到工具类或者普通类中,但是这个类得实现那两个接口
写法一
编写消息确认回调函数
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
写法二
@Component
@Slf4j
public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
private String finalId = null;
private SmsDTO smsDTO = null;
/**
* 发布者确认的回调
*
* @param correlationData 回调的相关数据。
* @param b ack为真,nack为假
* @param s 一个可选的原因,用于nack,如果可用,否则为空。
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
// 消息发送成功,将redis中消息的状态(status)修改为1
if (b) {
redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX + finalId, "status", 1);
} else {
// 发送失败,放入redis失败集合中,并删除集合数据
log.error("短信消息投送失败:{}-->{}", correlationData, s);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO);
}
}
/**
* 发生异常时的消息返回提醒
*
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("发生异常,返回消息回调:{}", returnedMessage);
// 发送失败,放入redis失败集合中,并删除集合数据
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId, this.smsDTO);
}
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
}
消息确认机制
手动确认
yml配置
#手动确认 manual
listener:
simple:
acknowledge-mode: manual
写法一
首先在消费者项目中创建MessageListenerConfig
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;//消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//设置一个队列
container.setQueueNames("TestDirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(myAckReceiver);
return container;
}
}
然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte[] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
Map<String,String> msgMap = (Map<String,String>) ois.readObject();
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
ois.close();
System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
//channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:
首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte[] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
Map<String,String> msgMap = (Map<String,String>) ois.readObject();
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
ois.close();
if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("执行TestDirectQueue中的消息的业务处理流程......");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
System.out.println("消息成功消费到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("执行fanout.A中的消息的业务处理流程......");
}
channel.basicAck(deliveryTag, true);
//channel.basicReject(deliveryTag, true);//为true会重新放回队列
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
写法二
@Component
@Slf4j
public class SendSmsListener {
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private SendSmsUtils sendSmsUtils;
/**
* 监听发送短信普通队列
* @param smsDTO
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = SMS_QUEUE_NAME)
public void sendSmsListener(SmsDTO smsDTO, Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
int retryCount = (int) redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX + messageId, "retryCount");
if (retryCount > 3) {
//重试次数大于3,直接放到死信队列
log.error("短信消息重试超过3次:{}", messageId);
//basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
//该方法reject后,该消费者还是会消费到该条被reject的消息。
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
return;
}
try {
String phoneNum = smsDTO.getPhoneNum();
String code = smsDTO.getCode();
if(StringUtils.isAnyBlank(phoneNum,code)){
throw new RuntimeException("sendSmsListener参数为空");
}
// 发送消息
SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum, code);
SendStatus[] sendStatusSet = sendSmsResponse.getSendStatusSet();
SendStatus sendStatus = sendStatusSet[0];
if(!"Ok".equals(sendStatus.getCode()) ||!"send success".equals(sendStatus.getMessage())){
throw new RuntimeException("发送验证码失败");
}
//手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("短信发送成功:{}",smsDTO);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
} catch (Exception e) {
redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,"retryCount",retryCount+1);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
/**
* 监听到发送短信死信队列
* @param sms
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)
public void smsDelayQueueListener(SmsDTO sms, Message message, Channel channel) throws IOException {
try{
log.error("监听到死信队列消息==>{}",sms);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
消费端限流
#配置RabbitMQ
spring:
rabbitmq:
host: 192.168.126.3
port: 5672
username: guest
password: guest
virtual-host: /
#开启自动确认 none 手动确认 manual
listener:
simple:
#消费端限流机制必须开启手动确认
acknowledge-mode: manual
#消费端最多拉取的消息条数,签收后不满该条数才会继续拉取
prefetch: 5
消息存活时间TTL
可以设置队列的存活时间,也可以设置具体消息的存活时间
设置队列中所有消息的存活时间
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();
即在创建队列时,设置存活时间
设置某条消息的存活时间
//发送消息,并设置该消息的存活时间
@Test
public void testSendMessage()
{
//1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
//2.设置存活时间
messageProperties.setExpiration("10000");
//3.创建消息对象
Message message = new Message("sendMessage...".getBytes(),messageProperties);
//4.发送消息
rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);
}
若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。
在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准
死信队列
死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration//Rabbit配置类
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
//创建死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange()
{
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机
.durable(true)//是否持久化,true即存到磁盘,false只在内存上
.build();
}
//创建死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue()
{
return QueueBuilder
.durable(DEAD_QUEUE)//队列持久化
//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
.build();
}
//死信交换机绑定死信队列
@Bean
//@Qualifier注解,使用名称装配进行使用
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
//创建普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange()
{
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字 topic为通配符模式的交换机
.durable(true)//是否持久化,true即存到磁盘,false只在内存上
.build();
}
//创建普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue()
{
return QueueBuilder
.durable(NORMAL_QUEUE)//队列持久化
//.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
.deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机
.deadLetterRoutingKey("dead_routing")//死信队列路由关键字
.ttl(10000)//消息存活10s
.maxLength(10)//队列最大长度为10
.build();
}
//普通交换机绑定普通队列
@Bean
//@Qualifier注解,使用名称装配进行使用
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange, @Qualifier(NORMAL_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
延迟队列
RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。
消费者监听延迟队列
@Component
public class ExpireOrderConsumer {
//监听过期订单队列
@RabbitListener(queues = "expire_queue")
public void listenMessage(String orderId)
{
//模拟处理数据库等业务
System.out.println("查询"+orderId+"号订单的状态,如果已支付无需处理,如果未支付则回退库存");
}
}
控制层代码
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping(value = "/place/{orderId}",method = RequestMethod.GET)
public String placeOrder(@PathVariable String orderId)
{
//模拟service层处理
System.out.println("处理订单数据...");
//将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
return "下单成功,修改库存";
}
}