当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。
数据校验
封装消息
首先Broker会创建一个MessageExtBrokerInner
对象封装从请求中解析到的消息数据,它会将Topic信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner
中,后续都使用这个MessageExtBrokerInner
对象来操纵消息。
接下来会判断是否开启事务,开启事务与未开启事务时调用的方法不一致,这里以未开启事务为例,看下消息的持久化过程。
消息校验
在存储消息之前,需要对消息进行一系列的校验,保证收到的消息有效合法。
Broker存储合法性检查
主要对Broker是否可以写入消息进行检查,包含以下几个方面:
- 判断是否处于关闭消息存储的状态,如果处于关闭状态则不再受理消息的存储;
- Broker是否是从节点,从节点只能读不能写;
- Broker是否有写权限,如果没有写入权限,不能进行写入操作;
- 操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作;
消息长度检查
主要是对主题的长度校验和消息属性的长度校验。
LMQ(Light Message Queue)
主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量。
消息写入
对消息进行校验完毕之后,就可以对消息进行写入了。
前面说到Broker将收到的消息封装为了MessageExtBrokerInner
对象,这里会新增以下设置:
(1)设置消息存储的时间(当前时间);
(2)计算消息体的CRC值,并设置到对应的成员变量中;
(3)如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识;
写入缓冲区
RocketMQ会将消息数据先写入内存buffer,写入之前还会做一些校验:
(1)对消息属性数据的长度进行校验判断是否超过限定值;
(2)对消息整体内容长度进行校验,判断是否超过最大的长度限制;
校验通过之后,会根据消息总体内容的长度对buffer进行初始化,也就是根据需要的大小申请一块内存区域,开始写入以下数据:
- 消息总长度,占4个字节;
- 魔数,占4个字节;
- 消息体CRC校验和,占4个字节;
- 队列ID,占4个字节;
- 标识,占4个字节;
- 队列的偏移量,占8个字节;
- 消息在文件的物理偏移量,占8个字节;
- 系统标识,占4个字节;
- 发送消息的时间戳,占8个字节;
- 发送消息的主机地址,占8个字节;
- 存储时间戳,占8个字节;
- 存储消息的主机地址,占8个字节;
- 消息的重试次数,占4个字节;
- 事务相关偏移量,占8个字节;
- 消息内容的长度,占4个字节;
- 消息内容,由于消息内容不固定,所以长度不固定;
- 主题名称的长度,占1个字节;
- 主题名称内容,长度不固定;
- 消息属性长度,占2个字节;
- 消息属性内容,长度不固定;
整体存储格式如下:
获取CommitLog
RocketMQ将每一条消息存储到CommitLog文件中,存储文件的根目录由配置参数storePathRootDir
决定:
默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文件名,小于20位用0补齐。
在持久化消息之前,需要知道消息要写入哪个CommitLog文件,RocketMQ通过一个队列(对应MappedFileQueue
)存储了记录了所有的CommitLog文件(对应MappedFile
),并提供了相关方法获取到当前正在使用的那个CommitLog。
如果获取到的CommitLog取为空或者已写满,可能是首次写入消息还未创建文件或者上一次写入的文件已达到规定的大小(1G),此时会新建一个CommitLog文件。
写入CommitLog
知道要写入哪个CommitLog之后,就可以将之前已经写入缓冲区buffer的消息数据写入到CommitLog了。
RocketMQ提供了两种方式进行写入:
(1)通过暂存池将数据写入缓冲区
在开启暂存池时,会先将数据都写入字节缓冲区ByteBuffer
中,ByteBuffer
在申请内存时,可以申请JVM堆内内存(HeapByteBuffer
),也可以申请堆外内存(DirectByteBuffer
),RocketMQ使用的是堆外内存DirectByteBuffer
。
暂存池
类似线程池,只不过池中存放的是提前申请好的内存(ByteBuffer
),RocketMQ会预先申请一些内存,从源码中可以看到申请的是堆外内存,然后放入池中,需要用时从池中获取,使用完毕后会归还到池中。
暂存池的开启条件
需要同时满足以下三个条件时才会开启暂存池:
- 配置中允许开启暂存池;
- Broker的角色不能是
SLAVE
; - 刷盘策略为异步刷盘;
从条件3中可以看出异步刷盘时才可以开启暂存池的使用,因为异步刷盘,很有可能是积攒了一批消息,需要同时刷入磁盘,所以使用暂存池可以将之前写入的消息先暂存在内存缓冲区中,等待执行刷盘时,将积攒的消息一起刷入磁盘中,而同步刷盘由于每次写入完毕之后要立刻刷回磁盘,那么就没有必要使用暂存池缓存数据了。
(2)通过文件映射
未开启暂存池时使用文件映射,使用MappedByteBuffer
映射对应的CommitLog文件,MappedByteBuffer
是ByteBuffer的子类,它可以将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,对文件内容进行操作。
使用MappedByteBuffer
可以减少数据的拷贝,详细内容可参考【Java】Java中的零拷贝。
消息写入流程
了解了写入方式之后,来看下消息的写入流程:
CommitLog对应的
MappedFile
对象中记录了当前文件的写入位置,首先会判断准备写入的位置是否小于文件总大小,如果小于意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息;判断是否开启暂存池,开启暂存池时使用
MappedFile
中的ByteBuffer
来开辟共享内存,否则使用MappedFile
中的;MappedByteBuffer
来开辟。
将之前已经写入缓冲区的消息数据写入到开辟的共享内存中;
返回消息写入结果,有以下几种状态:
- PUT_OK:写入成功;
- END_OF_FILE:超过文件大小;
- MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度:
- PROPERTIES_SIZE_EXCEEDED:消息、属性超过最大允许长度;
- UNKNOWN_ERROR:未知异常;
需要注意,此时消息驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容刷入到硬盘中。
RocketMQ消息存储相关源码可参考:【RocketMQ】【源码】消息的存储
刷盘处理
在以上的消息写入步骤完成之后,会进行刷盘操作。
有两种刷盘策略:
同步刷盘:表示消息写入到内存之后需要立刻刷到磁盘文件中。
异步刷盘:表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。
不管同步刷盘还是异步刷盘,都是唤醒对应的刷盘线程来进行,这里不对唤醒的具体过程进行讲解,如果想了解可参考【RocketMQ】【源码】消息的刷盘机制。
同步刷盘
前面讲到,暂存池只有在异步刷盘时才可以启用,所以设置为同步刷盘时,使用的是文件映射的方式写入数据,在同步刷盘时直接通过MappedByteBuffer
的force
方法将数据flush到磁盘文件即可。
异步刷盘
异步刷盘有开启暂存池和未开启两种情况。
开启暂存池
开启暂存池时,可以分为Commit和Flush两个阶段。
(1)Commit阶段
在开启暂存池时,数据会先写入缓冲区ByteBuffer
中,并未映射到CommitLog文件中,所以首先会唤醒Commit线程,将ByteBuffer
中的数据写入到CommitLog对应的FileChannel
中。
(2)Flush阶段
数据被写入FileChannel
之后,就会唤醒Flush线程,再调用FileChannel
的force方法将数据flush到磁盘。
未开启暂存池
未开启暂存池时使用文件映射的方式,直接唤醒Flush线程,调用MappedByteBuffer
的force
方法将数据flush到磁盘文件即可。
总结
通过上面分析消息的持久化过程,来看下RocketMQ提升性能的一些地方。
(1)RocketMQ在写入数据到CommitLog时,采用的是顺序写的方式,顺序写比随机写文件效率要高很多。
(2)在异步刷盘时,可以使用暂存池,暂存池会提前申请好内存,申请内存是一个比较重的操作,所以避免在消息写入时申请内存,以此提高效率。
(3)RocketMQ使用了MappedByteBuffer
文件映射的方式,向CommitLog写入数据,可以减少数据的拷贝过程。
参考