RabbitMQ

什么是MQ?

Message Queue:消息队列(消息中间件),典型的生产者—消费者模型。生产者不断地往消息队列中生产消息,消费者不断从队列中获取消息。消息的生产和消费是异步的,分别只关心消息的发送和接收,没有业务逻辑的入侵,可以轻松实现系统间解耦。

RabbitMQ学习-LMLPHP

对比其他MQ

  • RocketMQ —— 阿里开源的消息中间件,基于Java语言开发,具有高吞吐、高可用的特性,适合大规模分布式系统应用的特点。

  • Kafka —— 分布式发布-订阅消息系统,基于Pull的模式处理消息消费,追求高吞吐量、低延迟,最初开发用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

  • RabbitMQ —— 基于Erlang语言开发的开源消息队列系统,基于AMQP协议实现,追求高可靠性

AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息网络协议。AMQP协议模型如下图所示:

RabbitMQ学习-LMLPHP

  • 消息发布者(publisher)发布消息,经由交换机(Exchange)
  • 交换机根据不同的路由规则将收到的消息分发给与该交换机绑定(Binding)的队列(Queue)
  • AMQP代理将消息投递给订阅(Subscribe)了该队列的消费者(Consumer),或者消费者按照需求自行获取

深入理解

(1)发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者、消费者、消息代理可以分别存在于不同的设备上。

(2)发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

(3)从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(ACK)后,才将其完全从队列中删除。

(4)在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

RabbitMQ安装

通过docker部署

docker run -dit --name myRabbitMQ -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root -p 15672:15672 -p 5672:5672 rabbitmq:management

熟悉RabbitMQ管理界面

RabbitMQ学习-LMLPHP

三种常见的交换机类型

RabbitMQ学习-LMLPHP

RabiitMQ常用工作模式

RabbitmqUtils.java —— 连接管理工具类

package com.youzikeji.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ连接开启、关闭工具类
 */
public class RabbitmqUtils {

    private static ConnectionFactory cf;

    // 类一加载就执行,且只执行一次,避免每次获取连接都去创建连接工厂
    static {
        cf = new ConnectionFactory();
        cf.setHost("127.0.0.1");
        cf.setPort(5672);
        cf.setUsername("root");
        cf.setPassword("root");
        cf.setVirtualHost("/test01");
    }
    /**
     * 定义提供RabbitMQ连接的工具方法
     * @return RabbitMQ连接
     */
    public static Connection getConnection() {
        try {
            return cf.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接
     * @param channel 通道
     * @param connection 连接
     */
    public static void closeChannelAndConnection(Channel channel, Connection connection) {
        try {
            if (channel != null && channel.isOpen()){
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

简单模式

一个生产者,一个消费者,不涉及交换机

RabbitMQ学习-LMLPHP

Producer.java

package com.youzikeji.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException {

        // 1. 创建connection
        Connection conn = RabbitmqUtils.getConnection();

        // 2. 通过connection获取channel
        assert conn != null;
        Channel channel = conn.createChannel();


        //3. 声明消息队列
        String queueName = "test01-simple-durable";
        /**
         * @param1: 消息队列名
         * @param2: 是否将队列持久化
         * @param3: 队列是否独占,能否绑定其他channel或连接
         * @param4: 是否自动删除队列,为true则表示消费完自动删除队列
         * @param5: 其他属性
         */
        channel.queueDeclare(queueName, true, false, true, null);

        // 4. 准备消息内容
        String msg = "hello, rabbitmq!!!";

        /**
         * @param1: 交换机
         * @param2: 路由键,也即绑定的消息队列名
         * @param3: 其他参数,例如将队列中的消息进行持久化(MessageProperties.PERSISTENT_TEXT_PLAIN)
         * @param4: 消息实体
         */
        // 5. 发送消息给队列
        channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

        System.out.println("消息发送成功");

        // 6. 依次关闭通道和连接
        RabbitmqUtils.closeChannelAndConnection(channel, conn);
    }
}

Consumer.java

package com.youzikeji.rabbitmq.simple;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException {

        // 1. 创建connection
        Connection conn = RabbitmqUtils.getConnection();

        // 2. 通过connection获取channel
        assert conn != null;
        Channel channel = conn.createChannel();

        // 3. 绑定消息队列
        channel.queueDeclare("test01-simple-durable", true, false, true, null);

        // 3. 从消息队列中拿消息
        channel.basicConsume("test01-simple-durable", true, new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("收到消息是" + new String(delivery.getBody(), StandardCharsets.UTF_8));
            }
        }, new CancelCallback() {
            public void handle(String s) throws IOException {
                System.out.println("消息接收失败");
            }
        });
        // 阻塞监听消息队列
        System.out.println("开始接收消息");
        System.in.read();

        // 4. 依次关闭通道和连接
        RabbitmqUtils.closeChannelAndConnection(channel, conn);

    }
}

工作队列模式

多个消费者消费一个生产者,一条消息只能被消费一次,也不涉及交换机

RabbitMQ学习-LMLPHP

Producer.java

package com.youzikeji.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * work queue工作模式下,消息队列中的消息默认平均分配给订阅的消费者
 */
public class Producer {
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        String queueName = "work-durable";
        channel.queueDeclare(queueName, true, false, false, null);

        // 发布100条消息
        for (int i = 0; i < 100; i++) {
            String msg = "hello, work queue msg(" + i + ")";
            channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
        }

        RabbitmqUtils.closeChannelAndConnection(channel, connection);
    }
}

Consumer1.java

package com.youzikeji.rabbitmq.work;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer1 {
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        String queueName = "work-durable";
        channel.queueDeclare(queueName, true, false, false, null);

        channel.basicConsume(queueName, true, new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("收到消息是" + new String(delivery.getBody(), StandardCharsets.UTF_8));
            }
        }, new CancelCallback() {
            public void handle(String s) throws IOException {
                System.out.println("消息接收失败");
            }
        });

        System.out.println("开始接收消息");
        System.in.read();

        RabbitmqUtils.closeChannelAndConnection(channel, connection);
    }
}

