rocketmq在存储消息的时候,最终是通过mmap映射成磁盘文件进行存储的,本文就消息的存储流程作一个整理。源码版本是4.9.2
主要的存储组件有如下4个:
CommitLog:存储的业务层,接收“保存消息”的请求
MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件
MappedFileQueue:管理MappedFile的容器
AllocateMappedFileService:异步创建mappedFile的服务
对于rocketmq来说,存储消息的主要文件被称为CommitLog,因此就从该类入手。处理存储请求的入口方法是asyncPutMessage,主要流程如下:

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ...
    //可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以内存锁就足够了
    putMessageLock.lock();
    try {
    	//获取最新的文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        ...
        //如果文件为空,或者已经存满,则创建一个新的commitlog文件
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        ...
        //调用底层的mappedFile进行出处,但是注意此时还没有刷盘
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        ...
    } finally {
        putMessageLock.unlock();
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
	...
}

因此对于Commitlog.asyncPutMessage来说,主要的工作就是2步:
1.获取或者创建一个MappedFile
2.调用appendMessage进行存储

接下去我们先看MappedFile的创建,查看mappedFileQueue.getLastMappedFile方法,最终会调用到doCreateMappedFile方法,调用流如下:
getLastMappedFile-->tryCreateMappedFile-->doCreateMappedFile

protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;
	//如果异步服务对象不为空,那么就采用异步创建文件的方式
    if (this.allocateMappedFileService != null) {
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
    } else {
    //否则就同步创建
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }
    ...
    return mappedFile;
}

因此对于MappedFileQueue来说,主要工作就2步:
1.如果有异步服务,那么就异步创建mappedFile
2.否则就同步创建

接下去主要看异步创建的流程,查看allocateMappedFileService.putRequestAndReturnMappedFile

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    ...
    //创建mappedFile的请求,
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    //将其放入ConcurrentHashMap中,主要用于并发判断,保证不会创建重复的mappedFile
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
	//如果map添加成功,就可以将request放入队列中,实际创建mappedFile的线程也是从该queue中获取request
    if (nextPutOK) {
        boolean offerOK = this.requestQueue.offer(nextReq);
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
        	//因为是异步创建,所以这里需要await,等待mappedFile被异步创建成功
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            //返回创建好的mappedFile
            return result.getMappedFile();
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }
    return null;
}

因此对于AllocateMappedFileService.putRequestAndReturnMappedFile,主要工作也是2步:
1.将“创建mappedFile”的请求放入队列中
2.等待异步线程实际创建完mappedFile

接下去看异步线程是如何具体创建mappedFile的。既然AllocateMappedFileService本身就是负责创建mappedFile的,并且其本身也实现了Runnable接口,我们查看其run方法,其中会调用mmapOperation,这就是最终执行创建mappedFile的方法

private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        //从队列中拿request
        req = this.requestQueue.take();
        ...
        if (req.getMappedFile() == null) {
            MappedFile mappedFile;
            //判断是否采用堆外内存
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    //如果开启了堆外内存,rocketmq允许外部注入自定义的MappedFile实现
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                	//如果没有自定义实现,那么就采用默认的实现
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                //如果未采用堆外内存,那么就直接采用默认实现
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }
			...
            //这里会预热文件,这里涉及到了系统的底层调用
            mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            req.setMappedFile(mappedFile);
        }
        ...
    } finally {
        if (req != null && isSuccess)
            //无论是否创建成功,都要唤醒putRequestAndReturnMappedFile方法中的等待线程
            req.getCountDownLatch().countDown();
    }
    return true;
}

因此对于mmapOperation创建mappedFile,主要工作为4步:
1.从队列中获取putRequestAndReturnMappedFile方法存放的request
2.根据是否启用对外内存,分支创建mappedFile
3.预热mappedFile
4.唤醒putRequestAndReturnMappedFile方法中的等待线程

接下去查看mappedFile内部的具体实现,我们可以发现在构造函数中,也会调用内部的init方法,这就是主要实现mmap的方法

private void init(final String fileName, final int fileSize) throws IOException {
    ...
    //创建文件对象
    this.file = new File(fileName);
    try {
        //获取fileChannel
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        //进行mmap操作,将磁盘空间映射到内存
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        ...
    } finally {
        ...
    }
}

因此对于init执行mmap,主要工作分为2步:
1.获取文件的fileChannel
2.执行mmap映射

而如果采用了堆外内存,那么除了上述的mmap操作,还会额外分配对外内存

this.writeBuffer = transientStorePool.borrowBuffer();

到这里,CommitLog.asyncPutMessage方法中的获取或创建mappedFile就完成了。

接下去需要查看消息具体是符合被写入文件中的。查看mappedFile的appendMessage方法,最终会调用到appendMessagesInner方法:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
                                               PutMessageContext putMessageContext) {
    //如果是启用了对外内存,那么会优先写入对外内存,否则直接写入mmap内存
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    ...
    //调用外部的callback执行实际的写入操作
    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
           (MessageExtBrokerInner) messageExt, putMessageContext);
    ...
    return result;
}

