RocketMQ 整合 DLedger(多副本)即主從切換實現平滑升級的設計技巧

源碼分析 RocketMQ DLedger 多副本系列已經進行到第 8 篇了,前面的章節主要是介紹了基於 raft 協議的選主與日誌複製,從本篇開始將開始關注如何將 DLedger 應用到 RocketMQ中。java

> 摘要:詳細分析了RocketMQ DLedger 多副本(主從切換) 是如何整合到 RocketMQ中,本文的行文思路首先結合已掌握的DLedger 多副本相關的知識初步思考其實現思路,而後從 Broker啓動流程、DLedgerCommitlog 核心類的講解,再從消息發送(追加)與消息查找來進一步探討 DLedger 是如何支持平滑升級的。數據結構

一、閱讀源碼以前的思考

RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基於 commitlog 文件構建的。要使用 DLedger 來實現消息存儲的一致性,應該關鍵是要實現 commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協議的複製組內各個節點的 commitlog 文件一致便可。併發

咱們知道使用文件存儲消息都會基於必定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數、消息長度,消息屬性、消息體等,而咱們再來回顧一下 DLedger 日誌的存儲格式: 在這裏插入圖片描述app

DLedger 要整合 commitlog 文件,是否是能夠把 rocketmq 消息,即一個個 commitlog 條目總體當成 DLedger 的 body 字段便可。eclipse

還等什麼,跟我一塊兒來看源碼吧!!!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能作到平滑升級?ide

帶着這些思考和問題,一塊兒來探究 DLedger 是如何整合 RocketMQ 的。函數

二、從 Broker 啓動流程看 DLedger

> 舒適提示:本文不會詳細介紹 Broker 端的啓動流程,只會點出在啓動過程當中與 DLedger 相關的代碼,如想詳細瞭解 Broker 的啓動流程,建議關注筆者的《RocketMQ技術內幕》一書。源碼分析

Broker 涉及到 DLedger 相關關鍵點以下: 在這裏插入圖片描述this

2.1 構建 DefaultMessageStore

DefaultMessageStore 構造方法.net

if(messageStoreConfig.isEnableDLegerCommitLog()) {  // [@1](https://my.oschina.net/u/1198)
    this.commitLog = new DLedgerCommitLog(this);
 else {
    this.commitLog = new CommitLog(this);                    // @2
}

代碼@1:若是開啓 DLedger ,commitlog 的實現類爲 DLedgerCommitLog,也是本文須要關注的關鍵所在。

代碼@2:若是未開啓 DLedger,則使用舊版的 Commitlog實現類。

2.2 增長節點狀態變動事件監聽器

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增長 節點角色變動事件監聽器,DLedgerRoleChangeHandler 是實現主從切換的另一個關鍵點。

2.3 調用 DefaultMessageStore 的 load 方法

DefaultMessageStore#load

// load Commit Log
result = result && this.commitLog.load();   // [@1](https://my.oschina.net/u/1198)
// load Consume Queue
result = result && this.loadConsumeQueue();  
if (result) {
    this.storeCheckpoint =  new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
    this.indexService.load(lastExitOK);
    this.recover(lastExitOK);                         // @2
    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}

代碼@一、@2 最終都是委託 commitlog 對象來執行,這裏的關鍵又是若是開啓了 DLedger,則最終調用的是 DLedgerCommitLog。

通過上面的鋪墊,主角 DLedgerCommitLog 「閃亮登場「了。

三、DLedgerCommitLog 詳解

> 舒適提示:因爲 Commitlog 的絕大部分方法都已經在《RocketMQ技術內幕》一書中詳細介紹了,而且 DLedgerCommitLog 的實現原理與 Commitlog 文件的實現原理類同,本文會一筆帶過關於存儲部分的實現細節。

3.1 核心類圖

在這裏插入圖片描述

DLedgerCommitlog 繼承自 Commitlog。讓咱們一一來看一下它的核心屬性。

  • DLedgerServer dLedgerServer 基於 raft 協議實現的集羣內的一個節點,用 DLedgerServer 實例表示。
  • DLedgerConfig dLedgerConfig DLedger 的配置信息。
  • DLedgerMmapFileStore dLedgerFileStore DLedger 基於文件映射的存儲實現。
  • MmapFileList dLedgerFileList DLedger 所管理的存儲文件集合,對比 RocketMQ 中的 MappedFileQueue。
  • int id 節點ID,0 表示主節點,非0表示從節點
  • MessageSerializer messageSerializer 消息序列器。
  • long beginTimeInDledgerLock = 0 用於記錄 消息追加的時耗(日誌追加所持有鎖時間)。
  • long dividedCommitlogOffset = -1 記錄的舊 commitlog 文件中的最大偏移量,若是訪問的偏移量大於它,則訪問 dledger 管理的文件。
  • boolean isInrecoveringOldCommitlog = false 是否正在恢復舊的 commitlog 文件。

接下來咱們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現要點。

3.2 構造方法

public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);                   // @1
    dLedgerConfig =  new DLedgerConfig();
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);  
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;            // @2
    dLedgerServer = new DLedgerServer(dLedgerConfig);                           // @3
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
            assert bodyOffset == DLedgerEntry.BODY_OFFSET;
            buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
            buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);   // @4
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());   // @5
}