其他消费者类同。

平均分配结果 —— 消费者1消费了一半消息,消费者2消费了另一半

RabbitMQ学习-LMLPHP

发布订阅模式(fanout)

又称广播模式,可以有多个消费者,每个消费者都有自己订阅的临时消息队列。其消息发送流程如下:

  • 消息发布者发布消息到fanout交换机
  • 交换机把消息发送给绑定在交换机上的所有临时队列
  • 临时消息队列的订阅者都能获取到消息

RabbitMQ学习-LMLPHP

Publisher.java

package com.youzikeji.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Publisher {
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 为channel声明交换机(名称 + 类型)
        channel.exchangeDeclare("logs", "fanout");

        // 发送消息
        channel.basicPublish("logs", "", null, "fanout type msg".getBytes(StandardCharsets.UTF_8));

        // 关闭通道及连接
        RabbitmqUtils.closeChannelAndConnection(channel, connection);
    }
}

Subscriber1.java

package com.youzikeji.rabbitmq.fanout;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;

public class Subscriber1 {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        String tempQueue = channel.queueDeclare().getQueue();

        // 将临时队列绑定到fanout交换机(路由key为空)
        channel.queueBind(tempQueue, EXCHANGE_NAME, "");

        // 接收消息
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1: " + new String(body));
            }
        });

    }
}

其他消息订阅者类同。

路由模式(direct)

直连模式,基于路由Key的工作模式。在发布-订阅模式中,发布的消息会被所有订阅了绑定在交换机上的临时消息队列的订阅者接收到。但是在某些场景下,希望不同的消息被不同的队列消费,这时就需要路由模式。

RabbitMQ学习-LMLPHP

  • 队列和交换机不再任意绑定,而是需要指定一个RoutingKey
  • 消息发送方在向交换机发送消息时,必须指定消息的RoutingKey
  • 交换机不再把消息交给每个与其绑定的队列,而是根据消息的RoutingKey进行定向的消息发送

Producer.java

package com.youzikeji.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Producer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs_direct", "direct");

        // 指定发送消息的RoutingKey
        String routingKey = "error";

        channel.basicPublish("logs_direct", routingKey, null, ("基于" + routingKey + "发送的消息").getBytes(StandardCharsets.UTF_8));

        RabbitmqUtils.closeChannelAndConnection(channel, connection);
    }
}

Consumer1.java —— 只绑定了一个RoutingKey(info)

package com.youzikeji.rabbitmq.direct;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "logs_direct";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 声明交换机名称和类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 获取临时消息队列
        String tempQueue = channel.queueDeclare().getQueue();

        // 绑定交换机和消息队列的同时指定routingKey
        channel.queueBind(tempQueue, EXCHANGE_NAME, "info");

        // 接收消息
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
            }
        });

    }
}

Consumer2.java —— 绑定了多个RoutingKey(info、warning、error)

package com.youzikeji.rabbitmq.direct;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "logs_direct";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 声明交换机名称和类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 获取临时消息队列
        String tempQueue = channel.queueDeclare().getQueue();

        // 绑定交换机和消息队列的同时指定routingKey,可以同时指定多个
        channel.queueBind(tempQueue, EXCHANGE_NAME, "error");
        channel.queueBind(tempQueue, EXCHANGE_NAME, "info");
        channel.queueBind(tempQueue, EXCHANGE_NAME, "warning");

        // 接收消息
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
            }
        });

    }
}

上述生产者生产的信息带有RoutingKey(error),只能由消费者2接受并消费。

主题模式

动态路由(direct),更加灵活地匹配RoutingKey(通过通配符的形式)。

