消息中间件RabbitMQ实战(一)-LMLPHP

简介

消息与消息队列

消息(Message)是指应用于应用之间传送的数据,消息的类型包括文本字符串、JSON、XML、内嵌对象等等...

所谓 消息中间件 / 消息队列(Message Queue Middleware,简称MQ)是利用高效可靠的消息传递机制进行数据交流,同时可以基于数据通信来进行分布式系统的继承,消息中间件一般有两种传递模式:点对点(Point-to-Point)模式和发布/订阅(Pub/Sub)模式,点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为了可能,发布订阅模式定义了如何向一个内容节点发布和订阅内容,这个内容节点叫topic,这种模式可以满足消费者发布一个消息,多个消费者同时消费同一信息的需求。

性能对比

消息中间件RabbitMQ实战(一)-LMLPHP

了解什么是AMQP协议?

消息中间件RabbitMQ实战(一)-LMLPHP

其中:Exchange和Message Queue存在着绑定的关系。

AMQP核心概念

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务;
  • Connection:连接,应用于程序与Broker的网络连接;
  • Channel:网络通道,几乎所有的操作都是在channel中进行的,channel是进行消息读写的通道,客户可以建立多个channel,每个channel代表一个会话任务;
  • Message:消息,服务器与应用程序之间传送的数据,有Properties和Body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body是消息体内容;
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtrual host里面可以有若干个Exchange和Queue,用一个Virtrual host里面不能有相同名字的Exchange或Queue;
  • Exchange:交换机,接收消息,很久路由键转发消息到绑定的队列;
  • Exchange和Queue之间的虚拟连接,binding中可以包含touting key;
  • Routing key:一个路由规则,虚拟机可用它来确定一个如何路由一个特定的消息;
  • Queue:也成Message Queue,消息队列,保存消息并将他们转发给消费者;

消息中间的作用和使用场景

消息中间件RabbitMQ实战(一)-LMLPHP

RabbitMQ的核心组件

消息中间件RabbitMQ实战(一)-LMLPHP

Hello World

“我会Hello World!”,学习一门技术,先出hello world开始,我们来编写一个Java项目来使用RabbitMQ来实现消息的生产和消费,这样能让我们能够更好的理解RabbitMQ的作用和原理,RabbitMQ是消息代理,它负责接收并转发消息,我们可以将它视为邮局,将要发送的邮件都放在又想着,可以确保Mailperson现在活或者女生最终将邮件传递给别人,因此:RabbitMQ是一个邮箱,一个邮局和一个邮递员的实例。

  • 生产意味着发送,发送消息的程序是生产者
  • 队列就是RabbitMQ内部的邮箱名称,消息是存储在队列中的,尽管消息流经RabbitMQ和你的应用程序,生产者可以发送一个队列信息,许多消费者可以尝试从一个队列里接收数据
  • 消费与接受者是同一个身份,一个消费者是一个程序,主要是等待接收信息

pom依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

我们称其为消息发布者(发送者)MessageSend和我们的消息消费者(接收者) MessageRecv。发布者将连接到RabbitMQ,发送一条消息,然后退出。

MessageSend.java

/**
 * @Author 林必昭
 * @Date 2019/11/29 22:12
 * @descr
 */
public class MassageSend {

    private final static String QUEUE_NAME = "myQueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //连接本地计算机代理,也就是本地主机,如果我们想连到另一台计算机,这里只需要指定IP地址
        factory.setHost("localhost");
        try(Connection conn = factory.newConnection();
            //创建一个通道,完成我们的业务逻辑
            Channel channel =  conn.createChannel()){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message = "Hello World!";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [xxx] 发送消息: '" + message + "'");
        }
    }
}

MessageRecv.java

public class MessageRecv {

    private final static String QUEUE_NAME = "myQueue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("消费者正在等待消息,退出请按CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [xxx] 接收到了: '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
11-30 05:53