提起消息队列,也许你的脑海里会不自觉地蹦出好多概念:JMS、Kafka、RocketMQ、AMQP、RabbitMQ、ActiveMQ、Pulsar、Redis Stream...如果你的项目中恰好用到了其中的一个消息中间件,那么你出去面试或者与同事交流技术的时候,对方很大概率会问你:为啥要选择xxx消息中间件?
如果你刚好只了解你正在用的消息中间件,那么你只能回答:因为只会xxx...
不...这绝对不是你想要的结局!在对方准备看你的笑话之前,你也许可以主动发起还击,把所有的框架的诞生背景、优缺点,适用场景等都说一遍,从概念到原理,从特性到源码。在说完了之后,为了不让对方感觉到尴尬,你应该故意停顿片刻,暗示对方自己不想再聊这个话题了,好让对方有喘息的机会,让他趁早切换话题,给他一个台阶下。
为了让自己能有如此实力,你务必要对这些常见的消息中间件有比较深入的了解。
我们先来看看这些技术的发展史。
MQ技术发展史:
如下图所示:
- 操作系统中的消息队列:在操作系统里面,我们可以通过消息队列实现两个或多个进程/线程之间的异步通信,发送方和接收方不需要同时与消息队列交互。放置在队列中的消息会一直存储,直到接收者取回它们。消息队列对可以在单个消息中传输的数据大小和队列能够存储的消息数量上限有隐式或显式限制;
- TIB:1983年,一位来自孟买的工程师Vivek Ranadive提出了一个问题:为什么没有通用的软件总线--一种通信系统,可以将信息从一个应用程序传递到另一个应用程序呢?最终,Vivek Ranadive创办了Teknekron公司,在1985年实现了第一个消息中间件:Teknekron的The Information Bus(TIB);
- MQSeries:TIB受到了企业的欢迎,同时这也被IBM看在眼里,于是他们决定研发自己的消息队列软件。最终,在1993年,IBM推出了面向消息中间件的产品,MQSeries,2002年更名为WebSphereMQ,2014年更名为IBM MQ;
- MSMQ:这么好的发财机会怎么能错过呢,于是微软也加入了竞争队伍,并在1997年发布了自家的消息中间件产品:MSMQ;
- JMS:这些巨头推出的消息中间件价格昂贵,一般只应用于大型组织机构。并且,由于商业壁垒,MQ厂商们只关注于应用互通的问题,而不会去考虑创建标准来实现不同的MQ产品之间的互通。为了打破这个壁垒,于是JMS诞生了;
- JMS,即Java消息服务(Java Message Service),是由Sun公司早期提出的消息标准,为Java提供统一的消息操作,是Java平台中关于面向消息中间件的接口;
- JMS是一种与厂商无关的API,类似于JDBC(Java Database Connectivity),用来访问消息系统和收发消息的编程API;
- 不过JMS毕竟是在真实的消息中间件API上面做了一层适配,各个消息中间件的实现仍旧是没有一个规范,最终会暴露出问题,使得程序更加混乱与脆弱。此刻,我们需要的是一种消息通信标准;
- AMQP:在2004至2006年,摩根大通在着手设计AMQP,最终,与其他公司(Cisco, Red Hat, iMatix等)成立了AMQP工作组,越来越多的公司参与进来,最终在2006年制定了AMQP的公开标准,由此,AMQP登上了历史的舞台,大家可以基于此标准来实现消息中间件,不受任何开发语言、产品等的条件限制;
- RabbitMQ:RabbitMQ最初就是一个实现了AMQP的消息中间件,本文我们会详细介绍这个家伙;
- Kafka: Kafka是一种分布式流式系统,被设计为能够作为一个统一平台来处理大型公司可能拥有的所有实时数据馈送。为此,它必须具有高吞吐量才能支持大容量事件流,例如实时日志聚合;
- RabbitMQ是基于队列和交换器的消息中间件,而Kafka是使用分区事务日志来实现存储层的分布式流式系统;
- Kafka不存在队列,而是按照主题存储记录集,并且为每个主题维护一个消息分区日志;
- Kafka中消费者自己维护消息的消费偏移量,支持持久订阅和临时订阅(重启后丢失偏移);
- Kafka中的消息是按照预设的时间进行持久化的,而不是根据消费状态;
- Kafka的设计之初就考虑到了高性能,通过以下方式实现:
- 利用分区实现并行处理;
- 使用磁盘顺序写,以及充分利用页缓存;
- 零拷贝技术;
- 批处理技术,数据压缩等;
- RocketMQ:随着阿里巴巴的电商业务不断发展,需要一款更高性能的消息中间件,RocketMQ就是这个业务背景的产物。RocketMQ是一个分布式消息中间件,具有低延迟、高性能和可靠性、万亿级别的容量和灵活的可扩展性,它是阿里巴巴于2012年开源的第三代分布式消息中间件。RocketMQ经历了多年双十一的洗礼,在可用性、可靠性以及稳定性等方面都有出色的表现。值得一提的是,RocketMQ最初就是借鉴了Kafka进行改造开发而来的,所以熟悉Kafka的朋友,会发现RocketMQ的原理和Kafka有很多相似之处;
- Pulsar:在Yahoo,为了追求大集群多租户、稳定可靠的 IO 服务质量、百万级 Topic、跨地域复制等需求,Pulsar 应运而生,以弥补Kafka在这方面的不足,Pulsar的优点:
- 应用场景:Pulsar 对用户来说提供了统一的消息模型,可以满足各种MQ;
- 架构优势:有存储计算分离的云原生架构的优势,使用BookKeeper作为Pulsar的存储层。在 Broker 层不存储任何数据,具有更高的可用性、更灵活的扩容和管理,避免数据的 reblance 和 catch-up;
- 社区活跃度:Pulsar 用户和贡献者数量也在快速增加...
而RabbitMQ作为传统的消息中间件,被大量应用于各种古老的项目,你第一个要拿下的就是它了,本文将带您从以下各个方面了解RabbitMQ相关知识:
- 什么是AMQP?
- 常见的交换机类型有哪些?
- 如何实现消息的持久化?
- RabbitMQ的连接复用有啥优势?
- RabbitMQ的消息ACK机制是如何实现的?
- RabbitMQ消息持久化机制性能如何?
- 如何避免消费过载的问题?
- 如何提高手动ACK签收的效率?
- 什么时候需要让消息重回队列?
- 如何保证消息的顺序消费?
- 如何实现可靠的消息投递?
关于其他的消息中间件,我会在下篇文章中继续给大家分享。
RabbitMQ是一种使用Erlang语言编写的开源的消息中间件,最初实现了AMQP(高级消息队列协议),后来通过插件架构进行了扩展,支持STOMP(面向流文本的消息传递协议)、MQTT(MQ遥控传输)等协议。
详细关于RabbitMQ支持的消息协议,参考官网:Which protocols does RabbitMQ support?
更多内容欢迎关注公众号Java架构杂谈
,或者我的博客IT宅itzhai.com
。
1. RabbitMQ优势
RabbitMQ支持多种客户端,如Python、Java、.NET、C、Ruby等,在易用性、扩展性、高可用性等方面表现都不错,并且可以与SpringAMQP完美整合,API丰富易用。
RabbitMQ程序健壮、稳定、易用,跨平台、支持多种语言,管理界面简单易用,功能全面,文档相对比较齐全,社区活跃。
2. AMQP简介
AMQP,全称为:Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件的开放标准的二进制应用层协议。AMQP的核心特性是:面向消息、排队、路由(包括点对点和发布订阅)、可靠性和安全性。这些功能使其非常适合在应用程序之间传递业务消息,AMQP还可以用作物联网IoT协议。
目前,AMQP 1.0已经被批准为国际标准,具体规范文档,可以进一步阅读:OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0
而RabbitMQ最初是为了支持AMQP 0-9-1而开发的,因此,该协议是RabbitMQ Broker支持的核心协议。
下面我们就简要介绍下AMQP 0-9-1协议。这部分内容,概念会比较多,稍微有点枯燥,但是可以说RabbitMQ就是按照这个协议去实现的,所以熟悉这个协议很重要。
2.1 AMQP模型概述
2.1.1 AMQP 0-9-1
AMQP 0-9-1 是一个二进制协议,定义了非常强大的消息传递语义。对于客户端来说,这是一个相对容易实现的协议,因此有大量客户端库可用于许多不同的编程语言和环境。
AMQP 0-9-1通常划分为两层:
功能层(Functional Layer)
:定义了一组命令(按功能做不同的分类),提供给应用程序,用于支撑消息相关的工作;传输层(Transport Layer)
:传输层将这些方法从应用程序传送到服务器并返回,并处理通道多路复用、成帧、内容编码、心跳、数据表示和错误处理。
可以在不改变协议的应用程序相关功能的情况下用任意的传输协议来替换传输层,也可以将传输层用于不同的高级协议。
2.1.2 AMQP 0-9-1模型简介
如下图,消息Broker(代理)
从消息发布者
(发布消息的应用程序,也称为生产者)接收消息并将它们路由到消费者
(处理消息的应用程序)。
由于AMQP是一个网络协议,因此,生产者
、消费者
、代理
都可以部署在不同的机器上。AMQP模型如下图所示:
消息发布到交换机(exchanges)
(通常将其比作邮局或邮箱),然后使用称为绑定(Bindings)
的规则将消息副本分发到队列(queues)
。然后代理(brokers)
要么将消息传递(deliver)
给订阅队列的消费者(consumers)
,要么消费者主动按需从队列中获取(fetch)
/拉取(pull)
消息。
消息元数据:发布消息的时候,发布者可以指定各种消息元数据(消息属性)
,其中一些元数据可能由代理使用,其余的元数据仅由接收消息的应用程序使用。
消息确认:由于网络是不可靠的,并且应用程序可能无法正确处理消息,因此AMQP 0-9-1模型有一个消息确认的概念:当消息传递给消费者时,消费者会自动或者由开发人员在应用程序中手动指定通知代理Broker,代理只会在收到消息(或消息组)的通知时从队列中完全删除该消息。
死信队列:在某些情况下,例如,当消息无法路由时,消息可能会返回给发布者、或者丢弃掉、或者将其放入所谓的死信队列
(如果代理扩展支持),发布者通过使用某些参数来选择如何处理此类情况。
队列(queues)/交换机(exchanges)和绑定(bindings)统称为AMQP实体。
2.1.3 AMQP 0-9-1 是一个可编程的协议
AMQP 0-9-1是一种可编程的协议,因为AMQP 0-9-1实体
和路由方案
由应用程序本身定义,而不是代理管理员。因此AMQP制定了一些规定来实现这些协议操作:
- 声明队列和交换机;
- 定义他们之间的绑定;
- 订阅队列等。
这为应用程序开发人员提供了很大的自由,但也要求他们了解潜在的定义冲突。在实践中,定义冲突很少见,通常表示为配置错误。
应用程序声明它们需要的AMQP 0-9-1实体
,定义必要的路由方案
,并在不需要使用它们时进行删除。
2.2 交换机(Exchanges)和交换机类型
交换机
是发送消息的AMQP 0-9-1实体。交换机收到一条消息,并将其路由到零个或者多个队列
中。咳咳,Java架构杂谈
提醒大家,不要联想到了网络的交换机(Network switch),只是中文名称一样而已。
使用的路由算法
取决于交换机类型
和称为绑定
的规则。以下是AMQP 0-9-1 Broker提供的四种交换机类型:
除了交换类型之外,交换机还声明了许多属性,关键属性有:
Name
,交换机的名称持久性
,保证交换机在Broker重启后仍然存在,如果没有指定持久,那么交换机在Broker重启后就不存在了,需要重新声明,并非所有场景和用例都要求交换机是持久的;自动删除
,当最后一个队列解除绑定时,交换机被删除;参数
,可选,由插件和特定于代理的功能使用。
2.2.1 默认交换机
默认交换机是由Broker预先声明的匿名直连交换机。使用默认交换机的时候,每个新建队列都会自动绑定到默认交换机上,绑定的路由键与队列名称相同,默认交换机看起来可以将消息直接传递到队列。
2.2.2 直连交换机
交换机根据消息路由键(router_key)
将消息传递到队列,消息将会投递到与路由键名称和队列名称相同的队列上。直接交换机是消息单播路由
的理想选择(尽管它们也可以用于多播路由)。
直连交换机如下图所示:
- 一个队列N使用路由键 K 绑定到交换机;
- 当具有路由键 M 的新消息到达直连交换机时,如果 K = M,则交换机将其路由到队列N。
如上图,具有路由键"itzhai.com"的消息达到交换机之后,则会路由到Queue1中。
直连交换机通常用于以循环的方式在多个消费者之间分配任务,也就是说,消息的负载均衡是发生在消费者之间而不是队列之间。
2.2.3 扇形交换机
扇形交换机将消息路由到绑定到它的所有队列,并且忽略路由键。也就是说,当新消息发布到该交换机时,该消息的副本将投递到所有绑定该交换机的队列。扇形交换机是消息广播
路由的理想选择。
扇形交换机如下图所示:
使用扇形交换机的案例都非常相似:
- 大型多人在线游戏(MMO)可以将其用于排行榜更新或其他全局事件;
- 体育新闻网站可以使用扇形交换机向客户端近乎实时的分发比分信息;
- 分布式系统使用它来广播各种状态和配置更新;
- 群聊可以使用它在参与者之间分发消息(AMQP没有内置presence的概念,因此XMPP可能会是更好的选择)。
2.2.4 主题交换机
主题交换机根据消息路由键和和用于将队列绑定到交换机的模式匹配字符串之间的匹配将消息路由到一个或者多个队列。
也就是说通过消息的路由键去匹配到绑定到交换机的路由键匹配字符串,如果匹配上了,就进行投递消息。
routing key模糊匹配的通配符如下:
*
:用于匹配一个单词,比如itzhai.com.*
,可以匹配:itzhai.com.a
,itzhai.com.b
#
:用于匹配0个或者多个单词,比如itzhai.com.#
,可以匹配:itzhai.com.a
,itzhai.com.a.b
routing key通过.
分隔字符串。
主题交换机如下图所示:
当生产者发送的routing_key=itzhai.com
的时候,会把消息路由投递到Queue1和Queue2。
当生产者发送的routing_key=www.itzhai.com
的时候,会把消息路由投递到Queue3。
当需要有针对性的选择多个接收消息的消费者或者应用的时候,主题交换机都可以被列入考虑的范围。常见的使用场景:
- 后台任务由多个工作线程完成,每个工作线程负责处理某些特定的任务,这个时候可以通过主题交换机订阅感兴趣的消息;
- 分发特定地理位置的信息,每个工作线程只订阅感兴趣的地理位置的信息;
- ...
2.2.5 头交换机
头交换机不依赖路由键的匹配规则来路由消息,而是根据发送消息内容中的请求头属性进行匹配。
头交换机类似于直连交换机,但是直连交换机的路由键必须是一个字符串,而请求头信息则没有这个约束,它们甚至可以是整数或者字典。因此可以用作路由键不必是字符串的直连交换。
绑定一个队列到头交换机上的时候,会同时绑定多个用于匹配的头信息。
投递消息的时候,可以携带一个x-match
参数,指定是否要求必须所有的头信息都匹配(all)才能进行投递,还是只要匹配任何一个就可以了(any)。
2.3 队列(Queues)
AMQP 0-9-1 中的队列与其他消息队列和任务队列系统中的队列类似,用于存储即将被消费的消息。一般地,队列与交换机共享一些属性,但队列也有一些特定的属性:
Name
:队列名称;Durable
:队列持久化,队列在Broker重启之后是否继续存在;Exclusive
:队列是否仅由一个连接使用,如果是,在连接关闭的时候,队列将被删除;Auto-delete
:当最后一个消费者取消订阅的时候,立即删除;Arguments
:可选,一些特定的插件和Broker功能使用,例如实现消息的TTL,队列长度限制等。
2.3.1 队列名称
应用程序可以设置队列名称,如果设置为空字符串,Broker会为它们生成一个唯一的队列名称,在队列声明响应体中一起返回给客户端。队列名称为255个字节以内的UTF-8字符。
2.3.2 队列持久化
持久化的队列的元数据会存储在磁盘上,当Broker重启之后,队列依然存在。没有被持久化的队列称为暂存队列。发布的消息也有同样的区分,也就是说,持久化的队列并不会使得路由到它的消息也具有持久性,需要手动把消息也标记为持久化才能保证消息的持久性。
2.4 绑定(Bindings)
绑定是交换机将消息路由到队列的规则
。为了让交换机能够正确的把消息投递到对应的队列,需要把交换机和队列通过路由键绑定起来,路由键就像是一个过滤器,决定了消息是否可以投递给消息队列。
2.5 消费者
如果消息只是存储在队列里没有被消费,是没有什么实际作用的。在AMQP 0-9-1中,有两种途径可以进行消息的消费:
- 订阅消息队列,以将消息投递给应用(
push API
),这是推荐的做法; - 应用根据需要主动的轮训获取消息(
pull API
),这种方式非常低效,在大多数情况下应该避免。
如果应用程序对某一个特定队列的消息感兴趣,则可以注册一个消费者,对队列进行订阅。每个队列可以有多个消费者,当然也可以注册一个独享的消费者,这个时候其他消费者会被排除在外。
每个消费者(订阅)都有一个称为消费者标签的字符串类型的标识符,可以用它来退订消息。
2.5.1 消息确认
消费者应用程序可能偶尔无法处理单个消息或有时会崩溃,另外网络问题也有可能导致问题。这就提出了一个问题:Broker何时应该从队列中删除消息?AMQP 0-9-1 规范中约定让消费者对此进行控制,有两种确认模式:
- 自动确认模式:在Broker向应用程序发送消息之后(使用basic.deliver或basic.get-ok方法),将消息从消息队列中删除;
- 显示确认模式:在应用程序向broker发回确认之后(使用basic.ack方法),将消息从消息队列中删除。
在显示模式下,应用程序选择何时发送确认消息。如果消费者在没有发送确认的情况下就挂掉了,那么Broker会将其重新投递给另一个消费者,如果此时没有可用的消费者,那么Broker将等到至少有一个消费者注册到该队列时,再尝试重新投递消息。
另外,如果应用程序崩溃(当连接关闭时 AMQP Broker会感知到这一点),并且AMQP Broker在预期的时间内未收到消息确认,则消息将重新入队,如果此时有其他消费者,可能立即传递给另一个消费者。为此,我们的消费者做好业务的幂等处理也是非常重要的。
2.5.2 拒绝消息
当消费者接收到消息之后,可能处理成功或者失败。应用程序可以通过拒绝消息向Broker表明消息处理失败了(或者当时无法完成)。拒绝消息的时候,应用程序可以要求Broker丢弃消息或者重新入队。
当队列中只有一个消费者的时候,请确保您不会通过不断地拒绝消息和重新入队导致消息在同一个消费者身上无限循环的情况发生。
在AMQP中,basic.reject
方法用来执行拒绝消息的操作。
2.5.3 预取消息
在多个消费者共享一个队列的情况,能够制定每个消费者在发送下一个ack之前可以一次性接收多少条消息,这是非常有用的特性。这可以在试图批量发布消息的时候,起到简单的负载均衡和提高消息吞吐量的作用。
2.6 消息属性和有效负载
AMQP 0-9-1模型中的消息是具有属性的,有些属性非常常见,以至于AMQP 0-9-1明确定义了它们,例如:
Content type
内容类型Content encoding
内容编码Routing key
路由键Delivery mode (persistent or not)
投递模式,是否持久化Message priority
消息优先级Message publishing timestamp
消息发布的时间戳Expiration period
消息有效期Publisher application id
发布消息的应用id
有些属性是被AMQP的Broker所使用的,但是大多数是开放给接收它们的应用程序用的。有些属性是可选的,称为消息头(headers),它们类似于HTTP协议的X-Headers,消息属性需要在消息被发布时定义。
消息体:AMQP消息除了属性之外,还包括一个有效载荷Payload(消息实际携带的数据),AMQP Broker视其为一个透明的字节数组来对待。Broker不会修改payload。消息可能只包含属性而没有payload。payload通常使用JSON、Thrift、Protocol Buffers和MessagePack等序列化格式来序列化成结构化的数据,以便进行发布,协议对等方通常使用Content type
和Content encoding
字段来传达此信息。
消息持久化:消息可以作为持久性发布,这使得Broker将他们持久化到磁盘。如果服务器重启之后,系统可以确保接收到的持久化消息不会丢失。简单的将消息发布到持久化的交换机或者被路由到持久化的队列中,是不会让消息也持久化的,消息是否持久化完全取决于消息本身的持久模式。将消息发布为持久性会影响性能,就像数据存储一样,持久性以一定的性能成本作为代价。
2.7 AMQP 0-9-1 方法
AMQP 0-9-1中定义了许多操作方法,详细参考:AMQP 0-9-1参考。
很多方法都有对应的响应方法,有些甚至有不止一种可能的响应,如basic.get,响应可能为:get-ok或者get-empty。
如下是声明一个交换机和响应成功的方法:
2.8 连接(Connections)
AMQP 0-9-1 连接通常是长连接,AMQP 0-9-1 是一种使用TCP提供可靠投递的应用层协议。AMQP 0-9-1连接使用身份认证机制并提供TLS (SSL)保护。当应用程序不再需要连接到Broker时,它应该优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭底层 TCP 连接。
2.9 通道(Channels)
某些应用程序需要同时开启连接到Broker,但是,同时保持许多TCP连接是不可取的,这样会消耗系统资源并且使得配置防火墙更加困难。
AMQP 0-9-1通过通道复用技术通过通道的形式实现在一个TCP连接上面支持多个连接(虚拟的链接)。同一个TCP连接中有多个通道,通道之间的通信是完全隔离的。客户端的每个协议操作都携带了一个通道ID,代理和客户端都是用它来确定该操作所走的通道。
通道仅存在于TCP连接上下文中,一旦TCP连接关闭,其上所有通道也跟着关闭。
一般的,我们会给每个线程打开一个新的通道进行通信。
2.10 虚拟主机
为了让单个代理可以托管多个隔离的环境(用户组、交换机、队列等),AMQP中提供了虚拟主机,这类似于许多流行的Web服务器使用的虚拟主机。协议客户端在连接协商期间需要指定想要使用的虚拟主机。
2.11. AMQP Client架构
推荐的AQMP Client架构须由下面多个抽象层组成:
成帧层
:此层接收AMQP协议方法,并按某种语言格式(结构、类等)来序列化成线级帧,成帧层可以根据AMQP协议规范来实现;连接管理层
:此层用于读写AMQP帧,并管理所有连接、会话逻辑。在此层中,我们可以封装开启连接和会话、错误处理、内容传输和接收数据的全部逻辑;API层
:此层暴露了应用程序工作的特定API。API层可能会反映一些现有的标准,或暴露高层AMQP的方法。API层本身可能是由多个层组成的,如构建于AMQP方法API之上的高级API;IO层
:此外,通常还会有一些I/O层,这此可以是非常简单的同步套接字读取和写入或复杂的异步多线程IO。
AMQP就介绍到这里了,接下来Java架构杂谈
带大家详细看看RabbitMQ。
3. RabbitMQ架构
RabbitMQ的整体架构如下图所示:
Broker
:Broker中按虚拟主机(virtual host
)划分,每个虚拟主机下面有自己的交换机(exchange
)和消息队列(queue
),以及交换机和队列的绑定routing_key
(有些人会把这个key称为binding_key
);
生产端
:一般地,同一个客户端(client
)里面的每个生产者(producer
)创建一个专门的通道(channel
),复用同一个TCP连接(connection
),每个生产者可以往Broker发布消息,发布消息的时候,需指定虚拟主机,以及虚拟主机上的交换机,并且消息需要带上routing_key;
消费端
:一般地,同一个客户端(client
)里面的每个消费者(consumer
)创建一个专门的通道(channel
),复用同一个TCP连接,每个消费者指定一个消息队列进行消费。同一个消息队列,可以有多个消费者共同消费,但消息队列里面的同一条消息,只会由一个消费者消费,多个消费者相当于给消息队列做了负载均衡。
针对默认交换机
、直连交换机
和主题交换机
,生产端带入的routing_key
和交换机与队列之间绑定的routing_key(binding_key)
进行匹配,匹配上了,就把消息投递给对应的消息队列。
针对扇形交换机,直接把消息投递给所有与扇形交换机绑定的队列。
4. RabbitMQ特性
4.1 消息ACK机制
ACK (Acknowledge character),即是确认字符,消息的接收方需要告诉发送方已确认接收消息,这是实现可靠消息投递的必备特性。
MQ系统中,涉及到ACK的流程如下图所示:
4.1.1 生产端ACK之Confirm消息机制
如上图所示:
Producer
发布消息到Broker
;Broker
将消息落地;Broker
发送ack给Producer
。
如果Producer
没有收到ack,那么可以重发消息,直到收到ack为止。为了避免无限的给Broker
投递消息,应该设置一个重试上限,并记录下发送失败的消息。在这个过程中,MQ Server可能会收到重复消息。
在RabbitMQ中,生产端的ACK通过ConfirmListener机制来实现:
在channel中开启确认模式confirmSelect()
,然后在channel中添加监听,用来监听Broker返回的应答。
Broker何时给生产端发送ACK?
对于不可路由的消息,一旦交换机验证消息不会路由到任何队列,Broker将发出ack,如果开启了Return消息机制(下一小节讲解),那么Broker会先发送basic.return
消息给客户端,再发送basic.ack
消息。示例代码如下:
String message = "Hello itzhai.com....";
// Confirm消息机制
channel.addConfirmListener(new TestConfirmListener());
// Return消息机制
channel.addReturnListener(new TestRetrunListener());
// 错误的路由键,但交换机的名称正确
String errorRoutingKey = "itzhai.com.test1";
boolean mandatory = true;
channel.basicPublish(exchangeName, errorRoutingKey, mandatory, basicProperties, message.getBytes());
执行以上代码,生产者将依次收到basic.return(Return消息),basic.ack(Confirm消息)。
对于可路由的消息,当所有队列都接收到消息的之后,Broker向生产端发送ACK。如果路由到的是持久队列,并且是持久消息,那么这个ACK就意味着消息持久化到了磁盘。
也就是说,路由到持久队列
的持久消息
的ACK将在将消息持久化到磁盘后发送。
RabbitMQ消息持久化的性能如何?
RabbitMQ持久化消息的刷盘策略:为了尽可能减少fsync(2)的调用次数,RabbitMQ在间隔一段时间(几百毫秒)或者在队列空闲的时候将消息分批保存到磁盘中。
这就意味着,在正常的负载下,生产端接收Broker的ACK时延可达几百毫秒。为了提高吞吐量,强烈建议生产端应用程序异步处理ACK,或者批量发布消息,并等待ACK。
4.1.2 生产端ACK之Return消息机制
Return消息机制用于处理一些不可路由的消息。发送消息的时候,如果指定的routing_key路由不到队列,这个时候就可以通过ReturnListener监听这种异常情况。
4.1.3 消费端ACK
如上图所示:
- 消息服务器将消息投递给消费者;
- 消费者消费消息,并向消息服务器发送ack;
- 消息服务器收到消费者的ack之后,将已落地的消息删除掉。
当Broker一直没有收到消费端的ACK,则会重发消息,这个过程一般采用指数退避策略
,时间间隔按指数增长。
Rabbit中的消费端ACK
在RabbitMQ中,消费端的ACK可以是自动的,或者手动的。
手动ACK签收
通过以下方法关闭自动ack签收(入参autoAck设置为false):
Channel.java
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
然后自定义一个支持ack的Consumer:
public class TestAckConsumer extends DefaultConsumer {
...
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
try{
...
// 成功消费的ack
boolean multiple = false;
channel.basicAck(envelope.getDeliveryTag(), multiple);
}catch (Exception e) {
// 未成功消费的ack,设置为不重回队列,即立刻删除消息
boolean multiple = false;
boolean requeue = false;
channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
}
}
}
channel中有三种ack相关的方法:
- basic.ack:用于肯定确认,指示RabbitMQ消息已经
处理成功
,可以丢弃
消息了; - basic.nack:用于否定确认,指示RabbitMQ消息
未处理成功
,可以通过参数指定是否需要丢弃消息
还是重回队列
。 - basic.reject:用于否定确认,指示RabbitMQ消息
未处理成功
,可以通过参数指定是否需要丢弃消息
还是重回队列
。
basic.nack与basic.reject的区别就是,basic.nack支持批量手动确认,basic.nack是RabbitMQ对AMQP 0-9-1协议的扩展。
自动ACK签收
使用自动确认模式,消息在发送之后就立刻被标记为投递消费成功。如果消费者的TCP连接或者通道在真正投递成功之前就关闭了,那么Broker发送的消息将会丢失。自动确认模式是以降低消息投递的可靠性来换取更高的消费端吞吐量(只要消费端处理速度能够跟上)。
如何避免消费过载的问题(消费端限流)?
使用自动模式可以提高吞吐量,但是前提是消费端要能够处理得过来,如果处理不过来,就会在消费端的内存中积压消息,直至把内存耗尽。因此,自动确认模式仅推荐用于能够以稳定的速度高效地处理消息的消费者。
为了避免消费过载问题,我们一般使用手动确认模式
,配合通道预取限制
一起使用:
// 每条消息的大小限制,0表示不限制
int prefetchSize = 0;
// MQ Server每次推送的消息的最大条数,0表示不限制
int prefetchCount = 1;
// true 表示配置应用于整个通道,false表示只应用于消费级别
boolean global = false;
channel.basicQos(prefetchSize, prefetchCount, global);
// 队列名称
String queueName = "com.itzhai.queue";
// 设置为手动确认模式
boolean autoAck = false;
// 消费者对象实例
Consumer consumer = new ItzhaiTestConsumer(channel);
channel.basicConsume(queueName, autoAck, consumer);
如何提高手动ACK签收的效率
如果不需要严格控制发送消费端ACK的时间,即,只要消费者成功接收到消息,不管有没有消费成功,都允许进行ACK回复,那么就可以通过批量ACK签收
的功能更来提高签收的消效率。做法如下:
// 手动签收模式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// 注意,这里设置为批量签收
boolean mutiple = true;
// 签收deliveryTag以及deliveryTag之前的所有消息
channel.basicAck(deliveryTag, mutiple);
}
});
这样执行basicAck
,deliveryTag以及deliveryTag之前的所有消息都将会被签收。
什么时候需要让消息重回队列?
有时候消费者太繁忙导致无法立即处理接收到的消息,但是其他实例可能可以处理。这种情况,就可以拒绝消息,并且让消息重回队列。
另外,可以使用channel.basicNack
方法一次拒绝或者重新排队多条消息:
// 指定批量拒绝策略
boolean multiple = true;
// 指定拒绝之后重新入队
boolean requeue = true;
channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
极端情况下,如果所有消费者因为暂时无法处理接收的消息,会导致消息不断的循环重回入队,导致消耗网络带宽和CPU资源。为了避免这种情况,可以跟踪重回队列的消息数量,决定是否需要永久拒绝消息(丢弃消息)还是延迟重回队列的时间。
4.2 消息的顺序性能够得到保证吗?
一般情况下,在单个通道上发布的消息,Rabbit会按照消息发布的相通顺序向生产端发送ACK消息,但也不是绝对的。发布ACK的确切时刻取决于消息的传递模式(持久化或瞬时),以及消息路由到的队列的属性。也就是说,不同的消息在不同的时间准备好进行确认,确认消息可以以不同的顺序达到。所以,应用程序尽可能不要依赖于消息的顺序性。
4.3 消息处理的幂等性如何处理?
无论是生产端还是消费端的ACK,都有可能因为网络或者程序问题导致ACK消息没有及时送达,这个时候会导致重复的消息投递。如何保证消费同一条消息的情况下不影响业务,这就需要保证消息处理的幂等性。
也就是说,针对同一条消息,无论消费者消费多少次,产生的效果始终应该跟消费一次的保持一致,并且返回的ACK结果也是一致的。
常用的实现消息处理幂等性的方法:
- 每条消息生成唯一ID,消费端根据唯一ID判断是否已经消费过,如果消费过,则直接返回消费成功的ACK。
- 针对入库的业务操作可以通过数据库的唯一索引来实现避免重复业务数据入库;
- 针对修改数据类的操作,可以先判断数据是否已经是目标状态了,如果是目标状态,直接返回再进行更新。
- 针对并发的场景,我们需要给业务消费程序添加分布式锁,避免并发执行导致触发业务重复处理。
4.4 死信队列
如果消息队列中的消息没有被正常消费掉,那么该消息就会成为一个死信(Dead Letter)
,这条消息可以被重新发送到另一个交换机上,后面这个交换机就是死信交换机(DLX)
,死信交换机绑定的队列就是死信队列
。在以下情况下导致的消息未被正常消费,均会使消息变为死信:
- 消费者使用
basic.reject
或者basic.nack
来拒绝消息,同时设置requeue
参数为false
,表示消息不需要重回队列; - 消息设置了TTL,并且过期了,或者队列设置了消息的过期时间
x-message-ttl
; - 由于消息队列超过了长度限制导致消息被丢弃了。
死信队列也是一个正常的交换机,它可以是任何常见的交换机类型,与常规交换机声明没有区别。
DLX可以有客户端使用队列参数(arguments)进行定义,或者在服务器中使用策略(policy)进行定义,在policy和arguments都定义了的情况下,arguments中指定的那个会否决policy中指定的那个。
通过policy启用死信队列:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
通过arguments启用死信队列:
// 声明一个交换机,作为死信交换机
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
5. 持久化消息就意味着消息的可靠性吗?如何实现可靠性投递?
消息可靠性需要考虑生产端投递消息的可靠性以及保证消费端最终成功地消费消息。
虽然通过生产端的ACK机制,可以确保消息成功的投递到了RabbitMQ中,保证投递的消息不丢失。但是如果生产端不知道消费者究竟有没有成功的消费了消息,那也就无法实现可靠性投递了。
而生产端投递消息的过程中,通常会涉及到生产端的事务提交,要保证消息跟随事务提交而发送,也是需要考虑的问题。
如何实现可靠投递呢?这里留给大家思考,关键设计要点:
- 是否要发消息跟随生产端
事务
一起保存到发送日志表
,提交事务之后立刻向消息队列投递一次消息;- 生产端发送日志表
消息状态
:1 发送中,2 Broker签收成功,3 Broker签收失败,4 消费端签收成功,5 消费端签收失败
- 生产端发送日志表
- 使用消息队列模拟
RPC调用
,在消费者成功处理消息之后,向生产者投递成功消费的消息,以便让生产端知道消息已经处理成功了; 定时任务
定时扫描生产端发送日志表,对于超过固定时间之内,还未处理成功的消息,进行重试投递,重试可以使用指数退避策略
,并设置投递上限次数。如果达到上限
次数还未成功,则预警人工介入
排查;- 消费端一定要做好
幂等
处理,避免重复消费导致业务异常。
提示的还不够具体?我再上一张图:
有更好的方案的朋友,欢迎在评论区留言交流,也许你就是评论区最靓的仔。
6. RabbitMQ更多使用场景
通过给消息设置TTL,超时时候放入死信队列进行处理,可以实现延迟队列,当然,RabbitMQ也有专门的延迟队列插件可以使用;
另外,也可以使用RabbitMQ模拟RPC调用,参考上一节实现消息可靠性投递的例子。
更多的使用场景欢迎大家进行补充。
关于更多消息中间件的文章,欢迎关注Java架构杂谈
,或者我的博客IT宅(itzhai.com)
,我会持续的输出相关内容。
我精心整理了一份Redis宝典给大家,涵盖了Redis的方方面面,面试官懂的里面有,面试官不懂的里面也有,有了它,不怕面试官连环问,就怕面试官一上来就问你Redis的Redo Log是干啥的?毕竟这种问题我也不会。
在Java架构杂谈
公众号发送Redis
关键字获取pdf文件: