前言

在使用rabbitmq时,我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失。除此之外,我们还会遇到一个问题,当消息生产者发消息发送出去后,消息到底有没有正确到达服务器呢?如果不进行特殊配置,默认情况下发送的消息是不会给生产者返回任何响应的,也就是默认情况下生产者并不知道消息是否正常到达了服务器。对于数据必达的需求,你肯定对消息的来龙去脉都有个了接,这种情况下就需要用到rabbitmq消息确认。

消息确认

rabbitmq消息确认分为生产者确认和消费者确认。

生产者消费确认提供了两种机制:

  • 通过事务机制实现
  • 通过confirm机制实现

事务机制则用到channel.txSelect、channel.txCommit、channel.txRollback。可以参考下面AMQP协议流转过程(参考Rabbitmq实战指南)

SpringBoot集成rabbitmq(二)-LMLPHP

事务机制在一条消息发送之后会阻塞发送端,以等待rabbitmq回应,之后才继续发送下一条消息。所以相对来说事务机制的性能要差一些。事务机制会降低rabbitmq的吞吐量,所以又引入了另一种轻量级的方式:confirm机制。

     生产者通过调用channel.confirmSelect将信道设置为confirm模式,之后Rabbitmq会返回Confirm.Select-Ok命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被ack或nack一次。类似如下代码:

channel.confirmSelect()

channel.basicPublish("exchange","routingkey",null,"test".getBytes())

confirm机制流转过程参考下图(参考Rabbitmq实战指南)

SpringBoot集成rabbitmq(二)-LMLPHP

消费者确认

消费者在订阅消息队列时指定autoAck参数。当参数设置为false时rabbitmq会等待消费者显式回复确认信号才会从内存或者磁盘种删除这条消息。参数默认为true。当autoAck设置为false时,对于rabbitmq服务器而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息、一部分是已经投递给消费者的消息但是还没有收到确认信号的消息。可通过RabbitMQ Web平台查看队列中Ready和UnAck对应的数量。

消费者消息确认涉及到3个方法:channel.basicAck、channel.basicNack、channel.basicReject

SpringBoot集成rabbitmq下实现消息确认

springboot集成rabbitmq实现消息确认主要涉及两个回调方法(ReturnCallback、ConfirmCallback)。这里消费者部分我用两种方式来实现。一种是基于SimpleMessageListenerContainer。 另一种就是用RabbitListener注解实现。

1、application.yml

spring:
rabbitmq:
host: 192.168.80.128
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
retry:
enabled: true

 

2、配置文件(这里实现ReturnCallback、ConfirmCallback)

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable; @Configuration
public class MqConfig { private Logger logger= LoggerFactory.getLogger(MqConfig.class); @Autowired
RabbitTemplate rabbitTemplate; @Autowired
ConnectionFactory connectionFactory; @Bean
public Queue queue(){
return new Queue("testMq",true); //持久化队列(默认值也是true)
} @Bean
public DirectExchange directExchange(){
return new DirectExchange("testMq",true,false);
} @Bean
Binding binding(Queue queue,DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("testMq");
} /**
* i->replyCode
* s->replyText
* s1->exchange
* s2->routingKey
* **/
//消息从交换器发送到队列失败时触发
RabbitTemplate.ReturnCallback msgReturnCallback=new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) { logger.info("消息:{},错误码:{},失败原因:{},交换器:{},路由key:{}",message.getMessageProperties().getCorrelationId(),i,s,s1,s2);
}
}; //消息发送到交换器时触发
RabbitTemplate.ConfirmCallback msgConfirmCallback=new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {
if(b){
logger.info("消息{}发送exchange成功",correlationData.getId());
}else{
logger.info("消息发送到exchange失败,原因:{}",s);
}
}
}; /***
* 消费者确认(方式二)
* **/
@Bean
public SimpleMessageListenerContainer listenerContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("testMq");
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(10);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
logger.info("接收消息:{}",new String(message.getBody()));
}catch (Exception ex){ //channel.basicReject
//channel.basicNack } }
}); return container;
} /**
* 生产者的回调都在这里
* **/
@Autowired
public RabbitTemplate rabbitTemplate(){
//消息发送失败后返回到队列中
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(msgReturnCallback);
rabbitTemplate.setConfirmCallback(msgConfirmCallback); return rabbitTemplate;
} }

 另一种消费端实现方式

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import java.io.IOException; @Component
public class MqConsumer { private Logger logger= LoggerFactory.getLogger(MqConsumer.class);
@RabbitListener(queues = "testMq")
public void handler(Message message,Channel channel){
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
logger.info("接收消息:{}",new String(message.getBody()));
} catch (IOException e) { e.printStackTrace();
}
}
}

3、消息生产者

消息发送时注意生成一个消息id。一开始没用到这个参数,在消息接收时消费者会抛空指针异常

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import java.util.UUID; @Controller
@RequestMapping("/rabbitMq")
public class MqController { private Logger logger= LoggerFactory.getLogger(MqController.class); @Autowired
RabbitTemplate rabbitTemplate; @RequestMapping("/sendMq")
@ResponseBody
public String sendMq(){ /**
* 这里exchange、routingkey都叫testMq
* **/
Object message=null;
for(int i=0;i<10;i++){
logger.info("生产者:第{}条消息",i);
CorrelationData correlationId=new CorrelationData(UUID.randomUUID().toString());
message="第"+i+"条消息";
rabbitTemplate.convertAndSend("testMq","testMq",message,correlationId);
} return "sending..."; } }

  SpringBoot集成rabbitmq(二)-LMLPHP

从运行截图中可以看到生产者和消费者都收到对应的回调消息。

05-01 05:11