源碼分析 RocketMQ DLedger 多副本系列已經進行到第 8 篇了,前面的章節主要是介紹了基於 raft 協議的選主與日誌複製,從本篇開始將開始關注如何將 DLedger 應用到 RocketMQ中。java
> 摘要:詳細分析了RocketMQ DLedger 多副本(主從切換) 是如何整合到 RocketMQ中,本文的行文思路首先結合已掌握的DLedger 多副本相關的知識初步思考其實現思路,而後從 Broker啓動流程、DLedgerCommitlog 核心類的講解,再從消息發送(追加)與消息查找來進一步探討 DLedger 是如何支持平滑升級的。數據結構
RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基於 commitlog 文件構建的。要使用 DLedger 來實現消息存儲的一致性,應該關鍵是要實現 commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協議的複製組內各個節點的 commitlog 文件一致便可。併發
咱們知道使用文件存儲消息都會基於必定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數、消息長度,消息屬性、消息體等,而咱們再來回顧一下 DLedger 日誌的存儲格式: app
DLedger 要整合 commitlog 文件,是否是能夠把 rocketmq 消息,即一個個 commitlog 條目總體當成 DLedger 的 body 字段便可。eclipse
還等什麼,跟我一塊兒來看源碼吧!!!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能作到平滑升級?ide
帶着這些思考和問題,一塊兒來探究 DLedger 是如何整合 RocketMQ 的。函數
> 舒適提示:本文不會詳細介紹 Broker 端的啓動流程,只會點出在啓動過程當中與 DLedger 相關的代碼,如想詳細瞭解 Broker 的啓動流程,建議關注筆者的《RocketMQ技術內幕》一書。源碼分析
Broker 涉及到 DLedger 相關關鍵點以下: this
DefaultMessageStore 構造方法.net
if(messageStoreConfig.isEnableDLegerCommitLog()) { // [@1](https://my.oschina.net/u/1198) this.commitLog = new DLedgerCommitLog(this); else { this.commitLog = new CommitLog(this); // @2 }
代碼@1:若是開啓 DLedger ,commitlog 的實現類爲 DLedgerCommitLog,也是本文須要關注的關鍵所在。
代碼@2:若是未開啓 DLedger,則使用舊版的 Commitlog實現類。
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增長 節點角色變動事件監聽器,DLedgerRoleChangeHandler 是實現主從切換的另一個關鍵點。
DefaultMessageStore#load
// load Commit Log result = result && this.commitLog.load(); // [@1](https://my.oschina.net/u/1198) // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); // @2 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); }
代碼@一、@2 最終都是委託 commitlog 對象來執行,這裏的關鍵又是若是開啓了 DLedger,則最終調用的是 DLedgerCommitLog。
通過上面的鋪墊,主角 DLedgerCommitLog 「閃亮登場「了。
> 舒適提示:因爲 Commitlog 的絕大部分方法都已經在《RocketMQ技術內幕》一書中詳細介紹了,而且 DLedgerCommitLog 的實現原理與 Commitlog 文件的實現原理類同,本文會一筆帶過關於存儲部分的實現細節。
DLedgerCommitlog 繼承自 Commitlog。讓咱們一一來看一下它的核心屬性。
接下來咱們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現要點。
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); // @1 dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setStoreType(DLedgerConfig.FILE); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; // @2 dLedgerServer = new DLedgerServer(dLedgerConfig); // @3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); // @4 dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); // @5 }
代碼@1:調用父類 即 CommitLog 的構造函數,加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。咱們稍微看一下 CommitLog 的構造函數:
代碼@2:構建 DLedgerConfig 相關配置屬性,其主要屬性以下:
代碼@3:根據 DLedger 配置信息建立 DLedgerServer,即建立 DLedger 集羣節點,集羣內各個節點啓動後,就會觸發選主。
代碼@4:構建 appendHook 追加鉤子函數,這是兼容 Commitlog 文件很關鍵的一步,後面會詳細介紹其做用。
代碼@5:構建消息序列化。
根據上述的流程圖,構建好 DefaultMessageStore 實現後,就是調用其 load 方法,在啓用 DLedger 機制後,會依次調用 DLedgerCommitlog 的 load、recover 方法。
public boolean load() { boolean result = super.load(); if (!result) { return false; } return true; }
DLedgerCommitLog 的 laod 方法實現比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這裏也是爲了啓用 DLedger 時可以兼容之前的消息。
在 Broker 啓動時會加載 commitlog、consumequeue等文件,須要恢復其相關是數據結構,特別是與寫入、刷盤、提交等指針,其具體調用 recover 方法。 DLedgerCommitLog#recover
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // @1 recover(maxPhyOffsetOfConsumeQueue); }
首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,而後根據該物理偏移量進行恢復。 接下來看一下該方法的處理流程與關鍵點。
DLedgerCommitLog#recover
dLedgerFileStore.load();
Step1:加載 DLedger 相關的存儲文件,並一一構建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針爲文件的大小。
DLedgerCommitLog#recover
if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); // @1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); // @2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // @3 disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { // @4 log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; }
Step2:若是已存在 DLedger 的數據文件,則只須要恢復 DLedger 相關數據文建,由於在加載舊的 commitlog 文件時已經將其重要的數據指針設置爲最大值。其關鍵實現點以下:
>舒適提示:爲何當存在 commitlog 文件的狀況下,不能刪除 DLedger 相關的日誌文件呢?
由於在此種狀況下,若是 DLedger 中的物理文件有刪除,則物理偏移量會斷層。
正常狀況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續的,這樣很是方即是訪問 commitlog 仍是 訪問 DLedger ,但若是DLedger 部分文件刪除後,這兩個值就變的不連續,就會形成中間的文件空洞,沒法被連續訪問。
DLedgerCommitLog#recover
isInrecoveringOldCommitlog = true; super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false;
Step3:若是啓用了 DLedger 而且是初次啓動(還未生成 DLedger 相關的日誌文件),則須要恢復 舊的 commitlog 文件。
DLedgerCommitLog#recover
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { // @1 return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { // @2 needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); // @3 log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { // @4 byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } mappedFile.setWrotePosition(mappedFile.getFileSize()); // @5 mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); }
Step4:若是存在舊的 commitlog 文件,須要將最後的文件剩餘部分所有填充,即再也不接受新的數據寫入,新的數據所有寫入到 DLedger 的數據文件中。其關鍵實現點以下:
在啓用 DLedger 機制時 Broker 的啓動流程就介紹到這裏了,相信你們已經瞭解 DLedger 在整合 RocketMQ 上作的努力,接下來咱們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現平滑升級的。
> 舒適提示:本節一樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。若是想詳細瞭解消息追加的流程,能夠閱讀筆者所著的《RocketMQ技術內幕》一書。
DLedgerCommitLog#putMessage
AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dledgerFuture = (AppendFuture<appendentryresponse>) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); }
關鍵點一:消息追加時,則再也不寫入到原先的 commitlog 文件中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集羣內的 Leader 節點負責消息追加以及在消息複製,只有超過集羣內的半數節點成功寫入消息後,纔會返回寫入成功。若是追加成功,將會返回本次追加成功後的起始偏移量,即 pos 屬性,即相似於 rocketmq 中 commitlog 的偏移量,即物理偏移量。
DLedgerCommitLog#putMessage
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
關鍵點二:根據 DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,返回給客戶端的消息偏移量爲 body 字段的開始偏移量,即經過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是同樣的,即從開偏移量開始,能夠正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關於 pos 以及 wroteOffset 的圖解以下:
DLedgerCommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { // @1 return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); // @2 if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); // @3 } return null; }
消息查找比較簡單,由於返回給客戶端消息,轉發給 consumequeue 的消息物理偏移量並非 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現關鍵點以下:
根據上面詳細的介紹,我想讀者朋友們應該不可貴出以下結論:
RocketMQ 整合 DLedger(多副本)實現平滑升級的設計技巧就介紹到這裏了。
若是本文對您有必定的幫助話,麻煩幫忙點個贊,很是感謝。
> 做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,重點關注JAVA集合、JAVA併發包、Netty、Dubbo、RocketMQ、Mybatis、Elasticsearch、Netty。可掃描以下二維碼與做者進行互動。
</appendentryresponse>