源碼分析 RocketMQ DLedger(多副本) 之日誌複製(傳播)

本文緊接着 源碼分析 RocketMQ DLedger(多副本) 之日誌追加流程 ,繼續 Leader 處理客戶端 append 的請求流程中最相當重要的一環:日誌複製。java

DLedger 多副本的日誌轉發由 DLedgerEntryPusher 實現,接下來將對其進行詳細介紹。node

> 舒適提示:因爲本篇幅較長,爲了更好的理解其實現,你們能夠帶着以下疑問來通讀本篇文章: 一、raft 協議中有一個很是重要的概念:已提交日誌序號,該如何實現。 二、客戶端向 DLedger 集羣發送一條日誌,必須獲得集羣中大多數節點的承認才能被認爲寫入成功。 三、raft 協議中追加、提交兩個動做如何實現。json

日誌複製(日誌轉發)由 DLedgerEntryPusher 實現,具體類圖以下: 在這裏插入圖片描述服務器

主要由以下4個類構成:網絡

  • DLedgerEntryPusher DLedger 日誌轉發與處理核心類,該內會啓動以下3個對象,其分別對應一個線程。
  • EntryHandler 日誌接收處理線程,當節點爲從節點時激活。
  • QuorumAckChecker 日誌追加ACK投票處理線程,當前節點爲主節點時激活。
  • EntryDispatcher 日誌轉發線程,當前節點爲主節點時追加。

接下來咱們將詳細介紹上述4個類,從而揭曉日誌複製的核心實現原理。數據結構

一、DLedgerEntryPusher

1.1 核心類圖

在這裏插入圖片描述

DLedger 多副本日誌推送的核心實現類,裏面會建立 EntryDispatcher、QuorumAckChecker、EntryHandler 三個核心線程。其核心屬性以下:app

  • DLedgerConfig dLedgerConfig 多副本相關配置。
  • DLedgerStore dLedgerStore 存儲實現類。
  • MemberState memberState 節點狀態機。
  • DLedgerRpcService dLedgerRpcService RPC 服務實現類,用於集羣內的其餘節點進行網絡通信。
  • Map<long, concurrentmap<string, long>> peerWaterMarksByTerm 每一個節點基於投票輪次的當前水位線標記。鍵值爲投票輪次,值爲 ConcurrentMap<string ** 節點id* , Long 節點對應的日誌序號* />。
  • Map<long, concurrentmap<long, timeoutfuture<appendentryresponse>>> pendingAppendResponsesByTerm 用於存放追加請求的響應結果(Future模式)。
  • EntryHandler entryHandler 從節點上開啓的線程,用於接收主節點的 push 請求(append、commit、append)。
  • QuorumAckChecker quorumAckChecker 主節點上的追加請求投票器。
  • Map<string, entrydispatcher> dispatcherMap 主節點日誌請求轉發器,向從節點複製消息等。

接下來介紹一下其核心方法的實現。異步

1.2 構造方法

public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
    DLedgerRpcService dLedgerRpcService) {
    this.dLedgerConfig = dLedgerConfig;
    this.memberState = memberState;
    this.dLedgerStore = dLedgerStore;
    this.dLedgerRpcService = dLedgerRpcService;
    for (String peer : memberState.getPeerMap().keySet()) {
        if (!peer.equals(memberState.getSelfId())) {
            dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
        }
    }
}

構造方法的重點是會根據集羣內的節點,依次構建對應的 EntryDispatcher 對象。源碼分析

1.3 startup

DLedgerEntryPusher#startupui

public void startup() {
    entryHandler.start();
    quorumAckChecker.start();
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
        dispatcher.start();
    }
}

依次啓動 EntryHandler、QuorumAckChecker 與 EntryDispatcher 線程。

> 備註:DLedgerEntryPusher 的其餘核心方法在詳細分析其日誌複製原理的過程當中會一一介紹。

接下來將從 EntryDispatcher、QuorumAckChecker、EntryHandler 來闡述 RocketMQ DLedger(多副本)的實現原理。

二、EntryDispatcher 詳解

2.1 核心類圖

在這裏插入圖片描述

其核心屬性以下。

  • AtomicReference<pushentryrequest.type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE) 向從節點發送命令的類型,可選值:PushEntryRequest.Type.COMPARE、TRUNCATE、APPEND、COMMIT,下面詳細說明。
  • long lastPushCommitTimeMs = -1 上一次發送提交類型的時間戳。
  • String peerId 目標節點ID。
  • long compareIndex = -1 已完成比較的日誌序號。
  • long writeIndex = -1 已寫入的日誌序號。
  • int maxPendingSize = 1000 容許的最大掛起日誌數量。
  • long term = -1 Leader 節點當前的投票輪次。
  • String leaderId = null Leader 節點ID。
  • long lastCheckLeakTimeMs = System.currentTimeMillis() 上次檢測泄漏的時間,所謂的泄漏,就是看掛起的日誌請求數量是否查過了 maxPendingSize 。
  • ConcurrentMap<long, long> pendingMap = new ConcurrentHashMap<>() 記錄日誌的掛起時間,key:日誌的序列(entryIndex),value:掛起時間戳。
  • Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()) 配額。