RabbitMQ学习-LMLPHP

通配符

  • * —— 恰好匹配一个词
  • # —— 匹配一个或多个词
  • 例如lazy.#可以匹配lazy.irs或者lazy.irs.corporate,而lazy.*之能匹配lazy.irs

Producer.java

package com.youzikeji.rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String EXCHANGE_NAME = "topics";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 指定RoutingKey
        String routingKey = "user.save.test";

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, ("基于" + routingKey +"发送的消息").getBytes(StandardCharsets.UTF_8));

        RabbitmqUtils.closeChannelAndConnection(channel, connection);

    }
}

Consumer1.java —— 使用"*"进行路由匹配

package com.youzikeji.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "topics";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 声明交换机名称和类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 获取临时消息队列
        String tempQueue = channel.queueDeclare().getQueue();

        // 将交换机和临时消息队列进行绑定,使用通配符"*"进行路由匹配
        channel.queueBind(tempQueue, EXCHANGE_NAME, "user.*");

        // 接收消息
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
            }
        });

    }
}

Consumer2.java

package com.youzikeji.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;

import java.io.IOException;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "topics";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqUtils.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 声明交换机名称和类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 获取临时消息队列
        String tempQueue = channel.queueDeclare().getQueue();

        // 将交换机和临时消息队列进行绑定,使用通配符"#"进行路由匹配
        channel.queueBind(tempQueue, EXCHANGE_NAME, "user.#");

        // 接收消息
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
            }
        });

    }
}

上述带RoutingKey(user.save.test)的消息,只能被第二个消费者接收并消费。

SpringBoot整合RabbitMQ

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

Springboot配置

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: localhost
    port: 5672
    username: root
    password: root
    virtual-host: /test01

五种常用工作模式测试

TestRabbitMQ.java

模板的使用

  • 注入RabbitTemplate
  • 重载rabbitTemplate.convertAndSend(),简单模式和工作队列模式只有队列名参数和消息体参数,后三种工作模式的三个参数分别为交换机名称、路由键、消息体
package com.youzikeji;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestRabbitMQ {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimple() {
        rabbitTemplate.convertAndSend("hello", "hello world");
    }

    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work queue");
        }
    }

    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("springboot_logs", "", "Fanout message");
    }

    @Test
    public void testDirect() {
        rabbitTemplate.convertAndSend("springboot_logs_direct", "error", "direct message");
    }

    @Test
    public void testTopic() {
        rabbitTemplate.convertAndSend("springboot_logs_topic", "user.save", "topic message");
    }

}

注解的使用

  • @RabbitListener —— 修饰目标包括类、方法、注解,多个监听对应多个消费者

    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(RabbitListeners.class)
    public @interface RabbitListener {
    
  • @QueueBinding(value = @Queue, exchange=@Exchang(...)) —— 指示绑定在一起的交换机和消息队列

  • @Queue(value = "xxx", durable = "true") —— 指示要监听的持久化队列,不带参数说明是临时队列

  • @Exchange(value="xxx", type="xxx") —— 指示绑定的交换机的名称和类型

  • @RabbitHandler —— 指示消息的接收处理方法

SimpleConsumer.java —— 简单模式

package com.youzikeji.simple;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true"))  // 消费者监听hello队列
public class SimpleConsumer {

    @RabbitHandler
    public void receiveMsg(String message) {
        System.out.println("message = " + message);
    }
}

WorkConsumer.java —— 工作队列模式

package com.youzikeji.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

FanoutConsumer.java —— 发布订阅模式(广播模式)

package com.youzikeji.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "springboot_logs", type = "fanout")
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);

    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "springboot_logs", type = "fanout")
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);

    }
}

DirectConsumer.java —— Direct路由模式

package com.youzikeji.direct;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "springboot_logs_direct", type = "direct"),
                    key = {"info", "warning", "error"}
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "springboot_logs_direct", type = "direct"),
                    key = {"info", "warning"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

TopicConsumer.java —— Topic模式

package com.youzikeji.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
                    key = {"user.save", "user.*"}
            )
    })

    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
                    key = {"*.user.*"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
                    key = {"user.#"}
            )
    })
    public void receive3(String message) {
        System.out.println("message3 = " + message);
    }

}

MQ使用场景

异步处理

例如网站注册,没必要等待注册成功的通知邮件发送完毕或者注册成功的通知短信发送完毕才显示注册成功,邮件和短信的发送可以异步处理。

应用解耦

将订单系统和库存系统解耦,订单系统只用关注下订单,库存系统只用从消息队列中读取订单信息,再做库存的扣除。

流量削峰

秒杀系统中,流量过大会导致系统宕机,将大量的用户请求先写入消息队列,消息队列达到最大长度是抛弃其余的用户请求或跳转到错误页面,秒杀业务根据消息队列中的请求信息,再做后续处理。

06-20 10:40