转载:https://blog.csdn.net/Evankaka/article/details/80977027
1.RabbitMq简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。AMQP协议介绍RabbitMQ就必须先介绍AMQP协议,因为RabbitMQ是它的一种实现而已。
2.RabbitMq模型
Server(broker)
接受客户端连接,实现AMQP消息队列和路由功能的进程,可以理解为邮局。
Virtual Host
其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等,就好比于tomcat中webapps目录下可以部署多个web项目。
Exchange
接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列,就好比邮递员。Exchange有4种类型,direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
1)、Direct 直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue。 2)、fanout广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。 3)、topic主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)。 4)、headers消息体的header匹配(ignore)
Message Queue
1)、设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失。 2)、设置为临时队列,queue中的数据在系统重启之后就会丢失。 3)、设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除。
消息只能存储在队列(queue)中。尽管消息在rabbitMQ和应用程序间流通,但是队列却是存在于RabbitMQ内部。一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区。多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个消息队列中接收数据。消息队列,提供了FIFO的处理机制,具有缓存消息的能力。Rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
Message
由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,就好比于邮箱里面的信件。而Body是真正需要传输的APP数据,就像信件里面的信纸。
Binding
Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定,就好比于邮件上面的地址。
Connection
连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel
信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接,可以简单的理解为线程池中的一个个线程。
3.通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
P1生产消息,发送给服务器端的Exchange
Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
Queue1收到消息,将消息发送给订阅者C1
C1收到消息,发送ACK给队列确认收到消息
Queue1收到ACK,删除队列中缓存的此条消息
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。