【RocketMQ】Message Storage Notes

Overview

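Message middleware stores messages in one of three ways. The first keeps them in memory: fast, but messages are lost if the system crashes. The second keeps them in memory and periodically writes them to a database: messages are persisted, but reading and writing the database becomes the MQ's bottleneck. The third combines memory and disk, periodically saving messages to disk; how well this storage mechanism is designed determines the MQ's concurrency and availability.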

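Reading the RocketMQ source code, I looked for answers to the following questions:

  • How does RocketMQ design its storage mechanism?
  • Which techniques does it use to keep storage efficient?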

Storage Mechanism

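Messages are stored in files, so a dedicated role is needed to manage each file; that is what MappedFile is for. The role that manages these MappedFiles is MappedFileQueue, which can be thought of as a directory and maintains a CopyOnWriteArrayList<MappedFile> mappedFiles.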

public class MappedFile {

    // position after the latest write into memory (writeBuffer or mappedByteBuffer)
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // position after the latest commit into the FileChannel
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // position after the latest flush to the physical file
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // file size, 1 GB by default
    protected int fileSize;
    // NIO channel of the file
    protected FileChannel fileChannel;
    // the underlying file
    private File file;
    // start offset of this file, parsed from its name
    private long fileFromOffset;
    // in-memory buffer that temporarily holds written messages
    protected ByteBuffer writeBuffer = null;
    // memory-mapped view of the whole file
    protected MappedByteBuffer mappedByteBuffer = null;

    // ...

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        ensureDirOK(this.file.getParent());
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    }
}

MappedFile names (file.getName()) look like 00000000000000000000, 00000000001073741824, 00000000002147483648, with fileName[n] = fileName[n - 1] + mappedFileSize. Each file is named directly after its starting offset: 00000000001073741824 corresponds to 1 GB, i.e. every file name is that file's startOffset.
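To make the naming rule concrete, here is a minimal Java sketch that derives file names from start offsets and parses them back, assuming the 1 GB file size mentioned above. The class and method names (MappedFileNames, fileName, firstNames, startOffsetOf) are hypothetical, not RocketMQ API; the 20-digit zero padding matches the names listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class MappedFileNames {
    // Assumed file size: 1 GB, as in the example names above
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    // Encode a start offset as a zero-padded, 20-digit file name
    static String fileName(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // Names of the first n files, applying fileName[n] = fileName[n - 1] + fileSize
    static List<String> firstNames(int n, long fileSize) {
        List<String> names = new ArrayList<>();
        long offset = 0;
        for (int i = 0; i < n; i++) {
            names.add(fileName(offset));
            offset += fileSize;
        }
        return names;
    }

    // Reverse direction, as in MappedFile#init: the name parses back to the start offset
    static long startOffsetOf(String name) {
        return Long.parseLong(name);
    }

    public static void main(String[] args) {
        firstNames(3, MAPPED_FILE_SIZE).forEach(System.out::println);
        System.out.println(startOffsetOf("00000000001073741824")); // 1073741824
    }
}
```

Because the name is the start offset, the file holding any global offset can be found with plain arithmetic, without a separate index.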

MappedFile serves three purposes: writing messages, committing messages to the FileChannel, and flushing them to disk. The corresponding methods are (the first two both write messages):

1. AppendMessageResult appendMessagesInner(MessageExt messageExt, final AppendMessageCallback cb)

2. boolean appendMessage(final byte[] data, final int offset, final int length)

3. int commit(final int commitLeastPages)

4. int flush(final int flushLeastPages)

First, let's look at the append operation.

MappedFile#appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    // 1. position where the previous write ended
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // slice writeBuffer if present, otherwise mappedByteBuffer
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        // 2. start writing right after the previous write
        byteBuffer.position(currentPos);
        // 3. CommitLog's callback serializes the message into byteBuffer
        AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt);
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // .......
}

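1. Read the position where the previous write ended, and take a slice of the buffer.

2. Set the slice's position, where the write will start, to right after the previous write.

3. The callback AppendMessageCallback, provided by CommitLog, performs the actual write. It does some extra processing on the message, such as adding the total message length and timestamps, and lays it out as follows: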

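No. | Field | Description | Type | Bytes
1 | MsgLen | total message length | int | 4
2 | MagicCode | MESSAGE_MAGIC_CODE | int | 4
3 | BodyCRC | CRC of the message body | int | 4
4 | QueueId | message queue ID | int | 4
5 | Flag | flag | int | 4
6 | QueueOffset | position within the message queue | long | 8
7 | PhysicalOffset | physical position: sequential storage offset in the CommitLog | long | 8
8 | SysFlag | MessageSysFlag | int | 4
9 | BornTimestamp | timestamp when the message was created | long | 8
10 | BornHost | address + port of the host that created the message | long | 8
11 | StoreTimestamp | timestamp when the message was stored | long | 8
12 | StoreHost | address + port of the host that stored the message | long | 8
13 | ReconsumeTimes | number of times the message has been re-consumed | int | 4
14 | PreparedTransactionOffset | prepared-transaction offset | long | 8
15 | BodyLength + Body | body length + body | int + bytes | 4 + bodyLength
16 | TopicLength + Topic | topic length + topic | byte + bytes | 1 + topicLength
17 | PropertiesLength + Properties | length of the extended properties + properties | short + bytes | 2 + propertiesLength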
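Summing the byte counts in the table gives the size of one CommitLog record. The sketch below assumes the layout exactly as tabulated (BornHost and StoreHost at 8 bytes each, i.e. IPv4 address plus port); CommitLogRecordLength and msgLength are hypothetical names.

```java
public class CommitLogRecordLength {
    // Fixed-size fields from the layout table, in order (bytes)
    static final int FIXED_FIELDS =
          4   // MsgLen
        + 4   // MagicCode
        + 4   // BodyCRC
        + 4   // QueueId
        + 4   // Flag
        + 8   // QueueOffset
        + 8   // PhysicalOffset
        + 4   // SysFlag
        + 8   // BornTimestamp
        + 8   // BornHost
        + 8   // StoreTimestamp
        + 8   // StoreHost
        + 4   // ReconsumeTimes
        + 8;  // PreparedTransactionOffset

    // Total record length = fixed fields + three length-prefixed variable fields
    static int msgLength(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_FIELDS
            + 4 + bodyLength         // BodyLength (int) + Body
            + 1 + topicLength        // TopicLength (byte) + Topic
            + 2 + propertiesLength;  // PropertiesLength (short) + Properties
    }

    public static void main(String[] args) {
        System.out.println(FIXED_FIELDS);          // 84
        System.out.println(msgLength(0, 0, 0));    // 91
        System.out.println(msgLength(100, 9, 0));  // 200
    }
}
```

The one-byte TopicLength prefix also implies an upper bound on topic name length.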

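Once encoded, the message is converted to a byte array and written into the buffer. The callback returns the number of bytes written, and wrotePosition advances by WroteBytes. The write into the ByteBuffer is therefore done per message.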
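The append path in appendMessagesInner reduces to a few lines. This is a simplified sketch, assuming a single writer thread, a heap ByteBuffer standing in for writeBuffer/mappedByteBuffer, and a pre-serialized record instead of AppendMessageCallback; AppendSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class AppendSketch {
    private final int fileSize;
    // Stands in for writeBuffer / mappedByteBuffer; its own position is never moved
    private final ByteBuffer buffer;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    // Returns bytes written, or -1 if the record does not fit (hypothetical convention)
    int append(byte[] record) {
        int currentPos = wrotePosition.get();        // 1. where the previous write ended
        if (currentPos + record.length > fileSize) {
            return -1;
        }
        ByteBuffer slice = buffer.slice();           // shares content, independent position
        slice.position(currentPos);                  // 2. continue right after it
        slice.put(record);                           // 3. write the serialized message
        wrotePosition.addAndGet(record.length);      // advance by WroteBytes
        return record.length;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    // Read bytes [from, from + len) back without touching the shared buffer's position
    String read(int from, int len) {
        ByteBuffer slice = buffer.slice();
        slice.position(from);
        byte[] out = new byte[len];
        slice.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        AppendSketch file = new AppendSketch(16);
        file.append("hello".getBytes(StandardCharsets.UTF_8));
        file.append("rocketmq".getBytes(StandardCharsets.UTF_8));
        System.out.println(file.wrotePosition());      // 13
        System.out.println(file.read(5, 8));           // rocketmq
        System.out.println(file.append(new byte[4]));  // -1, only 3 bytes left
    }
}
```

slice() is what lets the shared buffer stay at position 0: each append works on a throwaway view whose position comes from wrotePosition, so the only shared write state is the AtomicInteger.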

The commit operation

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        // No writeBuffer: data goes straight into mappedByteBuffer, so there is nothing to commit
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        }
    }
    // All dirty data has been committed to FileChannel: return writeBuffer to the pool.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - lastCommittedPosition > 0) {
        try {
            // view of writeBuffer limited to the uncommitted range [lastCommittedPosition, writePos)
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
/**
 * Whether a commit may run. Returns true if any of the following holds:
 * 1. the mapped file is full
 * 2. commitLeastPages > 0 and the uncommitted part spans at least commitLeastPages pages
 * 3. commitLeastPages = 0 and there is newly written data
 * @param commitLeastPages minimum number of pages per commit
 * @return whether a commit may run
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    int committed = this.committedPosition.get();
    int write = this.wrotePosition.get();
    if (this.isFull()) {  // this.fileSize == this.wrotePosition.get()
        return true;
    }
    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE)) >= commitLeastPages;
    }
    return write > committed;
}

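The commit operation consists mainly of the three methods above. isAbleToCommit decides whether a commit may run: unless the file is full or commitLeastPages is 0, the uncommitted data must span at least commitLeastPages OS pages (4 KB each). commit0 writes the buffer range from the last committed position up to the latest write position into the FileChannel and then updates committedPosition. The commit operation thus works at the FileChannel level.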
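The page threshold in isAbleToCommit compares page indexes rather than byte counts, which is easy to misread. A standalone version of the same three conditions, assuming a 4 KB OS_PAGE_SIZE (CommitThreshold is a hypothetical class name):

```java
public class CommitThreshold {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // Same three conditions as the isAbleToCommit Javadoc above
    static boolean isAbleToCommit(int fileSize, int committed, int wrote, int commitLeastPages) {
        if (wrote == fileSize) {
            return true;                      // file is full: always commit the tail
        }
        if (commitLeastPages > 0) {
            // difference of page indexes, not of bytes
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;             // commitLeastPages == 0: any new data
    }

    public static void main(String[] args) {
        int fileSize = 1 << 20;
        System.out.println(isAbleToCommit(fileSize, 0, 12 * 1024, 4)); // false, 3 pages
        System.out.println(isAbleToCommit(fileSize, 0, 16 * 1024, 4)); // true, 4 pages
        System.out.println(isAbleToCommit(fileSize, 4095, 4097, 1));   // true, crosses a page boundary
        System.out.println(isAbleToCommit(fileSize, 100, 101, 0));     // true
    }
}
```

The 4095/4097 case shows the consequence: two new bytes that cross a page boundary count as one full page.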

The flush operation

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            // committedPosition when writeBuffer is used, otherwise wrotePosition
            int value = getReadPosition();
            try {
                // data goes either to fileChannel or to mappedByteBuffer, never both
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

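isAbleToFlush follows the same logic as isAbleToCommit, using flushLeastPages and comparing against flushedPosition. After forcing the data to disk, flush updates flushedPosition, which records the last position written to the physical file. The flush operation thus works at the physical-file level.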
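Putting append, commit and flush side by side shows how the three pointers relate (flushedPosition <= committedPosition <= wrotePosition when writeBuffer is used). This is a simplified sketch, not MappedFile itself: it assumes a direct ByteBuffer as writeBuffer and a plain FileChannel, and omits the mmap path, TransientStorePool, the page thresholds and the hold/release reference counting. ThreePointers and demo are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreePointers implements AutoCloseable {
    final AtomicInteger wrotePosition = new AtomicInteger(0);
    final AtomicInteger committedPosition = new AtomicInteger(0);
    final AtomicInteger flushedPosition = new AtomicInteger(0);
    private final ByteBuffer writeBuffer;   // off-heap staging buffer
    private final RandomAccessFile raf;
    private final FileChannel fileChannel;

    ThreePointers(File file, int fileSize) throws IOException {
        this.writeBuffer = ByteBuffer.allocateDirect(fileSize);
        this.raf = new RandomAccessFile(file, "rw");
        this.fileChannel = raf.getChannel();
    }

    // append: bytes land only in writeBuffer; wrotePosition moves
    void append(byte[] data) {
        ByteBuffer slice = writeBuffer.slice();
        slice.position(wrotePosition.get());
        slice.put(data);
        wrotePosition.addAndGet(data.length);
    }

    // commit: copy [committedPosition, wrotePosition) into the FileChannel
    void commit() throws IOException {
        int writePos = wrotePosition.get();
        int lastCommitted = committedPosition.get();
        if (writePos > lastCommitted) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(lastCommitted);
            slice.limit(writePos);
            fileChannel.position(lastCommitted);
            while (slice.hasRemaining()) {      // write() may transfer fewer bytes than asked
                fileChannel.write(slice);
            }
            committedPosition.set(writePos);
        }
    }

    // flush: force committed bytes to the storage device; flushedPosition moves
    void flush() throws IOException {
        int value = committedPosition.get();   // plays the role of getReadPosition()
        fileChannel.force(false);
        flushedPosition.set(value);
    }

    @Override
    public void close() throws IOException {
        raf.close();                           // also closes fileChannel
    }

    // Append 100 bytes, commit, append 50 more, flush; returns {wrote, committed, flushed}
    static int[] demo() {
        try {
            File f = File.createTempFile("mappedfile-sketch", ".dat");
            f.deleteOnExit();
            try (ThreePointers p = new ThreePointers(f, 4096)) {
                p.append(new byte[100]);
                p.commit();
                p.append(new byte[50]);
                p.flush();
                return new int[] {p.wrotePosition.get(), p.committedPosition.get(), p.flushedPosition.get()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);  // 150 100 100
    }
}
```

After demo runs, the last 50 bytes exist only in writeBuffer: a process crash would lose them, and an OS crash could additionally lose committed bytes that had not yet been flushed.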

Next, let's look at how CommitLog drives commit and flush.

[Figure: FlushCommitLogService class diagram]

FlushCommitLogService extends ServiceThread, so each of these services runs in its own background thread and executes asynchronously.

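Service thread | Scenario | Message insert performance
CommitRealTimeService | async flush, in-memory byte buffer enabled | 1st (fastest)
FlushRealTimeService | async flush, in-memory byte buffer disabled | 2nd
GroupCommitService | sync flush | 3rd (slowest)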

CommitRealTimeService periodically calls mappedFileQueue.commit(commitDataLeastPages) to commit data, then wakes flushCommitLogService to flush it to disk.
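Which of these services actually moves data depends on two settings. The sketch below encodes the selection as I understand it from the RocketMQ 4.x CommitLog: FlushDiskType mirrors the broker's flush-disk-type option and transientStorePoolEnable the in-memory buffer switch, while the method name pipeline is hypothetical. It also assumes, as far as I can tell from the source, that the transient store pool is only honored under ASYNC_FLUSH.

```java
import java.util.List;

public class FlushServiceChoice {
    // Local mirror of the broker's flush mode setting
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // Background services that move data to disk, in order (simplified)
    static List<String> pipeline(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return List.of("GroupCommitService");
        }
        if (transientStorePoolEnable) {
            // commit writeBuffer -> FileChannel, then wake the flush thread
            return List.of("CommitRealTimeService", "FlushRealTimeService");
        }
        // writes go straight to mappedByteBuffer; only flushing is needed
        return List.of("FlushRealTimeService");
    }

    public static void main(String[] args) {
        System.out.println(pipeline(FlushDiskType.ASYNC_FLUSH, true));
        System.out.println(pipeline(FlushDiskType.ASYNC_FLUSH, false));
        System.out.println(pipeline(FlushDiskType.SYNC_FLUSH, false));
    }
}
```

This reads the table above as a pipeline: with the buffer enabled, CommitRealTimeService does not replace FlushRealTimeService but feeds it.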

【MappedFileQueue】
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    // second argument: fall back to the first file when nothing has been committed yet
    MappedFile mappedFile = findMappedFileByOffset(committedWhere, committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        // absolute position after this commit, i.e. where the next commit starts
        long where = mappedFile.getFileFromOffset() + offset;
        // equal means the file's committedPosition did not move (nothing new was committed),
        // so a false result means some data was committed
        result = where == this.committedWhere;
        this.committedWhere = where;
    }

    return result;
}

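findMappedFileByOffset first locates the file to commit, using index (the file's position in the list) = (committedWhere - startOffset) / fileSize, where committedWhere is the position to commit from and startOffset is the start offset of the first file in the queue. For example, with committedWhere = 4000, startOffset = 0 and fileSize = 1024, integer division gives index = 3, so the 4th MappedFile is taken from the queue, and it commits its own buffer to the FileChannel.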
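The lookup formula is plain integer arithmetic. A minimal sketch, assuming the queue's files are contiguous and equally sized; indexFor and its -1 "not found" value are hypothetical:

```java
public class FindFileIndex {
    // index = (offset - firstFileStartOffset) / fileSize, with integer division;
    // returns -1 when the offset falls outside the files still in the queue
    static int indexFor(long offset, long firstFileStartOffset, int fileSize, int fileCount) {
        if (offset < firstFileStartOffset) {
            return -1;                        // file holding this offset was already deleted
        }
        long index = (offset - firstFileStartOffset) / fileSize;
        return index < fileCount ? (int) index : -1;
    }

    public static void main(String[] args) {
        System.out.println(indexFor(4000, 0, 1024, 4));    // 3, the 4th MappedFile
        System.out.println(indexFor(4000, 2048, 1024, 2)); // 1, first two files deleted
        System.out.println(indexFor(4000, 0, 1024, 3));    // -1, beyond the last file
    }
}
```

Subtracting the first file's start offset keeps the index correct after expired files have been removed from the front of the queue.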

FlushRealTimeService likewise periodically flushes content to the physical file and updates flushedWhere on success; the main steps mirror commit.

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        // true means flushedWhere did not move
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        // record the store timestamp only on a full flush (flushLeastPages == 0)
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}
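Both MappedFileQueue#commit and #flush return true when the pointer did not move. A sketch of that convention, with QueuePointer as a hypothetical stand-in for committedWhere/flushedWhere; in the RocketMQ 4.x source I have read, CommitRealTimeService wakes the flush service only when commit returns false, i.e. when some data was committed.

```java
public class QueuePointer {
    private long where;   // plays committedWhere or flushedWhere

    // Same convention as MappedFileQueue#commit/#flush: true means the pointer did not move
    boolean advance(long fileFromOffset, int positionInFile) {
        long newWhere = fileFromOffset + positionInFile;
        boolean unchanged = newWhere == where;
        where = newWhere;
        return unchanged;
    }

    long where() {
        return where;
    }

    public static void main(String[] args) {
        QueuePointer committedWhere = new QueuePointer();
        boolean first = committedWhere.advance(0, 4096);   // some data committed
        boolean second = committedWhere.advance(0, 4096);  // nothing new
        System.out.println(first + " " + second + " " + committedWhere.where()); // false true 4096
        if (!first) {
            // hypothetical stand-in for waking the flush service
            System.out.println("wake up flush service");
        }
    }
}
```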