• 相比传统的 JMS 模型,AMQP 主要多了 ExchangeBinding 这个新概念。

    在 AMQP 模型中,消息的生产者不是直接将消息发送到Queue队列,而是将消息发送到Exchange交换器,其中还新加了一个中间层Binding绑定,作用就是通过路由键Key将交换器和队列建立绑定关系。

    就好比类似用户表角色表,中间通过用户角色表来将用户和角色建立关系,从而实现关系绑定,在 RabbitMQ 中,消息生产者不直接跟队列建立关系,而是将消息发送到交换器之后,由交换器通过已经建立好的绑定关系,将消息发送到对应的队列!

    RabbitMQ 最终的架构模型,核心部分就变成如下图所示:

    从图中很容易看出,与 JMS 模型最明显的差别就是消息的生产者不直接将消息发送给队列,而是由Binding绑定决定交换器的消息应该发送到哪个队列,进一步实现了在消息的推送方面,更加灵活!

    2.2、交换器分发策略

    当消息的生产者将消息发送到交换器之后,是不会存储消息的,而是通过中间层绑定关系将消息分发到不同的队列上,其中交换器的分发策略分为四种:Direct、Topic、Headers、Fanout!

    2.2.1、Direct

    Direct 是 RabbitMQ 默认的交换机模式,也是最简单的模式,消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。

    如果传入的 routing key 为 black,不会转发到black.green。Direct 类型交换器是完全匹配、单播的模式

    2.2.2、Topic

    Topic 类型交换器转发消息和 Direct 一样,不同的是:它支持通配符转发,相比 Direct 类型更加灵活!

    两种通配符:*只能匹配一个单词,#可以匹配零个或多个。

    如果传入的 routing key 为 black#,不仅会转发到black,也会转发到black.green

    2.2.3、Headers

    headers 也是根据规则匹配, 相比 direct 和 topic 固定地使用 routing_key , headers 则是通过一个自定义匹配规则的消息头部类进行匹配。

    在队列与交换器绑定时,会设定一组键值对规则,消息中也包括一组键值对( headers 属性),当这些键值对有一对, 或全部匹配时,消息被投送到对应队列。

    此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。

    2.2.4、Fanout

    Fanout  类型交换器与上面几个不同,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略,也被成为消息广播模式。很像子网广播,每台子网内的主机都获得了一份复制的消息

    fanout 类型转发消息在四种类型中是最快的。

    三、RabbitMQ 安装

    RabbitMQ 基于 erlang 进行通信,相比其它的软件,安装有些麻烦,为了跟生产环境保持一直,操作系统选择CentOS7,不过本例采用rpm方式安装,任何新手都可以完成安装,过程如下!

    3.1、安装前命令准备

    输入如下命令,完成安装前的环境准备。

    yum install lsof  build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim

    3.2、下载 RabbitMQ、erlang、socat 的安装包

    本次下载的是RabbitMQ-3.6.5版本,采用rpm一键安装,适合新手直接上手。

    先创建一个rabbitmq目录,本例的目录路径为/usr/app/rabbitmq,然后在目录下执行如下命令,下载安装包!

    wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
    wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

    最终目录文件如下:

    3.3、安装软件包

    下载完之后,按顺序依次安装软件包,这个很重要哦~

    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

    安装完成之后,修改rabbitmq的配置,默认配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目录下。

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

    修改loopback_users节点的值!

    最后只需通过如下命令,启动服务即可!

    rabbitmq-server start &

    运行脚本之后,如果报错,例如下图!

    解决办法如下:

    vim /etc/rabbitmq/rabbitmq-env.conf

    在文件里添加一行,如下配置!

    NODENAME=rabbit@localhost

    然后,再保存!再次以下命令启动服务!

    rabbitmq-server start &

    通过如下命令,查询服务是否启动成功!

    lsof -i:5672

    如果出现5672已经被监听,说明已经启动成功!

    3.4、启动可视化的管控台

    输入如下命令,启动控制台!

    rabbitmq-plugins enable rabbitmq_management

    用浏览器打开http://ip:15672,这里的ip就是 CentOS 系统的 ip,结果如下:

    账号、密码,默认为guest,如果出现无法访问,检测防火墙是否开启,如果开启将其关闭即可!

    登录之后的监控平台,界面如下:

    四、web界面使用

    相比其他的消息队列,rabbitMQ 其中一个很明显的好处就是有 web 操作界面,而且简单易用。

    进入 web 管理界面之后,可以很清晰的看到分了 6 个菜单目录,分别是:Overview、Connections、Channels、Exchanges、Queues、Admin

    点击具体某个具体的信道,可以看到对应的消费队列等信息。

    下面,我们重点介绍一些如何通过 web 页面来操作 rabbitMQ!

    4.1、交换器管理

    点击进入 Exchanges 菜单,最下面有一个Add a new exchange标签。

    点击Add a new exchange,会展示如下信息!

    我们先新建一个名称为hello-exchange,类型为direct的交换器,结果如下。

    等会用于跟队列关联!

    4.2、队列管理

    点击进入 Queues 菜单,最下面也有一个Add a new queue标签。

    点击标签,即可进入添加队列操作界面!

    同样的,新建一个名称为hello-mq的消息队列,结果如下。

    队列新建好了之后,继续来建立绑定关系!

    4.3、绑定管理

    建立绑定关系,既可以从队列进入也可以从交换器进入。

    如果是从交换器进入,那么被关联的对象就是队列。

    如果是从队列进入,那么被关联的对象就是交换器。

    我们选择从队列入手,被绑定的交换器是hello-exchange,因为类型是direct,所以还需要填写routing key

    建立完成之后,在交换器那边也可以看到对应的绑定关系。

    4.4、发送消息

    最后,我们从交换器入手,选择对应的交换器,点击Publish message标签,填写对应的路由键 key,发送一下数据,查看数据是否发送到对应的队列中。

    然后点击进入 Queues 菜单,查询消息队列基本情况。

    然后选择hello-mq消息队列,点击Get messages标签,获取队列中的消息。

    结果如下,可以很清晰的看到,消息写入到队列!

    五、java客户端使用

    RabbitMQ 支持多种语言访问,本次介绍 RabbitMQ Java Client 的一些简单的api使用,如声明 Exchange、Queue,发送消息,消费消息,一些高级 api 会在后面的文章中详细的说明。

    5.1、引入 rabbitMQ 依赖包

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.1.0</version>
    </dependency>

    5.2、连接服务器

    使用给定的参数(host name,端口等等)连接AMQP的服务器。

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    Connection conn = factory.newConnection();

    也可以使用通过 URI 方式进行连接。

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
    Connection conn = factory.newConnection();

    Connection(连接)接口可以被用作创建一个channel(管道),利用 channel(管道)可以进行发送和接收消息,在后面我们会频繁使用到它。

    Channel channel = conn.createChannel();

    注意,管道使用之后,需要进行关闭。

    channel.close();
    conn.close();

    5.3、创建交换器

    不仅可以通过 web页面进行创建交换器,还可以通过代码进行声明(创建的意思)交换器。

    //创建exchange,类型是direct类型
    channel.exchangeDeclare("ex-hello","direct");

    //第三个参数表示是否持久化,同步操作,有返回值
    AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true);
    System.out.println(ok);

    //创建带属性的交换器
    Map<String,Object> argument = new HashMap<>();
    argument.put("alternate-exchange","log");
    channel.exchangeDeclare("ex-hello","direct",true,false,argument);

    //异步创建exchange,没有返回值
    channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument);

    ///判断exchange是否存在,存在的返回ok,不存在的exchange则报错
    AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello");
    System.out.println(declareOk);


    //删除exchange(可重复执行),删除一个不存在的也不会报错
    channel.exchangeDelete("ex-hello");

    创建交换器参数解读

    5.4、创建队列

    同样的,也可以通过代码进行声明队列。

    //同步创建队列
    channel.queueDeclare(queueName, truefalsefalsenull);

    //异步创建队列没有返回值
    channel.queueDeclareNoWait(queueName,true,false,false,null);

    //判断queue是否存在,不存在会抛出异常
    channel.exchangeDeclarePassive(queueName);

    //删除队列
    channel.queueDelete(queueName);

    创建队列参数解读

    5.5、创建绑定

    当交换器和队列都创建成功之后,就可以建立绑定关系。

    //交换器和队列进行绑定(可重复执行,不会重复创建)
    channel.queueBind(queueName, exchangeName, routingKey);

    //异步进行绑定,最后一个参数表示可以带自定义参数
    channel.queueBindNoWait(queueName,exchangeName,routingKey,null);

    //exchange和queue进行解绑(可重复执行)
    channel.queueUnbind(queueName, exchangeName, routingKey);

    //exchange与exchange进行绑定(可重复执行,不会重复创建)
    //第一个参数表示目标交换器
    //第二个参数表示原地址交换器
    //第三个参数表绑定路由key
    channel.exchangeBind(destination,source,routingKey);

    //exchange和exchange进行解绑(可重复执行)
    channel.exchangeUnbind(destination,source,routingKey);

    绑定关系参数解读

    5.6、发送消息

    发送消息到交换器就会使用我们上文所提到的channel管道。

    //发送的消息内容
    byte[] messageBodyBytes = "Hello, world!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

    也可以在发送消息前设定一些消息属性。

    //自己构建BasicProperties的对象
    channel.basicPublish(exchangeName, routingKey,
                 new AMQP.BasicProperties.Builder()
                   .contentType("text/plain")
                   .deliveryMode(2)
                   .priority(1)
                   .userId("zhangsan")
                   .build()),
                   messageBodyBytes);

    发送指定头信息的消息。

    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("userName",  '"zhangsan');
    headers.put("userCode""123");

    //发送消息到交换器
    channel.basicPublish(exchangeName, routingKey,
                 new AMQP.BasicProperties.Builder()
                   .headers(headers)
                   .build()),
                   messageBodyBytes);

    发送一个有过期时间的消息,单位:ms。

    //设置消息过期时间,单位ms
    channel.basicPublish(exchangeName, routingKey,
                 new AMQP.BasicProperties.Builder()
                   .expiration("6000")
                   .build()),
                   messageBodyBytes);

    更多用法,可以参见官方 API

    5.7、接受消息

    从消息队列中接受消息也会使用我们上文所提到的channel管道。

    //监听队列中的消息
    channel.basicConsume(queueName,true,new SimpleConsumer(channel));

    监听队列消息参数解读

    具体的消息处理类需要继承DefaultConsumer,并重写handleDelivery方法,代码如下:

    public class SimpleConsumer extends DefaultConsumer{

        public SimpleConsumer(Channel channel){
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      //接受从队列中发送的消息
            System.out.println(consumerTag);
            System.out.println("-----收到消息了---------------");
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
        }
    }

    如果是手工确认消息,需要在handleDelivery方法中进行相关的确认,代码如下:

    //手动确认
    long deliveryTag = envelope.getDeliveryTag();
    channel.basicAck(deliveryTag, false);

    5.8、完整demo

    5.8.1、发送消息
    public class Producer {

        public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
            //连接RabbitMQ服务器
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            factory.setHost("197.168.24.206");
            factory.setPort(5672);
            //创建一个连接
            Connection conn = factory.newConnection();
            //获得信道
            Channel channel = conn.createChannel();
            //声明交换器
            channel.exchangeDeclare("ex-hello","direct");
            //发送的消息内容
            byte[] messageBodyBytes = "Hello, world!".getBytes();
            channel.basicPublish("ex-hello""route-hello"null, messageBodyBytes);
            //关闭通道
            channel.close();
            conn.close();
        }
    }
    5.8.2、接受消息
    public class Consumer {

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //连接RabbitMQ服务器
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            factory.setHost("197.168.24.206");
            factory.setPort(5672);
            //创建一个连接
            Connection conn = factory.newConnection();
            //获得信道
            Channel channel = conn.createChannel();
            //声明队列
            channel.queueDeclare("queue-hello"truefalsefalsenull);
            //声明绑定
            channel.queueBind("queue-hello""ex-hello""route-hello");

            //监听队列中的消息
            channel.basicConsume("queue-hello",true,new SimpleConsumer(channel));

            TimeUnit.SECONDS.sleep(10);

            channel.close();
            conn.close();
        }
    }

    消息处理类SimpleConsumer

    public class SimpleConsumer extends DefaultConsumer {

        public SimpleConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //接受从队列中发送的消息
            System.out.println(consumerTag);
            System.out.println("-----收到消息了---------------");
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
        }
    }

    消息发送成功之后,启动消费者,输出结果如下:

    六、总结

    整篇文章主要介绍了 RabbitMQ 内部结构、安装步骤、使用教程,以及 java 客户端使用等内容,内容比较长,限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

    七、参考

    1、简书 - 预流 - 消息队列之 RabbitMQ

    2、掘金 - 高广超 - RabbitMQ指南

    3、简书 - RabbitMQ笔记三:四种类型Exchange

    4、Java Client API Guide

    5、简书 - RabbitMQ java client的使用

    < END " data-textnode-index="257" data-index="7724" class="character">>

    深入剖析 rabbitMQ-LMLPHP


    本文分享自微信公众号 - Java极客技术(Javageektech)。
    如有侵权,请联系 [email protected] 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    09-08 04:34