2.2 Push 請求類型

DLedger 主節點向從從節點複製日誌總共定義了4類請求類型,其枚舉類型爲 PushEntryRequest.Type,其值分別爲 COMPARE、TRUNCATE、APPEND、COMMIT。

  • COMPARE 若是 Leader 發生變化,新的 Leader 須要與他的從節點的日誌條目進行比較,以便截斷從節點多餘的數據。
  • TRUNCATE 若是 Leader 經過索引完成日誌對比,則 Leader 將發送 TRUNCATE 給它的從節點。
  • APPEND 將日誌條目追加到從節點。
  • COMMIT 一般,leader 會將提交的索引附加到 append 請求,可是若是 append 請求不多且分散,leader 將發送一個單獨的請求來通知從節點提交的索引。

對主從節點的請求類型有了一個初步的認識後,咱們將從 EntryDispatcher 的業務處理入口 doWork 方法開始講解。

2.3 doWork 方法詳解

public void doWork() {
    try {
        if (!checkAndFreshState()) {                                            // [@1](https://my.oschina.net/u/1198)
            waitForRunning(1);
            return;
        }

        if (type.get() == PushEntryRequest.Type.APPEND) {   // @2
            doAppend();
        } else {
            doCompare();                                                           // [@3](https://my.oschina.net/u/2648711)
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
        DLedgerUtils.sleep(500);
    }
}

代碼@1:檢查狀態,是否能夠繼續發送 append 或 compare。

代碼@2:若是推送類型爲APPEND,主節點向從節點傳播消息請求。

代碼@3:主節點向從節點發送對比數據差別請求(當一個新節點被選舉成爲主節點時,每每這是第一步)。

2.3.1 checkAndFreshState 詳解

EntryDispatcher#checkAndFreshState

private boolean checkAndFreshState() {
    if (!memberState.isLeader()) {     // @1
        return false;
    }
    if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {     // @2
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                return false;
            }
            PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
            term = memberState.currTerm();
            leaderId = memberState.getSelfId();
            changeState(-1, PushEntryRequest.Type.COMPARE);
        }
    }
    return true;
}

代碼@1:若是節點的狀態不是主節點,則直接返回 false。則結束 本次 doWork 方法。由於只有主節點才須要向從節點轉發日誌。

代碼@2:若是當前節點狀態是主節點,但當前的投票輪次與狀態機輪次或 leaderId 還未設置,或 leaderId 與狀態機的 leaderId 不相等,這種狀況一般是集羣觸發了從新選舉,設置其term、leaderId與狀態機同步,即將發送COMPARE 請求。

接下來看一下 changeState (改變狀態)。

private synchronized void changeState(long index, PushEntryRequest.Type target) {
    logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
    switch (target) {
        case APPEND:      // @1
            compareIndex = -1;
            updatePeerWaterMark(term, peerId, index);
            quorumAckChecker.wakeup();
            writeIndex = index + 1;
            break;
        case COMPARE:    // @2
            if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                compareIndex = -1;
                pendingMap.clear();
            }
            break;
        case TRUNCATE:     // @3
            compareIndex = -1;
            break;
        default:
            break;
    }
    type.set(target);
}

代碼@1:若是將目標類型設置爲 append,則重置 compareIndex ,並設置 writeIndex 爲當前 index 加1。

代碼@2:若是將目標類型設置爲 COMPARE,則重置 compareIndex 爲負一,接下將向各個從節點發送 COMPARE 請求相似,並清除已掛起的請求。

代碼@3:若是將目標類型設置爲 TRUNCATE,則重置 compareIndex 爲負一。

接下來具體來看一下 APPEND、COMPARE、TRUNCATE 等請求。

2.3.2 append 請求詳解

EntryDispatcher#doAppend

private void doAppend() throws Exception {
    while (true) {
        if (!checkAndFreshState()) {                                                 // @1
            break;
        }
        if (type.get() != PushEntryRequest.Type.APPEND) {        // @2
            break;
        }
        if (writeIndex &gt; dLedgerStore.getLedgerEndIndex()) {    // @3
            doCommit();
            doCheckAppendResponse();
            break;
        }
        if (pendingMap.size() &gt;= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) &gt; 1000)) {     // @4
            long peerWaterMark = getPeerWaterMark(term, peerId);
            for (Long index : pendingMap.keySet()) {
                if (index &lt; peerWaterMark) {
                    pendingMap.remove(index);
                }
            }
            lastCheckLeakTimeMs = System.currentTimeMillis();
        }
        if (pendingMap.size() &gt;= maxPendingSize) {    // @5
            doCheckAppendResponse();
            break;
        }
        doAppendInner(writeIndex);                               // @6
        writeIndex++;
    }
}

