前 言
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息发送(视频版)
本文是“一张图进阶 RocketMQ” 系列第 3 篇,对 RocketMQ 不了解的同学可以先看看三此君的
一张图进阶 RocketMQ-整体架构,一张图进阶 RocketMQ - NameServer。
在了解了 RocketMQ 的整体架构之后,我们来深入的分析下生产者消息发送的设计与实现。本文从一个生产者示例开始,以两行代码为切入点,逐步剖析生产者启动流程以及同步消息发送流程。
生产者示例
消息发送分为同步消息、异步消息和单向消息,简单来说:
- 同步消息:消息发送之后会等待 Broker 响应,并把响应结果传递给业务线程,整个过程业务线程在等待。
- 异步消息:调用异步发送 API,Producer 把消息发送请求放进线程池就返回。逻辑处理,网络请求都在线程池中进行,等结果处理完之后回调业务定义好的回调函数。
- 单向消息:只负责发送消息,不管发送结果。
我们先来回顾下同步消息发送的例子:
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("sancijun","order", "orderId", "我一定会关注三此君".getBytes("UTF-8"));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
- 首先,实例化一个生产者
producer
,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。 - 然后
producer
得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先初始化通信模块等。 producer
已经准备好了,那得准备好要发的内容,把 "我一定会关注三此君" 发送到 Topic=”sanicjun“。- 内容准备好,那
producer
就可以把消息发送出去了。producer
怎么知道 Broker 地址呢?他会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。 - 生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。
其中有两个关键的地方:producer.start()
及 producer.send()
,也就是生产者初始化及消息发送。我们以这两行代码为切入点,看看 RocketMQ Producer 的设计与实现。
目标:将消息发送给 Broker 进行存储
关键点 1: 怎样根据 topic+路由信息 建立网络通道,进行消息的发送
关键点 2: 消息在发送过程中又经过了哪些处理?
生产者启动
我们实例化一个生产者 DefaultMQProducer,并调用 DefaultMQProducer.start() 方法进行初始化:
启动流程比较长,其实最重要的就是初始化了通信模块,并启动了多个定时任务,这些在后面的消息发送过程中都会用到:
检查配置是否合法:生产者组名是否为空、是否满足命名规则、长度是否满足等。
启动通信模块服务 Netty RemotingClient:RemotingClient 是一个接口,底层使用的通讯框架是Netty,提供了实现类 NettyRemotingClient,RemotingClient 在初始化的时候实例化 Bootstrap,方便后续用来创建 SocketChannel;后文会介绍 RocketMQ 的通信机制,大家稍安勿躁。
启动 5 个后台定时任务:定时更新 NameServerAddr 信息,定时更新 topic 的路由信息,定时向 Broker 发送心跳及清理下线的 Broker,定时持久化 Consumer 的 Offset 信息,定时调整线程池;
生产者每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系(路由信息)存在本地内存中,如果发现新的 Broker 就会和其建立长连接,每 30s 会发送心跳至 Broker 维护连接。
同步发送
总体上讲,消息发送可以划分为三个层级:
- 业务层:准备需要发送的消息。
- 消息处理层:获取业务发送的 Message,经过一系列的参数检查、消息发送准备、参数包装等操作。
- 通信层:基于 Netty 封装的一个网络通信服务,将消息发送给 Broker。
我们通过前面的示例来看整个同步消息发送的处理流程,整个过程我们的主要目标就是把消息发送到 Broker:
第一步:业务层构建待发送消息
Message msg = new Message("sancijun","order", "orderId", "我一定会关注三此君".getBytes("UTF-8"));
第二步:然后我们调用
producer.send(msg)
发送消息,可是 producer 怎么知道发给谁呢?消息本身又需要经过哪些处理呢?我们进入调用链直到 sendDefaultImpl检查消息是否为空,消息的 Topic 的名字是否为空或者是否符合规范,消息体大小是否符合要求,最大值为4MB,可以通过 maxMessageSize 进行设置。
执行 tryToFindTopicPublishInfo() 方法:获取 Topic 路由信息,如果不存在则抛出异常。如果本地缓存没有路由信息,就通过Namesrv 获取路由信息,更新到本地。消息构建的时候我们指定了消息所属 Topic,根据 Topic 路由信息我们可以找到对应的 Broker。
计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。在同步发送情况下如果发送失败会默认重投两次(默认retryTimesWhenSendFailed = 2),并且不会选择上次失败的 Broker,会向其他 Broker 投递。
执行队列选择方法 selectOneMessageQueue()。根据 lastBrokerName(上次发送消息失败的 Broker 的名字)和 Topic 路由信息选一个 MessageQueue。
首次发送时 lastBrokerName 为 null,采用轮询策略选择一个 MessageQueue。如果上次发送失败,也是采用轮询策略选择一个 MessageQueue,但是会跳过上次发送失败 Broker 的 MessageQueue,也就是换一个 Broker 发送。执行 sendKernelImpl() 方法。
第三步:sendDefaultImpl 做了一系列逻辑处理,我们已经得到了待发送的 BrokerName,而我们的目标是把消息发送到 Broker。sendKernelImpl 方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层。
根据 MessageQueue.brokerName 获取 Broker IP 地址,给 message 添加全局唯一 ID。
调用 MQClientAPIImpl.sendMessage(),首先构建一个远程请求 RemotingCommand,根据发送类型(同步或异步)调用不同的通信层实现方法。我们这里是同步消息,则调用
RemotingClient.invokeSync()。
处理返回结果,将通信层返回的结果封装成 SendResult 对象返回给业务层。
第四步:RemotingClient 是基于 Netty 实现的,熟悉 Netty 的同学已经大概知道后面的流程,不熟悉的同学也没有关系,这里先混个眼熟,下面我们会对 Netty 做简单的介绍。
- RemotingClient.invokeSync() 先是通过 Broker Addr 获取或者创建 Netty Channel。先从 channelTables Map 本地缓存中,以Broker Addr 为 key 获取 Channel,没有获取到则通过 Netty Bootstrap.connect( Broker Addr) 创建 Channel,并放入缓存。
- 然后生成<opaque, ResponseFuture>的键值对放入 responseTable 缓存中,结果返回的时候根据 opaque 从缓存中获取结果。
- 调用 channel.writeAndFlush() 将消息通过网络传输给指定 Broker。这里是 Netty 框架的 API,已经不在 RocketMQ 范畴。
- 调用 ResponseFuture.waitResponse() 方法,直到 Netty 接收 Broker的返回结果。其实就是执行 countDownLatch.await()。
第五步:结果处理及返回。
- Broker 处理结果返回,Netty 产生可读事件,由 Channelhandler 处理可读事件,这里是 NettyClientHandler.channelRead0()接收写入数据,处理可读事件。
- 然后处理返回结果,从 responseTable 取出 ResponseFuture,并执行 responseFuture.putResponse()。实际上就只执行 countDownLatch.countDown() 唤醒第四步中等待的调用线程,返回 Broker 的处理结果 RemotingCommand。
- 结果层层返回,直到 MQClientAPIImpl.sendMessageSync() 出手了,这里调用 MQClientAPIImpl.processSendResponse() 处理返回结果,封装成 SendResult 对象返回给业务层。
到这里,生产者已经将消息发送到指定的 Broker 了,其中包括了消息的层层校验及封装;还有很重要的是如何选择一个 MessageQueue 进行发送(重试),重试是保证消息发送可靠的关键步骤;最后通过 Netty 将请求发送给 Broker。我们先不管 Broker 收到请求如何处理,但是要明白消息如何送到 Broker 进行存储,需要对 Netty 有简单的理解。
总结
以上就是 RocketMQ 消息发送的主要内容,我们简单的总结下:
- 生产者启动:主要是调用 NettyRemotingClient.start() 初始化 Netty 客户端,并启动 5 个后台线程;
- 消息发送:业务层封装发送的消息,逻辑层进行层层校验及封装,轮询策略选择一个 MessageQueue 发送(重试),通信层基于 Netty 将消息发送给 Broker。
参考文献
丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.