代碼@1:調用父類 即 CommitLog 的構造函數,加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。咱們稍微看一下 CommitLog 的構造函數: 在這裏插入圖片描述

代碼@2:構建 DLedgerConfig 相關配置屬性,其主要屬性以下:

  • enableDiskForceClean 是否強制刪除文件,取自 broker 配置屬性 cleanFileForciblyEnable,默認爲 true 。
  • storeType DLedger 存儲類型,固定爲 基於文件的存儲模式。
  • dLegerSelfId leader 節點的 id 名稱,示例配置:n0,其配置要求第二個字符後必須是數字。
  • dLegerGroup DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一致。
  • dLegerPeers DLeger Group 中全部的節點信息,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個節點使用分號隔開。
  • storeBaseDir 設置 DLedger 的日誌文件的根目錄,取自 borker 配件文件中的 storePathRootDir ,即 RocketMQ 的數據存儲根路徑。
  • mappedFileSizeForEntryData 設置 DLedger 的單個日誌文件的大小,取自 broker 配置文件中的 - mapedFileSizeCommitLog,即與 commitlog 文件的單個文件大小一致。
  • deleteWhen DLedger 日誌文件的刪除時間,取自 broker 配置文件中的 deleteWhen,默認爲凌晨 4點。
  • fileReservedHours DLedger 日誌文件保留時長,取自 broker 配置文件中的 fileReservedHours,默認爲 72h。

代碼@3:根據 DLedger 配置信息建立 DLedgerServer,即建立 DLedger 集羣節點,集羣內各個節點啓動後,就會觸發選主。

代碼@4:構建 appendHook 追加鉤子函數,這是兼容 Commitlog 文件很關鍵的一步,後面會詳細介紹其做用。

代碼@5:構建消息序列化。

根據上述的流程圖,構建好 DefaultMessageStore 實現後,就是調用其 load 方法,在啓用 DLedger 機制後,會依次調用 DLedgerCommitlog 的 load、recover 方法。

3.3 load

public boolean load() {
    boolean result = super.load();
    if (!result) {
        return false;
    }
    return true;
}

DLedgerCommitLog 的 laod 方法實現比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這裏也是爲了啓用 DLedger 時可以兼容之前的消息。

3.4 recover

在 Broker 啓動時會加載 commitlog、consumequeue等文件,須要恢復其相關是數據結構,特別是與寫入、刷盤、提交等指針,其具體調用 recover 方法。 DLedgerCommitLog#recover

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {  // @1
    recover(maxPhyOffsetOfConsumeQueue);
}

首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,而後根據該物理偏移量進行恢復。 接下來看一下該方法的處理流程與關鍵點。

DLedgerCommitLog#recover

dLedgerFileStore.load();

Step1:加載 DLedger 相關的存儲文件,並一一構建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針爲文件的大小。

DLedgerCommitLog#recover

if (dLedgerFileList.getMappedFiles().size() > 0) {   
    dLedgerFileStore.recover();   // @1
    dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();     // @2
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile != null) {                                                                                                       // @3
        disableDeleteDledger();
    }
    long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
    // Clear ConsumeQueue redundant data
    if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {      // @4
        log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
        this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
    }
    return;
}

Step2:若是已存在 DLedger 的數據文件,則只須要恢復 DLedger 相關數據文建,由於在加載舊的 commitlog 文件時已經將其重要的數據指針設置爲最大值。其關鍵實現點以下:

  • 首先調用 DLedger 文件存儲實現類 DLedgerFileStore 的 recover 方法,恢復管轄的 MMapFile 對象(一個文件對應一個MMapFile實例)的相關指針,其實現方法與 RocketMQ 的 DefaultMessageStore 的恢復過程相似。
  • 設置 dividedCommitlogOffset 的值爲 DLedger 中全部物理文件的最小偏移量。操做消息的物理偏移量小於該值,則從 commitlog 文件中查找;物理偏移量大於等於該值的話則從 DLedger 相關的文件中查找消息。
  • 若是存在舊的 commitlog 文件,則禁止刪除 DLedger 文件,其具體作法就是禁止強制刪除文件,並將文件的有效存儲時間設置爲 10 年。
  • 若是 consumequeue 中存儲的最大物理偏移量大於 DLedger 中最大的物理偏移量,則刪除多餘的 consumequeue 文件。

>舒適提示:爲何當存在 commitlog 文件的狀況下,不能刪除 DLedger 相關的日誌文件呢?

由於在此種狀況下,若是 DLedger 中的物理文件有刪除,則物理偏移量會斷層。 在這裏插入圖片描述

正常狀況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續的,這樣很是方即是訪問 commitlog 仍是 訪問 DLedger ,但若是DLedger 部分文件刪除後,這兩個值就變的不連續,就會形成中間的文件空洞,沒法被連續訪問。

DLedgerCommitLog#recover

isInrecoveringOldCommitlog = true;
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;