代碼@1:檢查狀態,已經在上面詳細介紹。

代碼@2:若是請求類型不爲 APPEND,則退出,結束本輪 doWork 方法執行。

代碼@3:writeIndex 表示當前追加到從該節點的序號,一般狀況下主節點向從節點發送 append 請求時,會附帶主節點的已提交指針,但如何 append 請求發不那麼頻繁,writeIndex 大於 leaderEndIndex 時(因爲pending請求超過其 pending 請求的隊列長度(默認爲1w),時,會阻止數據的追加,此時有可能出現 writeIndex 大於 leaderEndIndex 的狀況,此時單獨發送 COMMIT 請求。

代碼@4:檢測 pendingMap(掛起的請求數量)是否發送泄漏,即掛起隊列中容量是否超過容許的最大掛起閥值。獲取當前節點關於本輪次的當前水位線(已成功 append 請求的日誌序號),若是發現正在掛起請求的日誌序號小於水位線,則丟棄。

代碼@5:若是掛起的請求(等待從節點追加結果)大於 maxPendingSize 時,檢查並追加一次 append 請求。

代碼@6:具體的追加請求。

2.3.2.1 doCommit 發送提交請求

EntryDispatcher#doCommit

private void doCommit() throws Exception {
    if (DLedgerUtils.elapsed(lastPushCommitTimeMs) &gt; 1000) {   // @1
        PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);   // @2
        //Ignore the results
        dLedgerRpcService.push(request);                                                                                        // @3
        lastPushCommitTimeMs = System.currentTimeMillis();
    }
}

代碼@1:若是上一次單獨發送 commit 的請求時間與當前時間相隔低於 1s,放棄本次提交請求。

代碼@2:構建提交請求。

代碼@3:經過網絡向從節點發送 commit 請求。

接下來先了解一下如何構建 commit 請求包。

EntryDispatcher#buildPushRequest

private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
    PushEntryRequest request = new PushEntryRequest();
    request.setGroup(memberState.getGroup());  
    request.setRemoteId(peerId);                          
    request.setLeaderId(leaderId);
    request.setTerm(term);
    request.setEntry(entry);
    request.setType(target);
    request.setCommitIndex(dLedgerStore.getCommittedIndex());
    return request;
}

提交包請求字段主要包含以下字段:DLedger 節點所屬組、從節點 id、主節點 id,當前投票輪次、日誌內容、請求類型與 committedIndex(主節點已提交日誌序號)。

2.3.2.2 doCheckAppendResponse 檢查並追加請求

EntryDispatcher#doCheckAppendResponse

private void doCheckAppendResponse() throws Exception {
    long peerWaterMark = getPeerWaterMark(term, peerId);   // @1
    Long sendTimeMs = pendingMap.get(peerWaterMark + 1); 
    if (sendTimeMs != null &amp;&amp; System.currentTimeMillis() - sendTimeMs &gt; dLedgerConfig.getMaxPushTimeOutMs()) { // @2
        logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
        doAppendInner(peerWaterMark + 1);
    }
}

該方法的做用是檢查 append 請求是否超時,其關鍵實現以下:

  • 獲取已成功 append 的序號。
  • 從掛起的請求隊列中獲取下一條的發送時間,若是不爲空並去超過了 append 的超時時間,則再從新發送 append 請求,最大超時時間默認爲 1s,能夠經過 maxPushTimeOutMs 來改變默認值。
2.3.2.3 doAppendInner 追加請求

向從節點發送 append 請求。

EntryDispatcher#doAppendInner

