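Message middleware stores messages in one of three ways. The first keeps them in memory: fast, but messages are lost if the machine crashes. The second keeps them in memory and periodically writes them to a database: messages become durable, but DB reads and writes become the MQ's bottleneck. The third combines memory and disk, periodically persisting messages to disk files; how well this storage mechanism is designed determines the MQ's concurrency and availability.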
Let's read the RocketMQ source code and see how it solves the following problems:
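Messages are stored in files, so a dedicated component is needed to manage each file: that is what MappedFile is for. The MappedFiles are in turn managed by MappedFileQueue, which can be thought of as a directory and maintains a `CopyOnWriteArrayList<MappedFile> mappedFiles`.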
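```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.atomic.AtomicInteger;

// Excerpt: other members and helpers (e.g. ensureDirOK) are omitted
public class MappedFile {
    // Position reached after each write of a message into memory
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // Position reached after each commit to the FileChannel
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // Position reached after flushing to the physical file
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // File size; 1 GB by default
    protected int fileSize;
    // NIO channel of the file
    protected FileChannel fileChannel;
    // The underlying file
    private File file;
    // Start offset of this file, taken from its name
    private long fileFromOffset;
    // In-memory buffer that temporarily holds written messages
    protected ByteBuffer writeBuffer = null;
    // Memory-mapped view of the file
    protected MappedByteBuffer mappedByteBuffer = null;

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        ensureDirOK(this.file.getParent());
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    }
}
```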
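To see what init does in isolation, here is a minimal standalone sketch, not RocketMQ code: it opens a file with RandomAccessFile in "rw" mode and maps fileSize bytes READ_WRITE. The class MmapDemo and its method mapFile are hypothetical names. Because mapping a region beyond the end of a file grows the file, a freshly mapped file already has its full size on disk.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical demo that mirrors the shape of MappedFile#init; not RocketMQ's code.
final class MmapDemo {
    // Opens (or creates) the file and maps its first fileSize bytes read-write.
    static MappedByteBuffer mapFile(File file, int fileSize) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping past the current end of file extends the file to fileSize bytes.
            // The mapping stays valid after the channel is closed.
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("mapped", ".demo");
        f.deleteOnExit();
        MappedByteBuffer buf = mapFile(f, 4096);
        buf.put("hello".getBytes(StandardCharsets.UTF_8)); // lands in the page cache
        buf.force();                                        // push dirty pages to disk
        System.out.println(f.length());                     // 4096
    }
}
```

Unlike this sketch, MappedFile keeps fileChannel open, because commit later writes through it.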
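A MappedFile's name (file.getName()) is its start offset padded with zeros to 20 digits, e.g. 00000000000000000000, 00000000001073741824, 00000000002147483648, so fileName[n] = fileName[n - 1] + mappedFileSize. For instance, 00000000001073741824 is 1073741824 bytes = 1 GB: each file is simply named after its startOffset.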
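The naming rule can be reproduced in a few lines. The helper names below are hypothetical (RocketMQ has its own helper for this); the sketch pads a start offset to 20 digits, derives the next name by adding the file size, and parses a name back into its start offset with Long.parseLong, as init does.

```java
// Hypothetical helpers illustrating the MappedFile naming rule.
final class MappedFileNames {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB

    // A file's name is its start offset, left-padded with zeros to 20 digits.
    static String nameOf(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // fileName[n] = fileName[n - 1] + mappedFileSize
    static String nextName(String previousName, long fileSize) {
        return nameOf(Long.parseLong(previousName) + fileSize);
    }

    // The start offset is recovered by parsing the name.
    static long startOffsetOf(String name) {
        return Long.parseLong(name);
    }

    public static void main(String[] args) {
        String name = nameOf(0);
        for (int i = 0; i < 3; i++) {
            System.out.println(name);
            name = nextName(name, FILE_SIZE);
        }
        // 00000000000000000000
        // 00000000001073741824
        // 00000000002147483648
    }
}
```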
MappedFile serves three purposes: writing messages (methods 1 and 2 below), committing messages to the FileChannel (3), and flushing to disk (4):
1. AppendMessageResult appendMessagesInner(MessageExt messageExt, final AppendMessageCallback cb)
2. boolean appendMessage(final byte[] data, final int offset, final int length)
3. int commit(final int commitLeastPages)
4. int flush(final int flushLeastPages)
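The three positions move in a fixed order: appends advance wrotePosition, commit catches committedPosition up to it, and flush catches flushedPosition up to what has been committed. The toy model below is a simplified sketch, not RocketMQ code: the class and method names are hypothetical, no I/O happens, and it assumes a writeBuffer is in use so that commit and flush are separate steps.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of MappedFile's three positions (no real I/O).
final class PositionModel {
    final int fileSize;
    final AtomicInteger wrotePosition = new AtomicInteger(0);
    final AtomicInteger committedPosition = new AtomicInteger(0);
    final AtomicInteger flushedPosition = new AtomicInteger(0);

    PositionModel(int fileSize) {
        this.fileSize = fileSize;
    }

    // Append: reserve `length` bytes after the last write; fails if they do not fit.
    boolean append(int length) {
        if (wrotePosition.get() + length > fileSize) {
            return false;
        }
        wrotePosition.addAndGet(length);
        return true;
    }

    // Commit: everything written so far is now "in the FileChannel".
    int commit() {
        committedPosition.set(wrotePosition.get());
        return committedPosition.get();
    }

    // Flush: everything committed so far is now "on disk".
    int flush() {
        flushedPosition.set(committedPosition.get());
        return flushedPosition.get();
    }

    public static void main(String[] args) {
        PositionModel f = new PositionModel(1024);
        f.append(100);
        f.append(200);
        f.commit();
        f.append(50);
        f.flush();
        // flushed <= committed <= wrote always holds
        System.out.println(f.flushedPosition + " " + f.committedPosition + " " + f.wrotePosition); // 300 300 350
    }
}
```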
Let's start with the append operation.
```java
// MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    int currentPos = this.wrotePosition.get();
    if (currentPos < this.fileSize) {
        // Write into writeBuffer if present, otherwise directly into the mapped file
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        // CommitLog's callback encodes the message; the space left is fileSize - currentPos
        AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt);
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // .......
}
```
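1. Read the last write position (wrotePosition) and take a slice of the buffer: writeBuffer if one exists, otherwise mappedByteBuffer.
2. Set the slice's position to where the next write starts, i.e., right after the last write.
3. The actual write is done by the AppendMessageCallback, which CommitLog provides. It does some extra processing on the message, such as prepending the total message length and adding timestamps, and lays the record out as follows: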
Order | Field | Description | Type | Bytes |
---|---|---|---|---|
1 | MsgLen | Total message length | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | CRC of the message body | Int | 4 |
4 | QueueId | Message queue ID | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | Position within the message queue | Long | 8 |
7 | PhysicalOffset | Physical position, i.e., the sequential storage position in the CommitLog | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | Timestamp when the message was produced | Long | 8 |
10 | BornHost | Address + port of the producing host | Long | 8 |
11 | StoreTimestamp | Timestamp when the message was stored | Long | 8 |
12 | StoreHost | Address + port of the storing host | Long | 8 |
13 | ReconsumeTimes | Number of times the message has been re-consumed | Int | 4 |
14 | PreparedTransactionOffset | Prepared transaction offset | Long | 8 |
15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | Properties length + properties | Short + Bytes | 2 + propertiesLength |
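The encoded record is then written into the buffer as a byte array. The callback returns the number of bytes written, and wrotePosition is advanced by WroteBytes. In other words, the ByteBuffer is operated on one message at a time.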
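As a rough sketch of what the callback produces, the code below encodes one record following the table: fields 1 to 14 add up to 84 bytes, so a record takes 91 + bodyLength + topicLength + propertiesLength bytes. It is not RocketMQ's callback. The class name RecordEncoder, the use of java.util.zip.CRC32 for BodyCRC, the placeholder magic code, and the 4-byte IPv4 address plus 4-byte port layout of the host fields are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical encoder following the field table; not RocketMQ's implementation.
final class RecordEncoder {
    static final int MAGIC_CODE = 0xCAFEBABE; // placeholder, NOT RocketMQ's real value
    static final int FIXED_FIELDS = 84;       // rows 1-14 of the table

    static int recordLength(int bodyLen, int topicLen, int propsLen) {
        return FIXED_FIELDS + 4 + bodyLen + 1 + topicLen + 2 + propsLen;
    }

    // Writes one record at wrotePosition and returns the bytes written (0 if it does not fit).
    static int append(ByteBuffer buffer, int wrotePosition, long fileFromOffset,
                      int queueId, long queueOffset, byte[] body, String topic, String props) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] propBytes = props.getBytes(StandardCharsets.UTF_8);
        int len = recordLength(body.length, topicBytes.length, propBytes.length);
        ByteBuffer slice = buffer.slice();   // shares content, has its own position
        slice.position(wrotePosition);       // start right after the last write
        if (slice.remaining() < len) {
            return 0;
        }
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        long now = System.currentTimeMillis();
        slice.putInt(len)                               // 1 MsgLen
             .putInt(MAGIC_CODE)                        // 2 MagicCode
             .putInt((int) crc.getValue())              // 3 BodyCRC (assumed CRC32)
             .putInt(queueId)                           // 4 QueueId
             .putInt(0)                                 // 5 Flag
             .putLong(queueOffset)                      // 6 QueueOffset
             .putLong(fileFromOffset + wrotePosition)   // 7 PhysicalOffset
             .putInt(0)                                 // 8 SysFlag
             .putLong(now)                              // 9 BornTimestamp
             .put(new byte[] {127, 0, 0, 1}).putInt(0)  // 10 BornHost (placeholder)
             .putLong(now)                              // 11 StoreTimestamp
             .put(new byte[] {127, 0, 0, 1}).putInt(0)  // 12 StoreHost (placeholder)
             .putInt(0)                                 // 13 ReconsumeTimes
             .putLong(0L)                               // 14 PreparedTransactionOffset
             .putInt(body.length).put(body)             // 15 BodyLength + Body
             .put((byte) topicBytes.length).put(topicBytes)       // 16 TopicLength + Topic
             .putShort((short) propBytes.length).put(propBytes);  // 17 PropertiesLength + Properties
        return len; // the caller advances wrotePosition by this amount
    }
}
```

For a 5-byte body, the topic "TopicA" and empty properties, append returns 102, and the next record starts at position 102.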
The commit operation
```java
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        // No write buffer: messages go straight into mappedByteBuffer,
        // so there is nothing to commit and wrotePosition counts as committed.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        }
    }

    // All dirty data has been committed to FileChannel: return writeBuffer to the pool.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            // Copy [lastCommittedPosition, writePos) of writeBuffer into the FileChannel
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

/**
 * Whether a commit is allowed. Any one of the following is sufficient:
 * 1. the mapped file is full;
 * 2. commitLeastPages > 0 and the uncommitted part spans at least commitLeastPages pages;
 * 3. commitLeastPages == 0 and there is newly written data.
 * @param commitLeastPages minimum number of pages per commit
 * @return whether a commit may proceed
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    int flush = this.committedPosition.get(); // named flush, but holds committedPosition
    int write = this.wrotePosition.get();

    if (this.isFull()) { // this.fileSize == this.wrotePosition.get()
        return true;
    }

    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}
```
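The commit operation is built from the three methods above. isAbleToCommit decides whether a commit may proceed: a full file is always committed; otherwise, with commitLeastPages > 0 the uncommitted data must span at least commitLeastPages OS pages (4 KB each, counted as the difference between the page indices write / OS_PAGE_SIZE and committed / OS_PAGE_SIZE), and with commitLeastPages = 0 any newly written data is enough. commit0 writes the part of the buffer between the last committed position and the latest write position into the FileChannel, then updates committedPosition. The commit operation therefore works at the FileChannel level.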
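Because the page check compares page indices rather than byte counts, it really counts page boundaries crossed: two bytes that straddle a boundary count as one page, while 4095 bytes inside one page count as zero. The pure function below restates isAbleToCommit's three conditions so this can be checked directly; the class name CommitCheck is hypothetical, and OS_PAGE_SIZE is assumed to be 4 KB as in the text.

```java
// Hypothetical restatement of MappedFile#isAbleToCommit as a pure function.
final class CommitCheck {
    static final int OS_PAGE_SIZE = 1024 * 4;

    static boolean isAbleToCommit(int wrote, int committed, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true;                     // 1. the file is full
        }
        if (commitLeastPages > 0) {          // 2. enough pages of uncommitted data
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;            // 3. any new data at all
    }

    public static void main(String[] args) {
        int size = 1 << 20;
        System.out.println(isAbleToCommit(4095, 0, size, 1));    // false: still inside page 0
        System.out.println(isAbleToCommit(4097, 4095, size, 1)); // true: 2 bytes, but a boundary was crossed
        System.out.println(isAbleToCommit(8192, 0, size, 2));    // true: two full pages
        System.out.println(isAbleToCommit(10, 0, size, 0));      // true: any new data
    }
}
```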
The flush operation
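```java
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            // committedPosition if a writeBuffer is used, otherwise wrotePosition
            int value = getReadPosition();
            try {
                // Data reached the file via either the FileChannel or mappedByteBuffer, never both
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
```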
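isAbleToFlush follows the same idea as isAbleToCommit, applying the flushLeastPages threshold in 4 KB OS pages. The data is forced to disk through fileChannel.force(false) if it reached the file via the FileChannel, and through mappedByteBuffer.force() otherwise. After that, flushedPosition is updated to record how far the physical file has been persisted. The flush operation therefore works at the physical-file level.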
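The commit-then-flush path can be exercised end to end with plain NIO. The sketch below is a simplified stand-in, not RocketMQ code: the class CommitFlushDemo and its methods are hypothetical, and a heap ByteBuffer plays the role of the pooled writeBuffer. commit copies the [committedPosition, wrotePosition) range into the FileChannel at the same offset, as commit0 does, and flush calls force(false), as flush does when a writeBuffer is in use.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo of commit0 + flush with a write buffer; not RocketMQ code.
final class CommitFlushDemo {
    final ByteBuffer writeBuffer;
    final FileChannel fileChannel;
    int wrotePosition, committedPosition, flushedPosition;

    CommitFlushDemo(Path file, int fileSize) throws IOException {
        this.writeBuffer = ByteBuffer.allocate(fileSize); // stand-in for the pooled buffer
        this.fileChannel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Append into memory only; nothing reaches the file yet.
    void append(byte[] data) {
        ByteBuffer slice = writeBuffer.slice();
        slice.position(wrotePosition);
        slice.put(data); // throws BufferOverflowException if the buffer is full
        wrotePosition += data.length;
    }

    // Like commit0: copy [committedPosition, wrotePosition) into the channel at the same offset.
    void commit() throws IOException {
        if (wrotePosition > committedPosition) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(committedPosition);
            slice.limit(wrotePosition);
            fileChannel.position(committedPosition);
            while (slice.hasRemaining()) {
                fileChannel.write(slice);
            }
            committedPosition = wrotePosition;
        }
    }

    // Like flush with a write buffer: force file content (metadata not required) to storage.
    void flush() throws IOException {
        fileChannel.force(false);
        flushedPosition = committedPosition;
    }
}
```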
Next, let's look at how CommitLog drives commit and flush.
The services below all extend FlushCommitLogService, which in turn extends ServiceThread, so each one runs on its own background thread.
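Service thread | Scenario | Write performance (rank) |
---|---|---|
CommitRealTimeService | Async flush with the in-memory write buffer (writeBuffer) enabled | 1st |
FlushRealTimeService | Async flush with the in-memory write buffer disabled | 2nd |
GroupCommitService | Sync flush | 3rd |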
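CommitRealTimeService periodically calls mappedFileQueue.commit(commitDataLeastPages) to commit data. When a commit actually moved new data into the FileChannel (commit returns false in that case), it wakes up flushCommitLogService to flush the data to disk.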
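The hand-off between the commit and flush threads is a timed wait that another thread can cut short. Below is a simplified sketch of that pattern built on a plain monitor; the class WakeableService is hypothetical, and RocketMQ's ServiceThread implements its own wakeup/wait logic differently. In the same spirit, though, a wakeup that arrives before the wait is not lost: the flag is remembered, and the next wait returns immediately.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical simplified wait/wakeup helper; not RocketMQ's ServiceThread.
final class WakeableService {
    private boolean notified; // guarded by `this`

    // Called by the commit thread after it has committed new data.
    synchronized void wakeup() {
        notified = true;
        notifyAll();
    }

    // Called by the flush thread on each loop: sleeps up to intervalMs unless woken.
    // Returns true if woken (or a wakeup was already pending), false on timeout.
    synchronized boolean waitForRunning(long intervalMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
        try {
            while (!notified) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining); // releases the monitor while waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        notified = false; // consume the wakeup
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        WakeableService flushService = new WakeableService();
        Thread flusher = new Thread(() ->
                System.out.println("woken early: " + flushService.waitForRunning(10_000)));
        flusher.start();
        flushService.wakeup(); // e.g. right after a commit that returned false
        flusher.join();        // prints "woken early: true"
    }
}
```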
```java
// MappedFileQueue
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        // Global position after this commit, i.e., where the next commit starts
        long where = mappedFile.getFileFromOffset() + offset;
        // true if the position did not move (nothing new committed), false if data was committed
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
    return result;
}
```
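First, findMappedFileByOffset locates the file to commit using index (the file's position in the list) = (committedWhere - startOffset) / fileSize with integer division, where committedWhere is the position to commit from and startOffset is the start offset of the first MappedFile in the queue. For example, with committedWhere = 4000, startOffset = 0 and fileSize = 1024, index = 3 (4000 / 1024 = 3.9, truncated), so the 4th MappedFile in the queue is fetched, and it commits its own buffer to the FileChannel.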
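The lookup is a single integer division. The sketch below applies the formula with the first file's start offset, so it still finds the right file after old files have been deleted from the head of the queue. The names FileLookup, indexOf and findFileStart are hypothetical, and the real findMappedFileByOffset does additional checking beyond this.

```java
import java.util.List;

// Hypothetical restatement of the index formula used by findMappedFileByOffset.
final class FileLookup {
    // index = (offset - startOffset) / fileSize, with integer division
    static int indexOf(long offset, long firstFileFromOffset, long fileSize) {
        return (int) ((offset - firstFileFromOffset) / fileSize);
    }

    // Returns the start offset of the file that contains `offset`, or -1 if none does.
    static long findFileStart(List<Long> fileStartOffsets, long offset, long fileSize) {
        if (fileStartOffsets.isEmpty() || offset < fileStartOffsets.get(0)) {
            return -1; // before the oldest retained file
        }
        int index = indexOf(offset, fileStartOffsets.get(0), fileSize);
        return index < fileStartOffsets.size() ? fileStartOffsets.get(index) : -1;
    }
}
```

With the numbers from the text, indexOf(4000, 0, 1024) is 3. If the file starting at 0 has been deleted, the queue begins at 1024, and (4000 - 1024) / 1024 = 2 still selects the file starting at 3072.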
FlushRealTimeService likewise flushes content to the physical files periodically and updates flushedWhere after a successful flush; the main steps mirror commit:
```java
// MappedFileQueue
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        // Global position after this flush
        long where = mappedFile.getFileFromOffset() + offset;
        // true if nothing new was flushed
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            // Everything was flushed, so the store timestamp read before flushing is now durable
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}
```
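Because each call handles only the single file that contains flushedWhere, a backlog that spans a file boundary is drained over several calls, and the boolean result is true only when nothing moved. The in-memory sketch below models just that bookkeeping. ToyQueue and its nested ToyFile are hypothetical, each toy file "flushes" everything written to it, and the flushLeastPages threshold is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of MappedFileQueue#flush bookkeeping; no real I/O.
final class ToyQueue {
    static final class ToyFile {
        final long fileFromOffset;
        int wrotePosition;
        int flushedPosition;

        ToyFile(long fileFromOffset) {
            this.fileFromOffset = fileFromOffset;
        }

        // Pretend everything written so far reaches disk; return the new flushed position.
        int flush() {
            flushedPosition = wrotePosition;
            return flushedPosition;
        }
    }

    final long fileSize;
    final List<ToyFile> files = new ArrayList<>();
    long flushedWhere = 0;

    ToyQueue(long fileSize) {
        this.fileSize = fileSize;
    }

    ToyFile findByOffset(long offset) {
        if (files.isEmpty() || offset < files.get(0).fileFromOffset) {
            return null;
        }
        int index = (int) ((offset - files.get(0).fileFromOffset) / fileSize);
        return index < files.size() ? files.get(index) : null;
    }

    // Returns true when flushedWhere did not move (nothing new was flushed).
    boolean flush() {
        boolean result = true;
        ToyFile f = findByOffset(flushedWhere);
        if (f != null) {
            long where = f.fileFromOffset + f.flush();
            result = where == flushedWhere;
            flushedWhere = where;
        }
        return result;
    }
}
```

With fileSize = 1024, a full first file and 100 bytes in the second, the first call moves flushedWhere to 1024, the second to 1124, and the third returns true.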