在上一篇文章中,笔者整理了从消息生产出来到消费结束的整个生命周期过程中,为了确保消息能够可靠到达或者消费,我们需要在哪些环节进行哪些处理,同时也展示了使用Java原生代码怎么样在这些环节进行处理。本文主要介绍使用spring boot集成RabbitMQ的方式时,针对这些环节应该进行怎样的处理。

一、创建Exchange、Queue和Binding

        首先,需要创建待测试的交换机、队列和绑定。相对于原生代码,spring boot对ConnectionFactory、Channel这些对象的创建和销毁进行了封装,使得我们不再需要手动创建Connection或者Channel,也不需要手动进行释放。这样做的一个好处就是:我们不必再关心这些系统资源的生命周期,从而简化了开发,而且避免了因忘记释放资源造成的内存泄露。RabbitMQ的Exchange、Queue和Binding这些组件的创建,只需要创建相应的Bean即可,注入到IOC中。

        比如,笔者通过一个RabbitConfig的自动配置类,对这些Bean进行了注入:

@Configuration
public class RabbitConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue", true, false, false);
    }
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange", true, false);
    }
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
    }
}

第3~10行:是RabbitMQ连接参数的配置

第11~19行:根据配置的连接参数,创建链接工厂ConnectionFactory ,这里我们只需要创建ConnectionFactory ,spring boot会自动创建和管理Connection以及Channel。

第20~24行:创建RabbitTemplate对象。RabbitTemplate是采用模板方法模式进行消息发送的一个模板,后面我们在发送消息时就是使用RabbitTemplate的相应的方法。实际上是在RabbitTemplate内部对Connection和Channel的创建进行了封装,而且只有我们在使用其第一次发送消息时,才会真正在RabbitMQ的broker上创建声明好的Exchange、Queue、Binding等组件。

第25~28行:创建名称为TestDirectQueue的队列

第29~32行:创建名称为TestDirectExchange的直连交换机

第33~36行:创建绑定,将上面创建的交换机和队列绑定在一起,路由键为队列的名称。

二、生产者端处理连接异常

        在前面我们已经提到,spring boot集成RabbitMQ之后,使用RabbitTemplate对象进行消息的发送,所以生产者端的异常处理需要在调用RabbitTemplate对象发送消息的代码上。我们在处理Java原生代码调用RabbitMQ要处理的异常主要是:

  • IOException —— 客户端连不上broker的情况,抛出的异常。
  • TimeoutException —— 客户端连接broker超时抛出的异常。
  • ShutdownSignalException —— broker的交换机不存在时出现的异常。

        但是在spring boot的框架中,引入了AmqpConnectException异常,实际上一旦发生MQ掉线或者超时的情况,AmqpConnectException异常取代了IOException 和TimeoutException。所以完整的异常处理的代码如下:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

第3行和第5行:分别调用rabbitTemplate的send和convertAndSend来发送消息,两者的不同在于前者发送的参数为Message对象,而该对象中封装了发送的消息的字节数组;后者发送的是一个自定义的类对象,但是要注意类需要实现Serializable接口。

第6~8行:对ShutdownSignalException 类型异常的处理,当交换机不存在时会产生此类异常。

第9~11行:对AmqpConnectException类型的异常的处理,当客户端连接不上Broker时会抛出此类的异常。

三、生产者端手动确认消息

        在生产者端开启手动确认消息,可以保证由于RabbitMQ自身的原因导致消息路由到队列失败时,我们可以进行手动的消息重发。在java原生代码调用RabbitMQ时需要手动开启通道的confirm模式,而且提供了3种接受broker返回的ack的方法。在spring-boot-starter-amqp中,进一步对这个逻辑进行了封装,只保留了使用回调方法的方式。

        下面,先创建一个回调的类:

public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MyConfirmCallback:ack=" + ack);
    }
}

        可以看到,回调类实现了ConfirmCallback 接口,接口中有一个回调方法confirm,confirm具有三个参数:

  • correlationData —— 发送消息时携带的附加数据,send方法和convertAndSend方法都具有携带这个参数的重载类型:
void send(String routingKey, Message message, CorrelationData correlationData);
void convertAndSend(String routingKey, Object object, CorrelationData correlationData);
  • ack —— 消息是否成功发送到exchange
  • cause —— 消息发送失败的原因

        有了上述回调类的实现,我们就可以在发送时指定回调类,比如上面的发送消息代码就可以改成这样:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

        在第3行中,我们为rabbitTemplate指定了回调的类对象,这里需要特别注意:

(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setConfirmCallback指定回调对象

(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常

四、生产者端处理Return的消息

        在上一篇文章中,我们特别指出了一种场景:消息从交换机路由到队列中时,我们可以选择当队列缺失时,将消息返回给生产者。在spring-boot-starter-amqp中,对这种应用也进行了封装,与上一小节中的手动确认消息类似,我们需要先创建一个接受Return消息的回调类:

public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = String.format("消息发送失败-消息回退,应答码:{},原因:{},交换机:{},路由键:{}",
                returnedMessage.getReplyCode(),
                returnedMessage.getReplyText(),
                returnedMessage.getExchange(),
                returnedMessage.getRoutingKey());
        System.out.println(str);
    }
}

        回调类实现了ReturnsCallback 接口,接口中有一个回调方法returnedMessage,方法只有一个参数ReturnedMessage,先看下ReturnedMessage类型的定义:

public class ReturnedMessage {
    private final Message message;
    private final int replyCode;
    private final String replyText;
    private final String exchange;
    private final String routingKey;
}
  • message —— 细心的朋友可能已经观察到,这个参数的类型实际上就是rabbitTemplate.send方法中封装的消息的类型,当消息没有路由到队列被return时被原样返回。
  • replyCode —— 返回时的应答码
  • replyText —— 返回的原因
  • exchange —— 交换机名称
  • routingKey —— 路由键,如果在直连模式下就是队列名称

        在实现了回调类之后,就可以进行回调类对象的装配了:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnsCallback(new MyReturnCallback());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

        注意第4行和第5行是我们新增的代码,对于设置手动处理broker return的消息来说,第4行代码必不可少,意思是开启手动处理回退消息。对于Return消息的回调类的使用也需要注意:

(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setReturnsCallback指定回调对象(注意是使用setReturnsCallback,而非setReturnCallback,后者已经被标注过时)

(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常

(3)在使用setReturnsCallback之前要使用rabbitTemplate.setMandatory(true)。

        如果不出什么意外,当消息被return时,程序会打印类似以下的信息(红色圈出部分):

RabbitMQ消息队列实战(4)—— spring-boot-starter-amqp中消息的可靠性传输和确认机制-LMLPHP

五、消费者端手动ACK

        在前面我们梳理了RabbitMQ生产者端一些消息的可靠性传输的保证机制,下面再学习下消费者端怎么进行手动ack。消费者端的手动ack,首先要开启消费者端支持手动ack(默认是自动ack),开启的方式有两种:

  • 第一种,使用配置文件开启手动ack:
spring:
  #配置rabbitMq 服务器
  rabbitmq:
    listener:
      type: simple
      simple:
        #simple关闭自动ack,手动ack
        acknowledge-mode: manual

第8行,修改消费者确认消息的模式为手动

  • 第二种,自定义containerFactory,开启手动确认模式:
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

        注意第6行,开启了手动确认模式。

        在完成了上述任意一种设置之后,然后配置监听器,并手动进行确认:

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "TestDirectQueue", durable = "true"),
        exchange = @Exchange(name = "TestDirectQueue", durable = "true", type = "direct")))
public void consumer(@Payload Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
    System.out.println("接受到的消息是:" + message.toString());
    channel.basicAck(deliveryTag,false);
    System.out.print("这里是接收者1答应消息: ");
    System.out.println("SYS_TOPIC_ORDER_CALCULATE_ZZ_FEE process1  : " + message);
}

第1行 —— 使用RabbitListener注解指定监听器,同时装配自定义的containerFactory(如果使用配置文件的方式手动确认,无需装配containerFactory),同时指定的还有exchange、queue和binding等等。

第3行 —— consumer包含了三个参数:message是传递过来的消息,deliveryTag是为每个消息指定的自增长的id(详细在上一篇文章中已经解释过),channel传递信息的信道。

第5行 —— 通过调用channel的basicAck进行消息的确认,参数的解释可以参照笔者上一篇文章。

        至此,整个消费者的手动确认方法也介绍完毕。

六、总结

        下面,针对本文的内容进行总结:

(1)spring-boot-starter-amqp中,针对生产者端或者消费者端连接不到broker的IOException异常和TimeoutException异常,重新封装了新的异常类型AmqpConnectException异常。

(2)生产者发送消息时如果交换机不存在,会抛出ShutdownSignalException异常。

(3)生产者端可以手动确认发送的消息正常到达了交换机,方法是实现ReturnsCallback接口,然后使用rabbitTemplate.setConfirmCallback()方法进行装配。

(4)如果broker出现内部异常或者目标队列不存在时,可以设置消息返还给生产者,方法是设置rabbitTemplate.setMandatory(true),实现ReturnsCallback接口,然后使用rabbitTemplate.setReturnsCallback()进行装配。

(5)开启消费者手动ack有两种方法,一种是通过配置文件修改支持,另一种是创建自定义的SimpleRabbitListenerContainerFactory并注入到IOC中,在监听方法中调用channel.basicAck()。

七、附yml文件中常见的RabbitMQ相关配置:

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    #virtual-host: JCcccHost
    publisher-confirm-type: correlated
    #发布者到达确认
    publisher-returns: true
    listener:
      type: simple
      simple:
        #simple关闭自动ack,手动ack
        acknowledge-mode: manual
        retry:
          ### 开启重试机制(调用监听方法失败时会重试,不是从队列中重复拿消息)
          enabled: true
          #最大重试传递次数
          max-attempts: 3
          #第一次和第二次尝试传递消息的间隔时间 单位毫秒
          initial-interval: 5000ms
          #最大重试时间间隔,单位毫秒
          max-interval: 300000ms
          #应用前一次重试间隔的乘法器,multiplier默认为1
          multiplier: 3
          #以上配置的间隔0s  5s  15s  45s
        #重试次数超过上面的设置之后是否丢弃(消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机))
        default-requeue-rejected: true
    ### 模板配置
    ##设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
    template:
      mandatory: true
04-13 04:28