private void doAppendInner(long index) throws Exception {
    DLedgerEntry entry = dLedgerStore.get(index);   // @1
    PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
    checkQuotaAndWait(entry);                                   // @2
    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);   // @3
    CompletableFuture<pushentryresponse> responseFuture = dLedgerRpcService.push(request);   // @4
    pendingMap.put(index, System.currentTimeMillis());                                                                          // @5
    responseFuture.whenComplete((x, ex) -&gt; {
        try {
            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
            switch (responseCode) {
                case SUCCESS:                                                                                                                // @6
                    pendingMap.remove(x.getIndex());
                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                    quorumAckChecker.wakeup();
                    break;
                case INCONSISTENT_STATE:                                                                                         // @7
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    lastPushCommitTimeMs = System.currentTimeMillis();
}

代碼@1:首先根據序號查詢出日誌。

代碼@2:檢測配額,若是超過配額,會進行必定的限流,其關鍵實現點:

  • 首先觸發條件:append 掛起請求數已超過最大容許掛起數;基於文件存儲並主從差別超過300m,可經過 peerPushThrottlePoint 配置。
  • 每秒追加的日誌超過 20m(可經過 peerPushQuota 配置),則會 sleep 1s中後再追加。

代碼@3:構建 PUSH 請求日誌。

代碼@4:經過 Netty 發送網絡請求到從節點,從節點收到請求會進行處理(本文並不會探討與網絡相關的實現細節)。

代碼@5:用 pendingMap 記錄待追加的日誌的發送時間,用於發送端判斷是否超時的一個依據。

代碼@6:請求成功的處理邏輯,其關鍵實現點以下:

  • 移除 pendingMap 中的關於該日誌的發送超時時間。
  • 更新已成功追加的日誌序號(按投票輪次組織,而且每一個從服務器一個鍵值對)。
  • 喚醒 quorumAckChecker 線程(主要用於仲裁 append 結果),後續會詳細介紹。

代碼@7:Push 請求出現狀態不一致狀況,將發送 COMPARE 請求,來對比主從節點的數據是否一致。

日誌轉發 append 追加請求類型就介紹到這裏了,接下來咱們繼續探討另外一個請求類型 compare。

2.3.3 compare 請求詳解

COMPARE 類型的請求有 doCompare 方法發送,首先該方法運行在 while (true) 中,故在查閱下面代碼時,要注意其退出循環的條件。 EntryDispatcher#doCompare

if (!checkAndFreshState()) {
    break;
}
if (type.get() != PushEntryRequest.Type.COMPARE
    &amp;&amp; type.get() != PushEntryRequest.Type.TRUNCATE) {
    break;
}
if (compareIndex == -1 &amp;&amp; dLedgerStore.getLedgerEndIndex() == -1) {
    break;
}

Step1:驗證是否執行,有幾個關鍵點以下:

  • 判斷是不是主節點,若是不是主節點,則直接跳出。
  • 若是是請求類型不是 COMPARE 或 TRUNCATE 請求,則直接跳出。
  • 若是已比較索引 和 ledgerEndIndex 都爲 -1 ,表示一個新的 DLedger 集羣,則直接跳出。

EntryDispatcher#doCompare

if (compareIndex == -1) {
    compareIndex = dLedgerStore.getLedgerEndIndex();
    logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
} else if (compareIndex &gt; dLedgerStore.getLedgerEndIndex() || compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) {
    logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
    compareIndex = dLedgerStore.getLedgerEndIndex();
}

Step2:若是 compareIndex 爲 -1 或compareIndex 不在有效範圍內,則重置待比較序列號爲當前已已存儲的最大日誌序號:ledgerEndIndex。

DLedgerEntry entry = dLedgerStore.get(compareIndex);
PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
CompletableFuture<pushentryresponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);

Step3:根據序號查詢到日誌,並向從節點發起 COMPARE 請求,其超時時間爲 3s。

EntryDispatcher#doCompare

long truncateIndex = -1;
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {   // @1
    if (compareIndex == response.getEndIndex()) {
        changeState(compareIndex, PushEntryRequest.Type.APPEND);
        break;
    } else {
        truncateIndex = compareIndex;
    }

} else if (response.getEndIndex() &lt; dLedgerStore.getLedgerBeginIndex() 
        || response.getBeginIndex() &gt; dLedgerStore.getLedgerEndIndex()) {    // @2
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex &lt; response.getBeginIndex()) {                                    // @3
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex &gt; response.getEndIndex()) {                                      // @4
    compareIndex = response.getEndIndex();
} else {                                                                                                              // @5
	compareIndex--;
}

if (compareIndex &lt; dLedgerStore.getLedgerBeginIndex()) {                          // @6
    truncateIndex = dLedgerStore.getLedgerBeginIndex();
}

Step4:根據響應結果計算須要截斷的日誌序號,其主要實現關鍵點以下:

  • 代碼@1:若是二者的日誌序號相同,則無需截斷,下次將直接先從節點發送 append 請求;不然將 truncateIndex 設置爲響應結果中的 endIndex。
  • 代碼@2:若是從節點存儲的最大日誌序號小於主節點的最小序號,或者從節點的最小日誌序號大於主節點的最大日誌序號,即二者不相交,這一般發生在從節點崩潰很長一段時間,而主節點刪除了過時的條目時。truncateIndex 設置爲主節點的 ledgerBeginIndex,即主節點目前最小的偏移量。
  • 代碼@3:若是已比較的日誌序號小於從節點的開始日誌序號,極可能是從節點磁盤發送損耗,從主節點最小日誌序號開始同步。
  • 代碼@4:若是已比較的日誌序號大於從節點的最大日誌序號,則已比較索引設置爲從節點最大的日誌序號,觸發數據的繼續同步。
  • 代碼@5:若是已比較的日誌序號大於從節點的開始日誌序號,但小於從節點的最大日誌序號,則待比較索引減一。
  • 代碼@6:若是比較出來的日誌序號小於主節點的最小日誌須要,則設置爲主節點的最小序號。
if (truncateIndex != -1) {
    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    doTruncate(truncateIndex);
    break;
}

Step5:若是比較出來的日誌序號不等於 -1 ,則向從節點發送 TRUNCATE 請求。

2.3.3.1 doTruncate 詳解
private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
    PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
    lastPushCommitTimeMs = System.currentTimeMillis();
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}

該方法主要就是構建 truncate 請求到從節點。

關於服務端的消息複製轉發就介紹到這裏了,主節點負責向從服務器PUSH請求,從節點天然而然的要處理這些請求,接下來咱們就按照主節點發送的請求,來具體分析一下從節點是如何響應的。

三、EntryHandler 詳解

EntryHandler 一樣是一個線程,當節點狀態爲從節點時激活。

3.1 核心類圖

在這裏插入圖片描述

其核心屬性以下:

  • long lastCheckFastForwardTimeMs 上一次檢查主服務器是否有 push 消息的時間戳。
  • ConcurrentMap<long, pair<pushentryrequest, completablefuture< pushentryresponse>>> writeRequestMap append 請求處理隊列。
  • BlockingQueue<pair<pushentryrequest, completablefuture< pushentryresponse>>> compareOrTruncateRequests COMMIT、COMPARE、TRUNCATE 相關請求

3.2 handlePush

從上文得知,主節點會主動向從節點傳播日誌,從節點會經過網絡接受到請求數據進行處理,其調用鏈如圖所示: 在這裏插入圖片描述

最終會調用 EntryHandler 的 handlePush 方法。

EntryHandler#handlePush

public CompletableFuture<pushentryresponse> handlePush(PushEntryRequest request) throws Exception {
    //The timeout should smaller than the remoting layer's request timeout
    CompletableFuture<pushentryresponse> future = new TimeoutFuture&lt;&gt;(1000);      // @1
    switch (request.getType()) {
        case APPEND:                                                                                                          // @2
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            long index = request.getEntry().getIndex();
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; old = writeRequestMap.putIfAbsent(index, new Pair&lt;&gt;(request, future));
            if (old != null) {
                logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
            }
            break;
        case COMMIT:                                                                                                           // @3
            compareOrTruncateRequests.put(new Pair&lt;&gt;(request, future));
            break;
        case COMPARE:
        case TRUNCATE:                                                                                                     // @4
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            writeRequestMap.clear();
            compareOrTruncateRequests.put(new Pair&lt;&gt;(request, future));
            break;
        default:
            logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            break;
    }
    return future;
}

從幾點處理主節點的 push 請求,其實現關鍵點以下。

代碼@1:首先構建一個響應結果Future,默認超時時間 1s。

代碼@2:若是是 APPEND 請求,放入到 writeRequestMap 集合中,若是已存在該數據結構,說明主節點重複推送,構建返回結果,其狀態碼爲 REPEATED_PUSH。放入到 writeRequestMap 中,由 doWork 方法定時去處理待寫入的請求。

代碼@3:若是是提交請求, 將請求存入 compareOrTruncateRequests 請求處理中,由 doWork 方法異步處理。

代碼@4:若是是 COMPARE 或 TRUNCATE 請求,將待寫入隊列 writeRequestMap 清空,並將請求放入 compareOrTruncateRequests 請求隊列中,由 doWork 方法異步處理。

接下來,咱們重點來分析 doWork 方法的實現。

3.3 doWork 方法詳解

EntryHandler#doWork

public void doWork() {
    try {
        if (!memberState.isFollower()) {     // @1
            waitForRunning(1);
            return;
        }
        if (compareOrTruncateRequests.peek() != null) {    // @2
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = compareOrTruncateRequests.poll();
            PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
            switch (pair.getKey().getType()) {
                case TRUNCATE:
                    handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMPARE:
                    handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                    break;
                case COMMIT:
                    handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                    break;
                default:
                    break;
            }
        } else { // @3
            long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
            Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = writeRequestMap.remove(nextIndex);
            if (pair == null) {
                checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                waitForRunning(1);
                return;
            }
            PushEntryRequest request = pair.getKey();
            handleDoAppend(nextIndex, request, pair.getValue());
        }
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
        DLedgerUtils.sleep(100);
    }
}

代碼@1:若是當前節點的狀態不是從節點,則跳出。

代碼@2:若是 compareOrTruncateRequests 隊列不爲空,說明有COMMIT、COMPARE、TRUNCATE 等請求,這類請求優先處理。值得注意的是這裏使用是 peek、poll 等非阻塞方法,而後根據請求的類型,調用對應的方法。稍後詳細介紹。

代碼@3:若是隻有 append 類請求,則根據當前節點最大的消息序號,嘗試從 writeRequestMap 容器中,獲取下一個消息複製請求(ledgerEndIndex + 1) 爲 key 去查找。若是不爲空,則執行 doAppend 請求,若是爲空,則調用 checkAbnormalFuture 來處理異常狀況。

接下來咱們來重點分析各個處理細節。

3.3.1 handleDoCommit

處理提交請求,其處理比較簡單,就是調用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量,故咱們仍是具體看一下DLedgerStore 的 updateCommittedIndex 方法。

DLedgerMmapFileStore#updateCommittedIndex

public void updateCommittedIndex(long term, long newCommittedIndex) {   // @1
    if (newCommittedIndex == -1
            || ledgerEndIndex == -1
            || term &lt; memberState.currTerm()
            || newCommittedIndex == this.committedIndex) {                               // @2
            return;
    }
    if (newCommittedIndex &lt; this.committedIndex
            || newCommittedIndex &lt; this.ledgerBeginIndex) {                             // @3
        logger.warn("[MONITOR]Skip update committed index for new={} &lt; old={} or new={} &lt; beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
        return;
    }
    long endIndex = ledgerEndIndex;
    if (newCommittedIndex &gt; endIndex) {                                                       // @4
            //If the node fall behind too much, the committedIndex will be larger than enIndex.
        newCommittedIndex = endIndex;
    }
    DLedgerEntry dLedgerEntry = get(newCommittedIndex);                        // @5                
    PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
    this.committedIndex = newCommittedIndex;
    this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();     // @6
}

代碼@1:首先介紹一下方法的參數:

  • long term 主節點當前的投票輪次。
  • long newCommittedIndex: 主節點發送日誌複製請求時的已提交日誌序號。

代碼@2:若是待更新提交序號爲 -1 或 投票輪次小於從節點的投票輪次或主節點投票輪次等於從節點的已提交序號,則直接忽略本次提交動做。

代碼@3:若是主節點的已提交日誌序號小於從節點的已提交日誌序號或待提交序號小於當前節點的最小有效日誌序號,則輸出警告日誌[MONITOR],並忽略本次提交動做。

代碼@4:若是從節點落後主節點太多,則重置 提交索引爲從節點當前最大有效日誌序號。

代碼@5:嘗試根據待提交序號從從節點查找數據,若是數據不存在,則拋出 DISK_ERROR 錯誤。

代碼@6:更新 commitedIndex、committedPos 兩個指針,DledgerStore會定時將已提交指針刷入 checkpoint 文件,達到持久化 commitedIndex 指針的目的。

3.3.2 handleDoCompare

處理主節點發送過來的 COMPARE 請求,其實現也比較簡單,最終調用 buildResponse 方法構造響應結果。

EntryHandler#buildResponse

private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
    PushEntryResponse response = new PushEntryResponse();
    response.setGroup(request.getGroup());
    response.setCode(code);
    response.setTerm(request.getTerm());
    if (request.getType() != PushEntryRequest.Type.COMMIT) {
        response.setIndex(request.getEntry().getIndex());
    }
    response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
    response.setEndIndex(dLedgerStore.getLedgerEndIndex());
    return response;
}

主要也是返回當前從幾點的 ledgerBeginIndex、ledgerEndIndex 以及投票輪次,供主節點進行判斷比較。

3.3.3 handleDoTruncate

handleDoTruncate 方法實現比較簡單,刪除從節點上 truncateIndex 日誌序號以後的全部日誌,具體調用dLedgerStore 的 truncate 方法,因爲其存儲與 RocketMQ 的存儲設計基本相似故本文就不在詳細介紹,簡單介紹其實現要點:根據日誌序號,去定位到日誌文件,若是命中具體的文件,則修改相應的讀寫指針、刷盤指針等,並將所在在物理文件以後的全部文件刪除。你們若有興趣,能夠查閱筆者的《RocketMQ技術內幕》第4章:RocketMQ 存儲相關內容。

3.3.4 handleDoAppend

private void handleDoAppend(long writeIndex, PushEntryRequest request,
    CompletableFuture<pushentryresponse> future) {
    try {
        PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
        PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}

其實現也比較簡單,調用DLedgerStore 的 appendAsFollower 方法進行日誌的追加,與appendAsLeader 在日誌存儲部分相同,只是從節點無需再轉發日誌。

3.3.5 checkAbnormalFuture

該方法是本節的重點,doWork 的從服務器存儲的最大有效日誌序號(ledgerEndIndex) + 1 序號,嘗試從待寫請求中獲取不到對應的請求時調用,這種狀況也很常見,例如主節點並麼有將最新的數據 PUSH 給從節點。接下來咱們詳細來看看該方法的實現細節。 EntryHandler#checkAbnormalFuture

if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) &lt; 1000) {
    return;
}
lastCheckFastForwardTimeMs  = System.currentTimeMillis();
if (writeRequestMap.isEmpty()) {
    return;
}

