在上一篇文章中,笔者整理了从消息生产出来到消费结束的整个生命周期过程中,为了确保消息能够可靠到达或者消费,我们需要在哪些环节进行哪些处理,同时也展示了使用Java原生代码怎么样在这些环节进行处理。本文主要介绍使用spring boot集成RabbitMQ的方式时,针对这些环节应该进行怎样的处理。
一、创建Exchange、Queue和Binding
首先,需要创建待测试的交换机、队列和绑定。相对于原生代码,spring boot对ConnectionFactory、Channel这些对象的创建和销毁进行了封装,使得我们不再需要手动创建Connection或者Channel,也不需要手动进行释放。这样做的一个好处就是:我们不必再关心这些系统资源的生命周期,从而简化了开发,而且避免了因忘记释放资源造成的内存泄露。RabbitMQ的Exchange、Queue和Binding这些组件的创建,只需要创建相应的Bean即可,注入到IOC中。
比如,笔者通过一个RabbitConfig的自动配置类,对这些Bean进行了注入:
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue", true, false, false);
}
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange", true, false);
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
}
}
第3~10行:是RabbitMQ连接参数的配置
第11~19行:根据配置的连接参数,创建链接工厂ConnectionFactory ,这里我们只需要创建ConnectionFactory ,spring boot会自动创建和管理Connection以及Channel。
第20~24行:创建RabbitTemplate对象。RabbitTemplate是采用模板方法模式进行消息发送的一个模板,后面我们在发送消息时就是使用RabbitTemplate的相应的方法。实际上是在RabbitTemplate内部对Connection和Channel的创建进行了封装,而且只有我们在使用其第一次发送消息时,才会真正在RabbitMQ的broker上创建声明好的Exchange、Queue、Binding等组件。
第25~28行:创建名称为TestDirectQueue的队列
第29~32行:创建名称为TestDirectExchange的直连交换机
第33~36行:创建绑定,将上面创建的交换机和队列绑定在一起,路由键为队列的名称。
二、生产者端处理连接异常
在前面我们已经提到,spring boot集成RabbitMQ之后,使用RabbitTemplate对象进行消息的发送,所以生产者端的异常处理需要在调用RabbitTemplate对象发送消息的代码上。我们在处理Java原生代码调用RabbitMQ要处理的异常主要是:
- IOException —— 客户端连不上broker的情况,抛出的异常。
- TimeoutException —— 客户端连接broker超时抛出的异常。
- ShutdownSignalException —— broker的交换机不存在时出现的异常。
但是在spring boot的框架中,引入了AmqpConnectException异常,实际上一旦发生MQ掉线或者超时的情况,AmqpConnectException异常取代了IOException 和TimeoutException。所以完整的异常处理的代码如下:
try {
Message message = new Message(str.getBytes());
rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
User user = new User("张三", 18);
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
} catch (ShutdownSignalException e) {
System.out.println("交换机故障或者不存在。");
e.printStackTrace();
}catch (AmqpConnectException e) {
System.out.println("服务器连接失败。");
e.printStackTrace();
}
第3行和第5行:分别调用rabbitTemplate的send和convertAndSend来发送消息,两者的不同在于前者发送的参数为Message对象,而该对象中封装了发送的消息的字节数组;后者发送的是一个自定义的类对象,但是要注意类需要实现Serializable接口。
第6~8行:对ShutdownSignalException 类型异常的处理,当交换机不存在时会产生此类异常。
第9~11行:对AmqpConnectException类型的异常的处理,当客户端连接不上Broker时会抛出此类的异常。
三、生产者端手动确认消息
在生产者端开启手动确认消息,可以保证由于RabbitMQ自身的原因导致消息路由到队列失败时,我们可以进行手动的消息重发。在java原生代码调用RabbitMQ时需要手动开启通道的confirm模式,而且提供了3种接受broker返回的ack的方法。在spring-boot-starter-amqp中,进一步对这个逻辑进行了封装,只保留了使用回调方法的方式。
下面,先创建一个回调的类:
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MyConfirmCallback:ack=" + ack);
}
}
可以看到,回调类实现了ConfirmCallback 接口,接口中有一个回调方法confirm,confirm具有三个参数:
- correlationData —— 发送消息时携带的附加数据,send方法和convertAndSend方法都具有携带这个参数的重载类型:
void send(String routingKey, Message message, CorrelationData correlationData);
void convertAndSend(String routingKey, Object object, CorrelationData correlationData);
- ack —— 消息是否成功发送到exchange
- cause —— 消息发送失败的原因
有了上述回调类的实现,我们就可以在发送时指定回调类,比如上面的发送消息代码就可以改成这样:
try {
Message message = new Message(str.getBytes());
rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
User user = new User("张三", 18);
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
} catch (ShutdownSignalException e) {
System.out.println("交换机故障或者不存在。");
e.printStackTrace();
}catch (AmqpConnectException e) {
System.out.println("服务器连接失败。");
e.printStackTrace();
}
在第3行中,我们为rabbitTemplate指定了回调的类对象,这里需要特别注意:
(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setConfirmCallback指定回调对象
(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常
四、生产者端处理Return的消息
在上一篇文章中,我们特别指出了一种场景:消息从交换机路由到队列中时,我们可以选择当队列缺失时,将消息返回给生产者。在spring-boot-starter-amqp中,对这种应用也进行了封装,与上一小节中的手动确认消息类似,我们需要先创建一个接受Return消息的回调类:
public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
String str = String.format("消息发送失败-消息回退,应答码:{},原因:{},交换机:{},路由键:{}",
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey());
System.out.println(str);
}
}
回调类实现了ReturnsCallback 接口,接口中有一个回调方法returnedMessage,方法只有一个参数ReturnedMessage,先看下ReturnedMessage类型的定义:
public class ReturnedMessage {
private final Message message;
private final int replyCode;
private final String replyText;
private final String exchange;
private final String routingKey;
}
- message —— 细心的朋友可能已经观察到,这个参数的类型实际上就是rabbitTemplate.send方法中封装的消息的类型,当消息没有路由到队列被return时被原样返回。
- replyCode —— 返回时的应答码
- replyText —— 返回的原因
- exchange —— 交换机名称
- routingKey —— 路由键,如果在直连模式下就是队列名称
在实现了回调类之后,就可以进行回调类对象的装配了:
try {
Message message = new Message(str.getBytes());
rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new MyReturnCallback());
rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
User user = new User("张三", 18);
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
} catch (ShutdownSignalException e) {
System.out.println("交换机故障或者不存在。");
e.printStackTrace();
}catch (AmqpConnectException e) {
System.out.println("服务器连接失败。");
e.printStackTrace();
}
注意第4行和第5行是我们新增的代码,对于设置手动处理broker return的消息来说,第4行代码必不可少,意思是开启手动处理回退消息。对于Return消息的回调类的使用也需要注意:
(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setReturnsCallback指定回调对象(注意是使用setReturnsCallback,而非setReturnCallback,后者已经被标注过时)
(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常
(3)在使用setReturnsCallback之前要使用rabbitTemplate.setMandatory(true)。
如果不出什么意外,当消息被return时,程序会打印类似以下的信息(红色圈出部分):
五、消费者端手动ACK
在前面我们梳理了RabbitMQ生产者端一些消息的可靠性传输的保证机制,下面再学习下消费者端怎么进行手动ack。消费者端的手动ack,首先要开启消费者端支持手动ack(默认是自动ack),开启的方式有两种:
- 第一种,使用配置文件开启手动ack:
spring:
#配置rabbitMq 服务器
rabbitmq:
listener:
type: simple
simple:
#simple关闭自动ack,手动ack
acknowledge-mode: manual
第8行,修改消费者确认消息的模式为手动
- 第二种,自定义containerFactory,开启手动确认模式:
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
注意第6行,开启了手动确认模式。
在完成了上述任意一种设置之后,然后配置监听器,并手动进行确认:
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "TestDirectQueue", durable = "true"),
exchange = @Exchange(name = "TestDirectQueue", durable = "true", type = "direct")))
public void consumer(@Payload Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("接受到的消息是:" + message.toString());
channel.basicAck(deliveryTag,false);
System.out.print("这里是接收者1答应消息: ");
System.out.println("SYS_TOPIC_ORDER_CALCULATE_ZZ_FEE process1 : " + message);
}
第1行 —— 使用RabbitListener注解指定监听器,同时装配自定义的containerFactory(如果使用配置文件的方式手动确认,无需装配containerFactory),同时指定的还有exchange、queue和binding等等。
第3行 —— consumer包含了三个参数:message是传递过来的消息,deliveryTag是为每个消息指定的自增长的id(详细在上一篇文章中已经解释过),channel传递信息的信道。
第5行 —— 通过调用channel的basicAck进行消息的确认,参数的解释可以参照笔者上一篇文章。
至此,整个消费者的手动确认方法也介绍完毕。
六、总结
下面,针对本文的内容进行总结:
(1)spring-boot-starter-amqp中,针对生产者端或者消费者端连接不到broker的IOException异常和TimeoutException异常,重新封装了新的异常类型AmqpConnectException异常。
(2)生产者发送消息时如果交换机不存在,会抛出ShutdownSignalException异常。
(3)生产者端可以手动确认发送的消息正常到达了交换机,方法是实现ReturnsCallback接口,然后使用rabbitTemplate.setConfirmCallback()方法进行装配。
(4)如果broker出现内部异常或者目标队列不存在时,可以设置消息返还给生产者,方法是设置rabbitTemplate.setMandatory(true),实现ReturnsCallback接口,然后使用rabbitTemplate.setReturnsCallback()进行装配。
(5)开启消费者手动ack有两种方法,一种是通过配置文件修改支持,另一种是创建自定义的SimpleRabbitListenerContainerFactory并注入到IOC中,在监听方法中调用channel.basicAck()。
七、附yml文件中常见的RabbitMQ相关配置:
spring:
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
#virtual-host: JCcccHost
publisher-confirm-type: correlated
#发布者到达确认
publisher-returns: true
listener:
type: simple
simple:
#simple关闭自动ack,手动ack
acknowledge-mode: manual
retry:
### 开启重试机制(调用监听方法失败时会重试,不是从队列中重复拿消息)
enabled: true
#最大重试传递次数
max-attempts: 3
#第一次和第二次尝试传递消息的间隔时间 单位毫秒
initial-interval: 5000ms
#最大重试时间间隔,单位毫秒
max-interval: 300000ms
#应用前一次重试间隔的乘法器,multiplier默认为1
multiplier: 3
#以上配置的间隔0s 5s 15s 45s
#重试次数超过上面的设置之后是否丢弃(消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机))
default-requeue-rejected: true
### 模板配置
##设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
template:
mandatory: true