本文记录学习在Spring Boot中使用MQ。

一 什么是MQ

  MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。它的作用类似于邮局,发信人(生产者)只需要将信(消息)交给邮局,然后由邮局再将信(消息)发送给具体的接收者(消费者),具体发送过程与时间发信人可以不关注,也不会影响发信人做其它事情。目前常见的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。

  使用MQ的优点主要有:

  1 方法的异步执行 使用MQ可以将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了由于同步而等待的时间;

  2 程序之间松耦合 使用MQ可以减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,只要约定好消息的内容格式就行;

  JMS(Java Message Service)即java消息服务,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS的消息机制有2种模型,一种是1对1(Point to Point)的队列的消息,这种消息,只能被一个消费者消费;另一种是一对多的发布/订阅(Topic)消息,一条消息可以被多个消费者消费。ActiveMq是对JMS的一个实现。

二 SpringBoot集成Active MQ

  官网下载一个服务程序,解压后直接启动服务就可以了,下载地址:http://activemq.apache.org/activemq-5158-release.html

  SpringBoot也对Active MQ提供了支持,我们使用时引入具体的依赖即可,修改pom.xml文件,添加依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  在application.properties文件中配置Active MQ服务器的连接信息

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:广播(Topic),false:队列(Queue),默认时false
#spring.jms.pub-sub-domain=true

  完成以上配置信息后,当我们在启动SpringBoot项目时,会自动帮我们完成初始化操作,并提供一个JmsMessagingTemplate,提提供了我们常用发送消息的各种方法供我们使用。我们只需要在使用的地方注入JmsMessagingTemplate即可使用。

  发送队列消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Test
    public void testQueueMsg(){
        //创建名称为zyQueue的队列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向队列发送消息
        jmsMessagingTemplate.convertAndSend(queue,"这是一个队列消息!");
    }
}

  消息的接收方,监听消息队列,当队列中有消息时就可以获取到消息

@Component
public class Consumer {

    private static DateFormat df =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");

    /**
     * destination 目标地址即队列
     */
    @JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }
}

  执行测试方法发送消息可以看到,控制台输出的消费者接受到消息

SpringBoot入门 (九) MQ使用-LMLPHP

队列消息只能有一个消费者,如果有多个消费者同时监听一个队列时,只能有一个拿到消息,我们测试,修改发送方法,循环发送10调消息

@Test
    public void testQueueMsg(){
        //创建名称为zyQueue的队列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向队列发送消息
        for (int i=0;i<10;i++) {
            jmsMessagingTemplate.convertAndSend(queue,"这是第"+i+"个队列消息!");
        }
    }

  在Consumer 类中再添加一个消费者,监听队列zyQueue

@JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

    @JmsListener(destination = "zyQueue")
    public void receiveMessage1(String text){
        System.out.println("1接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  执行发送消息,看到控制台输出的结果,2个消费者平分了这10条消息

SpringBoot入门 (九) MQ使用-LMLPHP

  如果希望监听同一个队列的多个消费者都能接收到所有消息,我们就只能发送Topic消息了,我们修改application.properties中的

#消息模式 true:广播(Topic),false:队列(Queue),默认时false
spring.jms.pub-sub-domain=true

  表示要发送发布/订阅消息,发送消息的队列改用Topic发送消息,如下

@Test
    public void testTopicMsg(){
        Topic topic = new ActiveMQTopic("zyTopic");
        for (int i=0;i<5;i++){
            jmsMessagingTemplate.convertAndSend(topic,"这是第"+i+"个Topic消息!");
        }
    }

  我们在Consumer 类中添加两个消费者来监听zyTopic队列,接受消息

@JmsListener(destination = "zyTopic")
    public void receiveTopicMessage1(String text){
        System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

    @JmsListener(destination = "zyTopic")
    public void receiveTopicMessage2(String text){
        System.out.println("消费者2接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  执行发消息方法,可以看到控制台输出的内容,2个消费者都完整的接收到了5条消息

 SpringBoot入门 (九) MQ使用-LMLPHP

   我们在测试发送消息时修改了属性文件中的配置信息,才可以发送对应的类型的消息,这是由于SpringBoot中默认的是队列消息(查看源码可以知道,监听器默认使用的DefaultJmsListenerContainerFactory),如果我们想在不修改配置信息的情况下可以同时发送Queue和Topic消息怎么办呢,我们需要手动的更改初始的配置类,分别针对Queue和Topic消息提供JmsListenerContainerFactory

  新建一个配置类,如下

@SpringBootConfiguration
public class ActiveMqConfig {

    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置消息模型为队列
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置消息模型为队列
        factory.setPubSubDomain(true);
        return factory;
    }
}

  在容器启动时会针对两种消息类型,初始化得到两个不同的JmsListenerContainerFactory。下来再修改消费者类,在 @JmsListener 注解中指定 containerFactory,如

@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage1(String text){
        System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,然后注释掉属性文件中的消息模式配置就可以了。

02-01 11:10