Step1:若是上一次檢查的時間距如今不到1s,則跳出;若是當前沒有積壓的append請求,一樣跳出,由於能夠一樣明確的判斷出主節點還未推送日誌。

EntryHandler#checkAbnormalFuture

for (Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair : writeRequestMap.values()) {
    long index = pair.getKey().getEntry().getIndex();             // @1
    //Fall behind
    if (index &lt;= endIndex) {                                                   // @2
        try {
            DLedgerEntry local = dLedgerStore.get(index);
            PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
            logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);
        } catch (Throwable t) {
            logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);
            pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }
        writeRequestMap.remove(index);
        continue;
    }
    //Just OK
    if (index ==  endIndex + 1) {    // @3
        //The next entry is coming, just return
        return;
    }
    //Fast forward
    TimeoutFuture<pushentryresponse> future  = (TimeoutFuture<pushentryresponse>) pair.getValue();    // @4
    if (!future.isTimeOut()) {
        continue;
    }
    if (index &lt; minFastForwardIndex) {                                                                                                                // @5
        minFastForwardIndex = index;
    }
}

Step2:遍歷當前待寫入的日誌追加請求(主服務器推送過來的日誌複製請求),找到須要快速快進的的索引。其關鍵實現點以下:

  • 代碼@1:首先獲取待寫入日誌的序號。
  • 代碼@2:若是待寫入的日誌序號小於從節點已追加的日誌(endIndex),而且日誌的確已存儲在從節點,則返回成功,並輸出警告日誌【PushFallBehind】,繼續監測下一條待寫入日誌。
  • 代碼@3:若是待寫入 index 等於 endIndex + 1,則結束循環,由於下一條日誌消息已經在待寫入隊列中,即將寫入。
  • 代碼@4:若是待寫入 index 大於 endIndex + 1,而且未超時,則直接檢查下一條待寫入日誌。
  • 代碼@5:若是待寫入 index 大於 endIndex + 1,而且已經超時,則記錄該索引,使用 minFastForwardIndex 存儲。