Step3:若是啓用了 DLedger 而且是初次啓動(還未生成 DLedger 相關的日誌文件),則須要恢復 舊的 commitlog 文件。

DLedgerCommitLog#recover

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {           // @1
    return;
}
ByteBuffer byteBuffer =  mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {   // @2
    needWriteMagicCode = false;
} else {
    log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();   // @3
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) {  // @4
    byteBuffer.position(mappedFile.getWrotePosition());
    byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
    byteBuffer.putInt(BLANK_MAGIC_CODE);
    mappedFile.flush(0);
}
mappedFile.setWrotePosition(mappedFile.getFileSize());   // @5
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}

Step4:若是存在舊的 commitlog 文件,須要將最後的文件剩餘部分所有填充,即再也不接受新的數據寫入,新的數據所有寫入到 DLedger 的數據文件中。其關鍵實現點以下:

  • 嘗試查找最後一個 commitlog 文件,若是未找到,則結束。
  • 從最後一個文件的最後寫入點(原 commitlog 文件的 待寫入位點)嘗試去查找寫入的魔數,若是存在魔數並等於 CommitLog.BLANK_MAGIC_CODE,則無需再寫入魔數,在升級 DLedger 第一次啓動時,魔數爲空,故須要寫入魔數。
  • 初始化 dividedCommitlogOffset ,等於最後一個文件的起始偏移量加上文件的大小,即該指針指向最後一個文件的結束位置。
  • 將最後一個 commitlog 未寫滿的數據所有寫入,其方法爲 設置消息體的 size 與 魔數便可。
  • 設置最後一個文件的 wrotePosition、flushedPosition、committedPosition 爲文件的大小,一樣有意味者最後一個文件已經寫滿,下一條消息將寫入 DLedger 中。

在啓用 DLedger 機制時 Broker 的啓動流程就介紹到這裏了,相信你們已經瞭解 DLedger 在整合 RocketMQ 上作的努力,接下來咱們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現平滑升級的。

四、從消息追加看 DLedger 整合 RocketMQ 如何實現無縫兼容

> 舒適提示:本節一樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。若是想詳細瞭解消息追加的流程,能夠閱讀筆者所著的《RocketMQ技術內幕》一書。

DLedgerCommitLog#putMessage

AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<appendentryresponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}

關鍵點一:消息追加時,則再也不寫入到原先的 commitlog 文件中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集羣內的 Leader 節點負責消息追加以及在消息複製,只有超過集羣內的半數節點成功寫入消息後,纔會返回寫入成功。若是追加成功,將會返回本次追加成功後的起始偏移量,即 pos 屬性,即相似於 rocketmq 中 commitlog 的偏移量,即物理偏移量。

DLedgerCommitLog#putMessage

long wroteOffset =  dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);

關鍵點二:根據 DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,返回給客戶端的消息偏移量爲 body 字段的開始偏移量,即經過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是同樣的,即從開偏移量開始,能夠正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關於 pos 以及 wroteOffset 的圖解以下: 在這裏插入圖片描述

五、從消息讀取看 DLedger 整合 RocketMQ 如何實現無縫兼容

DLedgerCommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset &lt; dividedCommitlogOffset) {   // @1
        return super.getMessage(offset, size);
    }
    int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
    MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);   // @2
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return  convertSbr(mappedFile.selectMappedBuffer(pos, size));                                       // @3
    }
    return null;
}

消息查找比較簡單,由於返回給客戶端消息,轉發給 consumequeue 的消息物理偏移量並非 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現關鍵點以下:

  • 若是查找的物理偏移量小於 dividedCommitlogOffset,則從原先的 commitlog 文件中查找。
  • 而後根據物理偏移量按照二分方找到具體的物理文件。
  • 對物理偏移量取模,得出在該物理文件中中的絕對偏移量,進行消息查找便可,由於只有知道其物理偏移量,從該處先將消息的長度讀取出來,而後便可讀出一條完整的消息。

五、總結

根據上面詳細的介紹,我想讀者朋友們應該不可貴出以下結論:

  • DLedger 在整合時,使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來存儲整條 commitlog 條目。
  • 引入 dividedCommitlogOffset 變量,表示物理偏移量小於該值的消息存在於舊的 commitlog 文件中,實現 升級 DLedger 集羣后能訪問到舊的數據。
  • 新 DLedger 集羣啓動後,會將最後一個 commitlog 填充,即新的數據不會再寫入到 原先的 commitlog 文件。
  • 消息追加到 DLedger 數據日誌文件中,返回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實消息的起始偏移量,保證消息物理偏移量的語義與 RocketMQ Commitlog同樣。

RocketMQ 整合 DLedger(多副本)實現平滑升級的設計技巧就介紹到這裏了。

若是本文對您有必定的幫助話,麻煩幫忙點個贊,很是感謝。


> 做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,重點關注JAVA集合、JAVA併發包、Netty、Dubbo、RocketMQ、Mybatis、Elasticsearch、Netty。可掃描以下二維碼與做者進行互動。

</appendentryresponse>

相關文章
相關標籤/搜索