因此对于appendMessage方法,主要工作分为2步:
1.判断是否启用对外内存,从而选择对应的buffer对象
2.调用传入的callback方法进行实际写入

接下去查看外部传入的callback方法,是由CommitLog.asyncPutMessage传入

result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

而this.appendMessageCallback则是在CommitLog的构造函数中初始化的

this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());

查看DefaultAppendMessageCallback.doAppend方法,因为本文不关心消息的具体结构,所以省略了大部分构造buffer的代码:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    ...
    //获取消息编码后的buffer
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    ...
    //写入buffer中,如果启用了对外内存,那么就会写入外部传入的writerBuffer,否则直接写入mappedByteBuffer中
    byteBuffer.put(preEncodeBuffer);
    ...
    return result;
}

因此对于doAppend方法,主要工作分为2步:
1.将消息编码
2.将编码后的消息写入buffer中,可以是writerBuffer或者mappedByteBuffer

此时虽然字节流已经写入了buffer中,但是对于堆外内存,此时数据还仅存在于内存中,而对于mappedByteBuffer,虽然会有系统线程定时刷数据落盘,但是这并非我们可以控,因此也只能假设还未落盘。为了保证数据能落盘,rocketmq还有一个异步刷盘的线程,接下去再来看下异步刷盘是如何处理的。
查看CommitLog的构造函数,其中有3个service,分别负责同步刷盘、异步刷盘和堆外内存写入fileChannel

public CommitLog(final DefaultMessageStore defaultMessageStore) {
    ...
    //同步刷盘
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //异步刷盘
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //将对外内存的数据写入fileChannel
    this.commitLogService = new CommitRealTimeService();
    ...
}

先看CommitRealTimeService.run方法,其中最关键的代码如下:

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

查看mappedFileQueue.commit方法,关键如下:

int offset = mappedFile.commit(commitLeastPages);

查看mappedFile.commit方法:

public int commit(final int commitLeastPages) {
	//如果为空,说明不是堆外内存,就不需要任何操作,只需等待刷盘即可
    if (writeBuffer == null) {
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            //如果是堆外内存,那么需要做commit
            commit0();
            this.release();
        }
        ...
    }
    return this.committedPosition.get();
}

查看commit0方法:

protected void commit0() {
    ...
    //获取堆外内存
    ByteBuffer byteBuffer = writeBuffer.slice();
    //写入fileChannel
    this.fileChannel.write(byteBuffer);
    ...
}

因此对于CommitRealTimeService,工作主要分2步:
1.判断是否是对外内存,如果不是那就不需要处理
2.如果是对外内存,则写入fileChannel

最后查看同步刷盘的GroupCommitService和异步刷盘FlushRealTimeService,查看其run方法,会发现其本质都是调用了如下方法:

CommitLog.this.mappedFileQueue.flush

当然在处理的逻辑上还有计算position等等逻辑,但这不是本文所关心的,所以就省略了。
同步和异步的区别体现在了执行刷盘操作的时间间隔,对于同步刷盘,固定间隔10ms:

this.waitForRunning(10);

而对于异步刷盘,时间间隔为配置值,默认500ms:

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
...
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}

最后查看mappedFileQueue.flush是如何刷盘的。最终会调用到mappedFile的flush方法:

public int flush(final int flushLeastPages) {
	...
    //如果是使用了堆外内存,那么调用的是fileChannel的刷盘
    if (writeBuffer != null || this.fileChannel.position() != 0) {
        this.fileChannel.force(false);
    } else {
    //如果非堆外内存,那么调用的是mappedByteBuffer的刷盘
        this.mappedByteBuffer.force();
    }
    ...
    return this.getFlushedPosition();
}

因此最终的刷盘,工作主要分2步,正和前面的CommitRealTimeService工作对应:
1.如果是使用了堆外内存,那么调用fileChannel的刷盘
2.如果非堆外内存,那么调用mappedByteBuffer的刷盘

至此,整个rocketmq消息落盘的流程就完成了,接下去重新整理下整个流程:
1.CommitLog:存储的业务层,接收“保存消息”的请求,主要有2个功能:创建mappedFile、异步写入消息。
2.AllocateMappedFileService:异步创建mappedFile的服务,通过构建AllocateRequest对象和队列进行线程间的通讯。虽然MappedFile的实际创建是通过异步线程执行的,但是当前线程会等待创建完成后再返回,所以实际上是异步阻塞的。
3.MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件。在init方法中创建了fileChannel,并完成了mmap操作。如果启用了堆外内存,则会额外初始化writeBuffer,实现读写分离。
4.MappedFileQueue:管理MappedFile的容器。
5.写入消息的时候,会根据是否启用堆外内存,写入writeBuffer或者mappedByteBuffer。
6.实际落盘是通过异步的线程实现的,分为名义上的同步(GroupCommitService)和异步(FlushRealTimeService),不过主要区别在于执行落盘方法的时间间隔不同,最终都是调用mappedFile的flush方法
7.落盘会根据是否启用对外内存,分别调用fileChannel.force或者mappedByteBuffer.force

03-21 18:37