EntryHandler#checkAbnormalFuture

if (minFastForwardIndex == Long.MAX_VALUE) {
    return;
}
Pair<pushentryrequest, completablefuture<pushentryresponse>&gt; pair = writeRequestMap.get(minFastForwardIndex);
if (pair == null) {
    return;
}

Step3:若是未找到須要快速失敗的日誌序號或 writeRequestMap 中未找到其請求,則直接結束檢測。

EntryHandler#checkAbnormalFuture

logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));

Step4:則向主節點報告從節點已經與主節點發生了數據不一致,從節點並無寫入序號 minFastForwardIndex 的日誌。若是主節點收到此種響應,將會中止日誌轉發,轉而向各個從節點發送 COMPARE 請求,從而使數據恢復一致。

行爲至此,已經詳細介紹了主服務器向從服務器發送請求,從服務作出響應,那接下來就來看一下,服務端收到響應結果後的處理,咱們要知道主節點會向它全部的從節點傳播日誌,主節點須要在指定時間內收到超過集羣一半節點的確認,才能認爲日誌寫入成功,那咱們接下來看一下其實現過程。

四、QuorumAckChecker

日誌複製投票器,一個日誌寫請求只有獲得集羣內的的大多數節點的響應,日誌纔會被提交。

4.1 類圖

