前言
很久之前的笔记,整理下
1.新建工程,添加依赖
compile('org.springframework.boot:spring-boot-starter-amqp')
2.配置application.yml
配置服务端
spring:
application:
name: rabbit-test
rabbitmq:
host: 10.2.6.33
port: 5672
username: gaojinliang
password: 【自己对应的密码】
3.RabbitMq的基本概念
内部实际上也是AMQP中的基本概念,AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而
Binding决定交换器的消息应该发送到哪个队列。
Exchange 类型
Exchange分发消息的时候根据类型的不同分发策略有区别,目前有四种类型,direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header
而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
3.1direct
消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routing key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。他是完全匹配,单播的模式。
以下是direct的场景,也是最简单,最常用的模式。
MQ配置
@Configuration
public class RabbitQueueConfig {
@Bean
public Queue queue1() {
return new Queue("queue1", true); //持久化
}
}
发送者
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("/queue")
public String sendQueueMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("queue1", msg);
return "success";
}
接收者
@Component
public class QueueReceiver {
@RabbitListener(queues = "queue1")
@RabbitHandler
public void process(String strs) {
System.out.println("接收到:" + strs);
}
}
3.2fanout
每个发到fanout类型交换器的消息都会分到所有绑定的队列上。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被
转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。
以下是fanout场景,单个微服务集群的情况下每个节点都绑定到一个direct交换机上 (例子中是服务端口区分队列,也可以是别的key),然后每个节点都能收到
消息做一些配置更新等操作。
MQ配置
@Configuration
public class RabbitFanoutConfig {
@Value("${server.port}")
private String port;
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange ("fanoutExchange");
}
@Bean
public Queue fanoutTp() {
return new Queue("fanout.tp."+port);
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutTp()).to(fanoutExchange());
}
}
发送者
@GetMapping("/fanout")
public String sendFanoutMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("fanoutExchange","", msg);
return "success";
}
接收者
@Component
public class FanoutRecerver {
@RabbitListener(queues = "fanout.tp.${server.port}")
@RabbitHandler
public void process(String strs) {
System.out.println("fanout.tp接收到:" + strs);
}
}
3.3topic
topic交换器通过模式匹配分配消息的路由键属性,将路由键和单个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定建的字符串切分成单词,这些单词
之间用点隔开。他同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
以下是topic场景,单个微服务接收到同一个消息后多个功能协调工作,也适用按分类更新、同一问题需要特定人员知晓等场景
MQ配置:
@Configuration
public class RabbitTopicConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange ("topicExchange");
}
@Bean
public Queue topicTp1() {
return new Queue("topic.tp2.one");
}
@Bean
public Queue topicTp2() {
return new Queue("topic.tp2.two");
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(topicTp1()).to(topicExchange()).with("topic.tp2.*"); //* 后面匹配单个key
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(topicTp2()).to(topicExchange()).with("topic.tp2.#"); //# 后面匹配多个key
}
}
发送者
@GetMapping("/topic")
public String sendTopicMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("topicExchange","topic.tp2.all", msg);
return "success";
}
接收者
@Component
public class TopicRecerver {
@RabbitListener(queues = "topic.tp2.one")
@RabbitHandler
public void process(String strs) {
System.out.println("topic.tp2.one 接收到:" + strs);
}
@RabbitListener(queues = "topic.tp2.two")
@RabbitHandler
public void process2(String strs) {
System.out.println("topic.tp2.two 接收到:" + strs);
}
}
4.接收端手动确认
上面提到的内容都是消息自动确认,当我们程序中收到消息处理失败需要MQ重发时就需要用到手动确认。
application.yml 配置 spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring:
application:
name: rabbit-test
rabbitmq:
host: 10.2.6.33
port: 5672
username: liuyanhui
password: liuyanhui
listener:
simple:
acknowledge-mode: MANUAL
在监听一端
@Component
public class QueueReceiver {
@RabbitListener(queues = "queue2")
@RabbitHandler
public void process(String strs, Message message, Channel channel) throws IOException {
System.out.println("接收到:" + strs);
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认成功
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //确认失败
}
}
channel.basicAck 确认成功
channel.basicNack 确认失败 : 最后一个参数代表是否需要重发,需要根据实际执行情况做选择,否则可能陷入死循环。