rocketmq在存储消息的时候,最终是通过mmap映射成磁盘文件进行存储的,本文就消息的存储流程作一个整理。源码版本是4.9.2
主要的存储组件有如下4个:
CommitLog:存储的业务层,接收“保存消息”的请求
MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件
MappedFileQueue:管理MappedFile的容器
AllocateMappedFileService:异步创建mappedFile的服务
对于rocketmq来说,存储消息的主要文件被称为CommitLog,因此就从该类入手。处理存储请求的入口方法是asyncPutMessage,主要流程如下:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
...
//可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以内存锁就足够了
putMessageLock.lock();
try {
//获取最新的文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
...
//如果文件为空,或者已经存满,则创建一个新的commitlog文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
...
//调用底层的mappedFile进行出处,但是注意此时还没有刷盘
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
...
} finally {
putMessageLock.unlock();
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
...
}
因此对于Commitlog.asyncPutMessage来说,主要的工作就是2步:
1.获取或者创建一个MappedFile
2.调用appendMessage进行存储
接下去我们先看MappedFile的创建,查看mappedFileQueue.getLastMappedFile方法,最终会调用到doCreateMappedFile方法,调用流如下:
getLastMappedFile-->tryCreateMappedFile-->doCreateMappedFile
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
//如果异步服务对象不为空,那么就采用异步创建文件的方式
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
//否则就同步创建
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
...
return mappedFile;
}
因此对于MappedFileQueue来说,主要工作就2步:
1.如果有异步服务,那么就异步创建mappedFile
2.否则就同步创建
接下去主要看异步创建的流程,查看allocateMappedFileService.putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
...
//创建mappedFile的请求,
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
//将其放入ConcurrentHashMap中,主要用于并发判断,保证不会创建重复的mappedFile
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//如果map添加成功,就可以将request放入队列中,实际创建mappedFile的线程也是从该queue中获取request
if (nextPutOK) {
boolean offerOK = this.requestQueue.offer(nextReq);
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
//因为是异步创建,所以这里需要await,等待mappedFile被异步创建成功
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
//返回创建好的mappedFile
return result.getMappedFile();
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
因此对于AllocateMappedFileService.putRequestAndReturnMappedFile,主要工作也是2步:
1.将“创建mappedFile”的请求放入队列中
2.等待异步线程实际创建完mappedFile
接下去看异步线程是如何具体创建mappedFile的。既然AllocateMappedFileService本身就是负责创建mappedFile的,并且其本身也实现了Runnable接口,我们查看其run方法,其中会调用mmapOperation,这就是最终执行创建mappedFile的方法
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
//从队列中拿request
req = this.requestQueue.take();
...
if (req.getMappedFile() == null) {
MappedFile mappedFile;
//判断是否采用堆外内存
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
//如果开启了堆外内存,rocketmq允许外部注入自定义的MappedFile实现
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
//如果没有自定义实现,那么就采用默认的实现
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
//如果未采用堆外内存,那么就直接采用默认实现
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
...
//这里会预热文件,这里涉及到了系统的底层调用
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
req.setMappedFile(mappedFile);
}
...
} finally {
if (req != null && isSuccess)
//无论是否创建成功,都要唤醒putRequestAndReturnMappedFile方法中的等待线程
req.getCountDownLatch().countDown();
}
return true;
}
因此对于mmapOperation创建mappedFile,主要工作为4步:
1.从队列中获取putRequestAndReturnMappedFile方法存放的request
2.根据是否启用对外内存,分支创建mappedFile
3.预热mappedFile
4.唤醒putRequestAndReturnMappedFile方法中的等待线程
接下去查看mappedFile内部的具体实现,我们可以发现在构造函数中,也会调用内部的init方法,这就是主要实现mmap的方法
private void init(final String fileName, final int fileSize) throws IOException {
...
//创建文件对象
this.file = new File(fileName);
try {
//获取fileChannel
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//进行mmap操作,将磁盘空间映射到内存
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
...
} finally {
...
}
}
因此对于init执行mmap,主要工作分为2步:
1.获取文件的fileChannel
2.执行mmap映射
而如果采用了堆外内存,那么除了上述的mmap操作,还会额外分配对外内存
this.writeBuffer = transientStorePool.borrowBuffer();
到这里,CommitLog.asyncPutMessage方法中的获取或创建mappedFile就完成了。
接下去需要查看消息具体是符合被写入文件中的。查看mappedFile的appendMessage方法,最终会调用到appendMessagesInner方法:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
//如果是启用了对外内存,那么会优先写入对外内存,否则直接写入mmap内存
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
...
//调用外部的callback执行实际的写入操作
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
...
return result;
}
因此对于appendMessage方法,主要工作分为2步:
1.判断是否启用对外内存,从而选择对应的buffer对象
2.调用传入的callback方法进行实际写入
接下去查看外部传入的callback方法,是由CommitLog.asyncPutMessage传入
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
而this.appendMessageCallback则是在CommitLog的构造函数中初始化的
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
查看DefaultAppendMessageCallback.doAppend方法,因为本文不关心消息的具体结构,所以省略了大部分构造buffer的代码:
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
...
//获取消息编码后的buffer
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
...
//写入buffer中,如果启用了对外内存,那么就会写入外部传入的writerBuffer,否则直接写入mappedByteBuffer中
byteBuffer.put(preEncodeBuffer);
...
return result;
}
因此对于doAppend方法,主要工作分为2步:
1.将消息编码
2.将编码后的消息写入buffer中,可以是writerBuffer或者mappedByteBuffer
此时虽然字节流已经写入了buffer中,但是对于堆外内存,此时数据还仅存在于内存中,而对于mappedByteBuffer,虽然会有系统线程定时刷数据落盘,但是这并非我们可以控,因此也只能假设还未落盘。为了保证数据能落盘,rocketmq还有一个异步刷盘的线程,接下去再来看下异步刷盘是如何处理的。
查看CommitLog的构造函数,其中有3个service,分别负责同步刷盘、异步刷盘和堆外内存写入fileChannel
public CommitLog(final DefaultMessageStore defaultMessageStore) {
...
//同步刷盘
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
//异步刷盘
this.flushCommitLogService = new FlushRealTimeService();
}
//将对外内存的数据写入fileChannel
this.commitLogService = new CommitRealTimeService();
...
}
先看CommitRealTimeService.run方法,其中最关键的代码如下:
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
查看mappedFileQueue.commit方法,关键如下:
int offset = mappedFile.commit(commitLeastPages);
查看mappedFile.commit方法:
public int commit(final int commitLeastPages) {
//如果为空,说明不是堆外内存,就不需要任何操作,只需等待刷盘即可
if (writeBuffer == null) {
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
//如果是堆外内存,那么需要做commit
commit0();
this.release();
}
...
}
return this.committedPosition.get();
}
查看commit0方法:
protected void commit0() {
...
//获取堆外内存
ByteBuffer byteBuffer = writeBuffer.slice();
//写入fileChannel
this.fileChannel.write(byteBuffer);
...
}
因此对于CommitRealTimeService,工作主要分2步:
1.判断是否是对外内存,如果不是那就不需要处理
2.如果是对外内存,则写入fileChannel
最后查看同步刷盘的GroupCommitService和异步刷盘FlushRealTimeService,查看其run方法,会发现其本质都是调用了如下方法:
CommitLog.this.mappedFileQueue.flush
当然在处理的逻辑上还有计算position等等逻辑,但这不是本文所关心的,所以就省略了。
同步和异步的区别体现在了执行刷盘操作的时间间隔,对于同步刷盘,固定间隔10ms:
this.waitForRunning(10);
而对于异步刷盘,时间间隔为配置值,默认500ms:
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
...
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
最后查看mappedFileQueue.flush是如何刷盘的。最终会调用到mappedFile的flush方法:
public int flush(final int flushLeastPages) {
...
//如果是使用了堆外内存,那么调用的是fileChannel的刷盘
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
//如果非堆外内存,那么调用的是mappedByteBuffer的刷盘
this.mappedByteBuffer.force();
}
...
return this.getFlushedPosition();
}
因此最终的刷盘,工作主要分2步,正和前面的CommitRealTimeService工作对应:
1.如果是使用了堆外内存,那么调用fileChannel的刷盘
2.如果非堆外内存,那么调用mappedByteBuffer的刷盘
至此,整个rocketmq消息落盘的流程就完成了,接下去重新整理下整个流程:
1.CommitLog:存储的业务层,接收“保存消息”的请求,主要有2个功能:创建mappedFile、异步写入消息。
2.AllocateMappedFileService:异步创建mappedFile的服务,通过构建AllocateRequest对象和队列进行线程间的通讯。虽然MappedFile的实际创建是通过异步线程执行的,但是当前线程会等待创建完成后再返回,所以实际上是异步阻塞的。
3.MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件。在init方法中创建了fileChannel,并完成了mmap操作。如果启用了堆外内存,则会额外初始化writeBuffer,实现读写分离。
4.MappedFileQueue:管理MappedFile的容器。
5.写入消息的时候,会根据是否启用堆外内存,写入writeBuffer或者mappedByteBuffer。
6.实际落盘是通过异步的线程实现的,分为名义上的同步(GroupCommitService)和异步(FlushRealTimeService),不过主要区别在于执行落盘方法的时间间隔不同,最终都是调用mappedFile的flush方法
7.落盘会根据是否启用对外内存,分别调用fileChannel.force或者mappedByteBuffer.force