在這裏插入圖片描述

其核心屬性以下:

  • long lastPrintWatermarkTimeMs 上次打印水位線的時間戳,單位爲毫秒。
  • long lastCheckLeakTimeMs 上次檢測泄漏的時間戳,單位爲毫秒。
  • long lastQuorumIndex 已投票仲裁的日誌序號。

4.2 doWork 詳解

QuorumAckChecker#doWork

if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) &gt; 3000) {    
    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
            memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
    lastPrintWatermarkTimeMs = System.currentTimeMillis();
}

Step1:若是離上一次打印 watermak 的時間超過3s,則打印一下當前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 這些數據日誌。

QuorumAckChecker#doWork

if (!memberState.isLeader()) {   // @2
    waitForRunning(1);
    return;
}

Step2:若是當前節點不是主節點,直接返回,不做爲。

QuorumAckChecker#doWork

if (pendingAppendResponsesByTerm.size() &gt; 1) {   // @1
    for (Long term : pendingAppendResponsesByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        for (Map.Entry<long, timeoutfuture<appendentryresponse>&gt; futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setIndex(futureEntry.getKey());
            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
            response.setLeaderId(memberState.getLeaderId());
            logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
            futureEntry.getValue().complete(response);
        }
        pendingAppendResponsesByTerm.remove(term);
    }
}
if (peerWaterMarksByTerm.size() &gt; 1) {
    for (Long term : peerWaterMarksByTerm.keySet()) {
        if (term == currTerm) {
            continue;
        }
        logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
        peerWaterMarksByTerm.remove(term);
    }
}

Step3:清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票輪次的數據,避免一些沒必要要的內存使用。

Map<string, long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {  // @1
    int num = 0;
    for (Long another : peerWaterMarks.values()) {  // @2
        if (another &gt;= index) {
            num++;
        }
    }
    if (memberState.isQuorum(num) &amp;&amp; index &gt; quorumIndex) {  // @3
        quorumIndex = index;
    }
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);  // @4

Step4:根據各個從節點反饋的進度,進行仲裁,肯定已提交序號。爲了加深對這段代碼的理解,再來囉嗦一下 peerWaterMarks 的做用,存儲的是各個從節點當前已成功追加的日誌序號。例如一個三節點的 DLedger 集羣,peerWaterMarks 數據存儲大概以下:

{
「dledger_group_01_0」 : 100,
"dledger_group_01_1" : 101,
}

其中 dledger_group_01_0 爲從節點1的ID,當前已複製的序號爲 100,而 dledger_group_01_1 爲節點2的ID,當前已複製的序號爲 101。再加上主節點,如何肯定可提交序號呢?

  • 代碼@1:首先遍歷 peerWaterMarks 的 value 集合,即上述示例中的 {100, 101},用臨時變量 index 來表示待投票的日誌序號,須要集羣內超過半數的節點的已複製序號超過該值,則該日誌能被確認提交。
  • 代碼@2:遍歷 peerWaterMarks 中的全部已提交序號,與當前值進行比較,若是節點的已提交序號大於等於待投票的日誌序號(index),num 加一,表示投同意票。
  • 代碼@3:對 index 進行仲裁,若是超過半數 而且 index 大於 quorumIndex,更新 quorumIndex 的值爲 index。quorumIndex 通過遍歷的,得出當前最大的可提交日誌序號。
  • 代碼@4:更新 committedIndex 索引,方便 DLedgerStore 定時將 committedIndex 寫入 checkpoint 中。
ConcurrentMap<long, timeoutfuture<appendentryresponse>&gt; responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex &gt;= 0) {
    for (Long i = quorumIndex; i &gt;= 0; i--) {  // @1
        try {
            CompletableFuture<appendentryresponse> future = responses.remove(i);   // @2
            if (future == null) {                                                                                              // @3
                needCheck = lastQuorumIndex != -1 &amp;&amp; lastQuorumIndex != quorumIndex &amp;&amp; i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) {                                                                                // @4
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) future).getPos());
                future.complete(response);
            }
            ackNum++;                                                                                                      // @5
        } catch (Throwable t) {
            logger.error("Error in ack to index={} term={}", i, currTerm, t);
        }
    }
}

Step5:處理 quorumIndex 以前的掛起請求,須要發送響應到客戶端,其實現步驟:

  • 代碼@1:從 quorumIndex 開始處理,沒處理一條,該序號減一,直到大於0或主動退出,請看後面的退出邏輯。

  • 代碼@2:responses 中移除該日誌條目的掛起請求。

  • 代碼@3:若是未找到掛起請求,說明前面掛起的請求已經所有處理完畢,準備退出,退出以前再 設置 needCheck 的值,其依據以下(三個條件必須同時知足):

    • 最後一次仲裁的日誌序號不等於-1
    • 而且最後一次不等於本次新仲裁的日誌序號
    • 最後一次仲裁的日誌序號不等於最後一次仲裁的日誌。正常狀況一下,條件1、條件二一般爲true,但這一條大機率會返回false。
  • 代碼@4:向客戶端返回結果。

  • 代碼@5:ackNum,表示本次確認的數量。

if (ackNum == 0) {
    for (long i = quorumIndex + 1; i &lt; Integer.MAX_VALUE; i++) {
        TimeoutFuture<appendentryresponse> future = responses.get(i);
        if (future == null) {
            break;
        } else if (future.isTimeOut()) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
            response.setTerm(currTerm);
            response.setIndex(i);
            response.setLeaderId(memberState.getSelfId());
            future.complete(response);
        } else {
            break;
        }
    }
    waitForRunning(1);
}

Step6:若是本次確認的個數爲0,則嘗試去判斷超過該仲裁序號的請求,是否已經超時,若是已超時,則返回超時響應結果。

if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) &gt; 1000 || needCheck) {
    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
    for (Map.Entry<long, timeoutfuture<appendentryresponse>&gt; futureEntry : responses.entrySet()) {
        if (futureEntry.getKey() &lt; quorumIndex) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setTerm(currTerm);
            response.setIndex(futureEntry.getKey());
            response.setLeaderId(memberState.getSelfId());
            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
            futureEntry.getValue().complete(response);
            responses.remove(futureEntry.getKey());
        }
    }
    lastCheckLeakTimeMs = System.currentTimeMillis();
}

Step7:檢查是否發送泄漏。其判斷泄漏的依據是若是掛起的請求的日誌序號小於已提交的序號,則移除。

Step8:一第二天志仲裁就結束了,最後更新 lastQuorumIndex 爲本次仲裁的的新的提交值。

關於 DLedger 的日誌複製部分就介紹到這裏了。本文篇幅較長,看到這裏的各位親愛的讀者朋友們,麻煩點個贊,謝謝。


> 做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。

</long,></appendentryresponse></appendentryresponse></long,></string,></long,></pushentryrequest,></pushentryresponse></pushentryresponse></pushentryrequest,></pushentryresponse></pushentryrequest,></pushentryrequest,></pushentryrequest,></pushentryresponse></pushentryresponse></pair<pushentryrequest,></long,></pushentryresponse></pushentryresponse></long,></pushentryrequest.type></string,></long,></long,>

相關文章
相關標籤/搜索