目錄java
源碼分析 RocketMQ DLedger 多副本系列已經進行到第 8 篇了,前面的章節主要是介紹了基於 raft 協議的選主與日誌複製,從本篇開始將開始關注如何將 DLedger 應用到 RocketMQ中。數據結構
摘要:詳細分析了RocketMQ DLedger 多副本(主從切換) 是如何整合到 RocketMQ中,本文的行文思路首先結合已掌握的DLedger 多副本相關的知識初步思考其實現思路,而後從 Broker啓動流程、DLedgerCommitlog 核心類的講解,再從消息發送(追加)與消息查找來進一步探討 DLedger 是如何支持平滑升級的。架構
@(本節目錄)併發
RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基於 commitlog 文件構建的。要使用 DLedger 來實現消息存儲的一致性,應該關鍵是要實現 commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協議的複製組內各個節點的 commitlog 文件一致便可。app
咱們知道使用文件存儲消息都會基於必定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數、消息長度,消息屬性、消息體等,而咱們再來回顧一下 DLedger 日誌的存儲格式:
DLedger 要整合 commitlog 文件,是否是能夠把 rocketmq 消息,即一個個 commitlog 條目總體當成 DLedger 的 body 字段便可。eclipse
還等什麼,跟我一塊兒來看源碼吧!!!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能作到平滑升級?分佈式
帶着這些思考和問題,一塊兒來探究 DLedger 是如何整合 RocketMQ 的。ide
舒適提示:本文不會詳細介紹 Broker 端的啓動流程,只會點出在啓動過程當中與 DLedger 相關的代碼,如想詳細瞭解 Broker 的啓動流程,建議關注筆者的《RocketMQ技術內幕》一書。函數
Broker 涉及到 DLedger 相關關鍵點以下:
高併發
DefaultMessageStore 構造方法
if(messageStoreConfig.isEnableDLegerCommitLog()) { // @1 this.commitLog = new DLedgerCommitLog(this); else { this.commitLog = new CommitLog(this); // @2 }
代碼@1:若是開啓 DLedger ,commitlog 的實現類爲 DLedgerCommitLog,也是本文須要關注的關鍵所在。
代碼@2:若是未開啓 DLedger,則使用舊版的 Commitlog實現類。
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增長 節點角色變動事件監聽器,DLedgerRoleChangeHandler 是實現主從切換的另一個關鍵點。
DefaultMessageStore#load
// load Commit Log result = result && this.commitLog.load(); // @1 // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); // @2 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); }
代碼@一、@2 最終都是委託 commitlog 對象來執行,這裏的關鍵又是若是開啓了 DLedger,則最終調用的是 DLedgerCommitLog。
通過上面的鋪墊,主角 DLedgerCommitLog 「閃亮登場「了。
舒適提示:因爲 Commitlog 的絕大部分方法都已經在《RocketMQ技術內幕》一書中詳細介紹了,而且 DLedgerCommitLog 的實現原理與 Commitlog 文件的實現原理類同,本文會一筆帶過關於存儲部分的實現細節。
DLedgerCommitlog 繼承自 Commitlog。讓咱們一一來看一下它的核心屬性。
接下來咱們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現要點。
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); // @1 dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setStoreType(DLedgerConfig.FILE); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; // @2 dLedgerServer = new DLedgerServer(dLedgerConfig); // @3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); // @4 dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); // @5 }
代碼@1:調用父類 即 CommitLog 的構造函數,加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。咱們稍微看一下 CommitLog 的構造函數:
代碼@2:構建 DLedgerConfig 相關配置屬性,其主要屬性以下:
代碼@3:根據 DLedger 配置信息建立 DLedgerServer,即建立 DLedger 集羣節點,集羣內各個節點啓動後,就會觸發選主。
代碼@4:構建 appendHook 追加鉤子函數,這是兼容 Commitlog 文件很關鍵的一步,後面會詳細介紹其做用。
代碼@5:構建消息序列化。
根據上述的流程圖,構建好 DefaultMessageStore 實現後,就是調用其 load 方法,在啓用 DLedger 機制後,會依次調用 DLedgerCommitlog 的 load、recover 方法。
public boolean load() { boolean result = super.load(); if (!result) { return false; } return true; }
DLedgerCommitLog 的 laod 方法實現比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這裏也是爲了啓用 DLedger 時可以兼容之前的消息。
在 Broker 啓動時會加載 commitlog、consumequeue等文件,須要恢復其相關是數據結構,特別是與寫入、刷盤、提交等指針,其具體調用 recover 方法。
DLedgerCommitLog#recover
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // @1 recover(maxPhyOffsetOfConsumeQueue); }
首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,而後根據該物理偏移量進行恢復。
接下來看一下該方法的處理流程與關鍵點。
DLedgerCommitLog#recover
dLedgerFileStore.load();
Step1:加載 DLedger 相關的存儲文件,並一一構建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針爲文件的大小。
DLedgerCommitLog#recover
if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); // @1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); // @2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // @3 disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { // @4 log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; }
Step2:若是已存在 DLedger 的數據文件,則只須要恢復 DLedger 相關數據文建,由於在加載舊的 commitlog 文件時已經將其重要的數據指針設置爲最大值。其關鍵實現點以下:
舒適提示:爲何當存在 commitlog 文件的狀況下,不能刪除 DLedger 相關的日誌文件呢?
由於在此種狀況下,若是 DLedger 中的物理文件有刪除,則物理偏移量會斷層。
正常狀況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續的,這樣很是方即是訪問 commitlog 仍是 訪問 DLedger ,但若是DLedger 部分文件刪除後,這兩個值就變的不連續,就會形成中間的文件空洞,沒法被連續訪問。
DLedgerCommitLog#recover
isInrecoveringOldCommitlog = true; super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false;
Step3:若是啓用了 DLedger 而且是初次啓動(還未生成 DLedger 相關的日誌文件),則須要恢復 舊的 commitlog 文件。
DLedgerCommitLog#recover
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { // @1 return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { // @2 needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); // @3 log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { // @4 byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } mappedFile.setWrotePosition(mappedFile.getFileSize()); // @5 mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); }
Step4:若是存在舊的 commitlog 文件,須要將最後的文件剩餘部分所有填充,即再也不接受新的數據寫入,新的數據所有寫入到 DLedger 的數據文件中。其關鍵實現點以下:
在啓用 DLedger 機制時 Broker 的啓動流程就介紹到這裏了,相信你們已經瞭解 DLedger 在整合 RocketMQ 上作的努力,接下來咱們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現平滑升級的。
舒適提示:本節一樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。若是想詳細瞭解消息追加的流程,能夠閱讀筆者所著的《RocketMQ技術內幕》一書。
DLedgerCommitLog#putMessage
AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); }
關鍵點一:消息追加時,則再也不寫入到原先的 commitlog 文件中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集羣內的 Leader 節點負責消息追加以及在消息複製,只有超過集羣內的半數節點成功寫入消息後,纔會返回寫入成功。若是追加成功,將會返回本次追加成功後的起始偏移量,即 pos 屬性,即相似於 rocketmq 中 commitlog 的偏移量,即物理偏移量。
DLedgerCommitLog#putMessage
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
關鍵點二:根據 DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,返回給客戶端的消息偏移量爲 body 字段的開始偏移量,即經過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是同樣的,即從開偏移量開始,能夠正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關於 pos 以及 wroteOffset 的圖解以下:
DLedgerCommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { // @1 return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); // @2 if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); // @3 } return null; }
消息查找比較簡單,由於返回給客戶端消息,轉發給 consumequeue 的消息物理偏移量並非 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現關鍵點以下:
根據上面詳細的介紹,我想讀者朋友們應該不可貴出以下結論:
RocketMQ 整合 DLedger(多副本)實現平滑升級的設計技巧就介紹到這裏了。
若是本文對您有必定的幫助話,麻煩幫忙點個贊,很是感謝。
推薦閱讀:源碼分析 RocketMQ DLedger 多副本即主從切換系列文章:
一、RocketMQ 多副本前置篇:初探raft協議
二、源碼分析 RocketMQ DLedger 多副本之 Leader 選主
三、源碼分析 RocketMQ DLedger 多副本存儲實現
四、源碼分析 RocketMQ DLedger(多副本) 之日誌追加流程
五、源碼分析 RocketMQ DLedger(多副本) 之日誌複製(傳播)
六、基於 raft 協議的 RocketMQ DLedger 多副本日誌複製設計原理
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。