一、什么是 RabbitMQ

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。

二、如何与 Spring 集成

1. 我们都需要哪些 Jar 包?

抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:

<!-- RabbitMQ -->
<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>3.5.1</version>
</dependency>
<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>1.4.5.RELEASE</version>
</dependency>

2. 使用外部参数文件 application.properties:

mq.host=127.0.0.1
mq.username=queue
mq.password=1234
mq.port=8001
# 统一XML配置中易变部分的命名
mq.queue=test_mq

易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。

 修改 applicationContext.xml 文件,引入我们创建的 properties 文件

<context:property-placeholder location="classpath:application.properties"/>
<util:properties id="appConfig" location="classpath:application.properties"></util:properties>

3. 连接 RabbitMQ 服务器

<!-- 连接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
  password="${mq.password}" port="${mq.port}" />

<rabbit:admin connection-factory="connectionFactory"/>

4. 声明一个 RabbitMQ Template

代码如下:

<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。

<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
 <rabbit:bindings>
  <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
 </rabbit:bindings>
</rabbit:topic-exchange>

交换机的四种模式:

代码如下:

<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

  • durable:是否持久化
  • exclusive:仅创建者可以使用的私有队列,断开后自动删除
  • auto-delete:当所有消费端连接断开后,是否自动删除队列
  • 7. 创建生产者端

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @Description: 消息队列发送者
     * @Author:
     * @CreateTime:
     */
    @Service
    public class Producer {
    
     @Autowired
     private AmqpTemplate amqpTemplate;
    
     public void sendQueue(String exchange_key, String queue_key, Object object) {
      // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange
      amqpTemplate.convertAndSend(exchange_key, queue_key, object);
     }
    }
    
    

    8. 在 applicationContext.xml 中配置监听及消费者端 

    <!-- 消费者 -->
    <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
    
    <!-- 配置监听 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
      <!--
        queues 监听队列,多个用逗号分隔
        ref 监听器
      -->
      <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
    </rabbit:listener-container>

    消费者 Java 代码:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class RabbitmqService implements MessageListener {
     public void onMessage(Message message) {
      System.out.println("消息消费者 = " + message.toString());
     }
    }
    

    至此,我们的所有配置文件就写完了,最终如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:util="http://www.springframework.org/schema/util"
      xmlns:aop="http://www.springframework.org/schema/aop"
      xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      xmlns:p="http://www.springframework.org/schema/p"
      xsi:schemaLocation="
      http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context-3.0.xsd
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.springframework.org/schema/util
      http://www.springframework.org/schema/util/spring-util-3.0.xsd
      http://www.springframework.org/schema/aop
      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
      http://www.springframework.org/schema/tx
      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
      http://www.springframework.org/schema/rabbit
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    
    
    
     <!-- RabbitMQ start -->
    
     <!-- 连接配置 -->
     <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
      password="${mq.password}" port="${mq.port}" />
    
     <rabbit:admin connection-factory="connectionFactory"/>
    
     <!-- 消息队列客户端 -->
     <rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
    
     <!-- queue 队列声明 -->
     <!--
      durable 是否持久化
      exclusive 仅创建者可以使用的私有队列,断开后自动删除
      auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
     <rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
    
     <!-- 交换机定义 -->
     <!--
      交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。
      如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。
    
      direct模式:消息与一个特定的路由器完全匹配,才会转发
      topic模式:按规则转发消息,最灵活
      -->
     <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
      <rabbit:bindings>
       <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
       <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
      </rabbit:bindings>
     </rabbit:topic-exchange>
    
     <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
    
     <!-- 配置监听 消费者 -->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
      <!--
       queues 监听队列,多个用逗号分隔
       ref 监听器 -->
      <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
     </rabbit:listener-container>
    </beans>
    
    

    9. 如何使用 RabbitMQ 发送一个消息

      @Autowired
     private Producer producer;
     @Value("#{appConfig['mq.queue']}")
     private String queueId;
    
     /**
      * @Description: 消息队列
      * @Author:
      * @CreateTime:
      */
     @ResponseBody
     @RequestMapping("/sendQueue")
     public String testQueue() {
      try {
       Map<String, Object> map = new HashMap<String, Object>();
       map.put("data", "hello rabbitmq");
       producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);
      } catch (Exception e) {
       e.printStackTrace();
      }
      return "发送完毕";
     }
    
    

    嗯。这个测试是 SpringMVC 框架。

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    02-04 23:47