前言
我们日常开发当中需要用到消息中间件的场合很多,我们或许也用到了形形色色的消息中间件产品,有老牌的ActiveMQ、RabbitMQ,炙手可热的Kafaka,还有阿里研发的Notify、MetaQ、RocketMQ等等,但反过来思考一下,如果让我们自己来设计一个消息中间件,需要考虑哪些方面的问题,需要有什么样的特性来满足实际业务生产的需要呢?下面就这个问题展开讨论。
消息队列应该有什么样的特性
很多有经验的工程师看到这个问题,脑袋里最直接能想到的应该是:解耦、异步、削峰。
解耦
想象这样一种场景,有个业务系统A,在处理某个核心业务逻辑的时候,跟另外的B、C、D三个业务系统有关联,当前已有的处理流程是A系统在处理完自己本地的事务后,顺次调用B、C、D三个系统的接口去同步数据状态。但是,随着业务的发展,突然有一天另外一个业务系统D的数据也同样需要根据A系统当中数据记录的更新而同步更新,此时,开发不得不去修改原有代码,加上调用D系统同步更新数据的代码。然后某一天,业务需求又发生了变更,B系统不再维护对应的业务数据,开发又不得不修改原有代码,将调用B系统接口的代码给删掉。如此周而复始,开发就会很苦恼。其实,站在A系统的角度来看,可能“关心”A系统变更的应用有很多个,但A系统只需要发布变更消息即可,谁关心谁接入。
异步
另外一种情况就是,还是上面最初的场景,A系统处理业务逻辑时,需要调用B、C、D三个系统的接口,但是其中D接口是个耗时任务接口,需要很长的时间才会得到处理结果,那么一旦这样的请求多了以后,A系统和D系统都会被拖垮。一个典型的例子是,电商当中的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户也肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。
削峰
还是A系统,A系统里面有个秒杀业务逻辑,每天上午11点有一波抢购活动,对于一天的其他时间来说,访问A系统的请求流量平平淡淡,A系统完全可以应付的了,但就是11点时候,有一波“突袭”流量访问进来,上游入口还好,做了集群部署等一系列应对高并发的措施,但MySQL数据库扛不住,每秒2K个请求已经是极限了。
像上面这种情况,上下游处理能力存在明显差距,利用消息队列来做一个通用的“漏斗”,当下游有能力处理的时候,再进行分发,就是一种很好的处理方式。
实际上,除了上面三个典型的应用场景以外,消息队列还有一个应用场景,那就是---最终一致性。
最终一致性
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。 然而,这个过程中存在很多可能的意外:
- A扣钱成功,调用B加钱接口失败。
- A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
- A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。
那么,要解决上面提到的可能出现的意外,就有两种备选方案:
- 用分布式事务去实现强一致性,但实际上这种实现成本很高。
- 利用消息队列的“记录”和“补偿”的方式去实现最终一致性。
这里主要讲第二种实现方式。回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。 具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。 broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。 我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。
最终一致性不是消息队列的必备特性,但某些时候确实可以依赖消息队列做一些需要满足最终一致性的事情。那么可以再思考一下,理论上只要消息队列不能100%保证不丢消息,那也无法实现最终一致性。
如何设计一个消息队列
其实总体而言,我们设计一个消息队列,一言以蔽之,可以简单的理解为设计一个整体的消息被消费的数据流。
其中主要涉及到三个角色:消息生产Producer、Broker(消息服务端)、消息消费者Consumer。
- Producer(消息生产者):发送消息到Broker。
- Broker(服务端):Broker这个概念主要来自于Apache的ActiveMQ,特指消息队列的服务端。
- Consumer(消息消费者):从消息队列接收消息,consumer回复消费确认。
其中,broker是我们的设计重点,它主要有三个职能:
- 消息的转储:消息存储在broker服务器上,在合适的时间点把消息投递出去,或者通过一系列手段辅助消息最终能送达消费机。
- 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
- 消息的传输:RPC调用
所以,一个消息队列的基本实现可以概括为:
当然,在基本实现的基础之上,消息队列也会视实际情况封装一些高级特性,如可靠投递,事务特性,性能优化等,这些高级特性不是本文探讨的重点,本文主要关注消息队列基本特性的原理和设计,即通讯协议、存储选择和消费关系维护这几方面。
通讯协议
消息Message既是信息的载体,消息发送者需要知道如何构造消息,消息接收者需要知道如何解析消息,它们需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。
几种常见消息通讯协议
JMS:JMS是由Sun公司早期提出的消息标准,旨在为java应用提供统一的消息操作,包括创建消息、发送消息、接收消息等。JMS提供了两种消息模型,点对点和发布订阅模型,当采用点对点模型时,消息将发送到一个队列,该队列的消息只能被一个消费者消费。而采用发布订阅模型时,消息可以被多个消费者消费。在发布订阅模型中,生产者和消费者完全独立,不需要感知对方的存在。
AMQP:AMQP是 Advanced Message Queuing Protocol,即高级消息队列协议。AMQP不是一个具体的消息队列实现,而是一个标准化的消息中间件协议。其目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口。 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议。AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
Kafka的通讯协议:Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议。Kafka的这套协议完全是为了Kafka自身的业务需求而定制的。
存储选型
通常来说,可供选择的存储类型有如下几种:
- 内存
- 本地文件系统
- 分布式文件系统
- DB
- NoSQL
从速度上内存显然是最快的,对于允许消息丢失,消息堆积能力要求不高的场景(例如日志),内存会是比较好的选择。
DB则是最简单的实现可靠存储的方案,很适合用在可靠性要求很高,最终一致性的场景(例如交易消息),对于不需要100%保证数据完整性的场景,要求性能和消息堆积的场景,hbase也是一个很好的选择。
具体的选择还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择;对于不需要100%保证数据完整性的场景,要求性能和消息堆积的场景,hbase也是一个很好的选择,典型的比如 kafka的消息落地可以使用hadoop。
消费关系处理
经过上面的存储选型以后,我们的消息队列就初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。 掰开了揉碎了看,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。
为了实现广播功能,我们必须要维护消费关系,通常消息队列本身不维护消费订阅关系,可以利用zookeeper等成熟的系统维护消费关系,在消费关系发生变化时下发通知。
最后
以上就是一个基本的消息队列设计需要考虑的特性和关键点,大家get了多少呢,欢迎留言一起探讨。欢迎关注我:野生技术汇