源碼分析 RocketMQ DLedger 多副本即主從切換實現原理

DLedger 基於 raft 協議,故自然支持主從切換,即主節點(Leader)發生故障,會從新觸發選主,在集羣內再選舉出新的主節點。java

RocketMQ 中主從同步,從節點不只會從主節點同步數據,也會同步元數據,包含 topic 路由信息、消費進度、延遲隊列處理隊列、消費組訂閱配置等信息。那主從切換後元數據如何同步呢?特別是主從切換過程當中,對消息消費有多大的影響,會丟失消息嗎?服務器

> 舒適提示:本文假設你們已經對 RocketMQ4.5 版本以前的主從同步實現有必定的瞭解,這部份內容在《RocketMQ技術內幕》一書中有詳細的介紹,你們也能夠參考以下兩篇文章: > 一、 RocketMQ HA機制(主從同步) 。 > 二、RocketMQ 整合 DLedger(多副本)即主從切換實現平滑升級的設計技巧微信

一、BrokerController 中與主從相關的方法詳解

本節先對 BrokerController 中與主從切換相關的方法。併發

1.1 startProcessorByHa

BrokerController#startProcessorByHaide

private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}

感受該方法的取名較爲隨意,該方法的做用是開啓事務狀態回查處理器,即當節點爲主節點時,開啓對應的事務狀態回查處理器,對PREPARE狀態的消息發起事務狀態回查請求。源碼分析

1.2 shutdownProcessorByHa

BrokerController#shutdownProcessorByHa學習

private void shutdownProcessorByHa() {
    if (this.transactionalMessageCheckService != null) {
        this.transactionalMessageCheckService.shutdown(true);
    }
}

關閉事務狀態回查處理器,當節點從主節點變動爲從節點後,該方法被調用。this

1.3 handleSlaveSynchronize

BrokerController#handleSlaveSynchronizespa

private void handleSlaveSynchronize(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {   // [@1](https://my.oschina.net/u/1198)
        if (null != slaveSyncFuture) {   
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);   // 
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            [@Override](https://my.oschina.net/u/1162528)
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {  // @2
        //handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

該方法的主要做用是處理從節點的元數據同步,即從節點向主節點主動同步 topic 的路由信息、消費進度、延遲隊列處理隊列、消費組訂閱配置等信息。.net

代碼@1:若是當前節點的角色爲從節點:

  • 若是上次同步的 future 不爲空,則首先先取消。
  • 而後設置 slaveSynchronize 的 master 地址爲空。不知你們是否與筆者同樣,有一個疑問,從節點的時候,若是將 master 地址設置爲空,那如何同步元數據,那這個值會在何時設置呢?
  • 開啓定時同步任務,每 10s 從主節點同步一次元數據。

代碼@2:若是當前節點的角色爲主節點,則取消定時同步任務並設置 master 的地址爲空。

1.4 changeToSlave

BrokerController#changeToSlave

public void changeToSlave(int brokerId) {
    log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    //change the role
    brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check       // @1
    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);                            // @2
    //handle the scheduled service
    try {
        this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);    //  @3
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
    }
    //handle the transactional service
    try {
        this.shutdownProcessorByHa();                                                                    //  @4
    } catch (Throwable t) {
        log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
    }
    //handle the slave synchronise
    handleSlaveSynchronize(BrokerRole.SLAVE);                                               // @5
    try {
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());              // @6
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}

Broker 狀態變動爲從節點。其關鍵實現以下:

  • 設置 brokerId,若是broker的id爲0,則設置爲1,這裏在使用的時候,注意規劃好集羣內節點的 brokerId。
  • 設置 broker 的狀態爲 BrokerRole.SLAVE。
  • 若是是從節點,則關閉定時調度線程(處理 RocketMQ 延遲隊列),若是是主節點,則啓動該線程。
  • 關閉事務狀態回查處理器。
  • 從節點須要啓動元數據同步處理器,即啓動 SlaveSynchronize 定時從主服務器同步元數據。
  • 當即向集羣內全部的 nameserver 告知 broker 信息狀態的變動。

1.5 changeToMaster

BrokerController#changeToMaster

public void changeToMaster(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        return;
    }
    log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
    //handle the slave synchronise
    handleSlaveSynchronize(role);   // @1
    //handle the scheduled service
    try {
        this.messageStore.handleScheduleMessageService(role);      // @2
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
    }
    //handle the transactional service
    try {
        this.startProcessorByHa(BrokerRole.SYNC_MASTER);         // @3
    } catch (Throwable t) {
        log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
    }
    //if the operations above are totally successful, we change to master
    brokerConfig.setBrokerId(0); //TO DO check                              // @4
    messageStoreConfig.setBrokerRole(role);                               
    try {
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @5
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}

該方法是 Broker 角色從從節點變動爲主節點的處理邏輯,其實現要點以下:

  • 關閉元數據同步器,由於主節點無需同步。
  • 開啓定時任務處理線程。
  • 開啓事務狀態回查處理線程。
  • 設置 brokerId 爲 0。
  • 向 nameserver 當即發送心跳包以便告知 broker 服務器當前最新的狀態。

主從節點狀態變動的核心方法就介紹到這裏了,接下來看看如何觸發主從切換。

二、如何觸發主從切換

從前面的文章咱們能夠得知,RocketMQ DLedger 是基於 raft 協議實現的,在該協議中就實現了主節點的選舉與主節點失效後集羣會自動進行從新選舉,通過協商投票產生新的主節點,從而實現高可用。

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

上述代碼片斷截取自 BrokerController 的 initialize 方法,咱們能夠得知在 Broker 啓動時,若是開啓了 多副本機制,即 enableDLedgerCommitLog 參數設置爲 true,會爲 集羣節點選主器添加 roleChangeHandler 事件處理器,即節點發送變動後的事件處理器。

接下來咱們將重點探討 DLedgerRoleChangeHandler 。

2.1 類圖

在這裏插入圖片描述

DLedgerRoleChangeHandler 繼承自 RoleChangeHandler,即節點狀態發生變動後的事件處理器。上述的屬性都很簡單,在這裏就重點介紹一下 ExecutorService executorService,事件處理線程池,但只會開啓一個線程,故事件將一個一個按順序執行。

接下來咱們來重點看一下 handle 方法的執行。

2.2 handle 主從狀態切換處理邏輯

DLedgerRoleChangeHandler#handle

public void handle(long term, MemberState.Role role) {
    Runnable runnable = new Runnable() {
        public void run() {
            long start = System.currentTimeMillis();
            try {
                boolean succ = true;
                log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                switch (role) {
                    case CANDIDATE:    // @1
                        if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                        }
                        break;
                    case FOLLOWER:         // @2
                        brokerController.changeToSlave(dLedgerCommitLog.getId());
                        break;
                    case LEADER:           // @3
                        while (true) {
                            if (!dLegerServer.getMemberState().isLeader()) {
                                succ = false;
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                && messageStore.dispatchBehindBytes() == 0) {
                                break;
                            }
                            Thread.sleep(100);
                        }
                        if (succ) {
                            messageStore.recoverTopicQueueTable();
                            brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                        }
                        break;
                    default:
                        break;
                }
                log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
            } catch (Throwable t) {
                log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
            }
        }
    };
    executorService.submit(runnable);
}

代碼@1:若是當前節點狀態機狀態爲 CANDIDATE,表示正在發起 Leader 節點,若是該服務器的角色不是 SLAVE 的話,須要將狀態切換爲 SLAVE。

代碼@2:若是當前節點狀態機狀態爲 FOLLOWER,broker 節點將轉換爲 從節點。

代碼@3:若是當前節點狀態機狀態爲 Leader,說明該節點被選舉爲 Leader,在切換到 Master 節點以前,首先須要等待當前節點追加的數據都已經被提交後才能夠將狀態變動爲 Master,其關鍵實現以下:

  • 若是 ledgerEndIndex 爲 -1,表示當前節點還未又數據轉發,直接跳出循環,無需等待。
  • 若是 ledgerEndIndex 不爲 -1 ,則必須等待數據都已提交,即 ledgerEndIndex 與 committedIndex 相等。
  • 而且須要等待 commitlog 日誌所有已轉發到 consumequeue中,即 ReputMessageService 中的 reputFromOffset 與 commitlog 的 maxOffset 相等。

等待上述條件知足後,便可以進行狀態的變動,須要恢復 ConsumeQueue,維護每個 queue 對應的 maxOffset,而後將 broker 角色轉變爲 master。

通過上面的步驟,就能實時完成 broker 主節點的自動切換。因爲單從代碼的角度來看主從切換不夠直觀,下面我將給出主從切換的流程圖。

2.3 主從切換流程圖

因爲從源碼的角度或許不夠直觀,故本節給出其流程圖。

> 舒適提示:該流程圖的前半部分在 源碼分析 RocketMQ 整合 DLedger(多副本)實現平滑升級的設計技巧 該文中有所闡述。

在這裏插入圖片描述

三、主從切換若干問題思考

我相信通過上面的講解,你們應該對主從切換的實現原理有了一個比較清晰的理解,我更相信讀者朋友們會拋出一個疑問,主從切換會不會丟失消息,消息消費進度是否會丟失而致使重複消費呢?

3.1 消息消費進度是否存在丟失風險

首先,因爲 RocketMQ 元數據,固然也包含消息消費進度的同步是採用的從服務器定時向主服務器拉取進行更新,存在時延,引入 DLedger 機制,也並不保證其一致性,DLedger 只保證 commitlog 文件的一致性。

當主節點宕機後,各個從節點並不會完成同步了消息消費進度,於此同時,消息消費繼續,此時消費者會繼續從從節點拉取消息進行消費,但彙報的從節點並不必定會成爲新的主節點,故消費進度在 broker 端存在丟失的可能性。固然並非必定會丟失,由於消息消費端只要不重啓,消息消費進度會存儲在內存中。

綜合所述,消息消費進度在 broker 端會有丟失的可能性,存在重複消費的可能性,不過問題不大,由於 RocketMQ 自己也不承若不會重複消費。

3.2 消息是否存在丟失風險

消息會不會丟失的關鍵在於,日誌複製進度較慢的從節點是否能夠被選舉爲主節點,若是在一個集羣中,從節點的複製進度落後與從主節點,但當主節點宕機後,若是該從節點被選舉成爲新的主節點,那這將是一個災難,將會丟失數據。關於一個節點是否給另一個節點投同意票的邏輯在 源碼分析 RocketMQ DLedger 多副本之 Leader 選主 的 2.4.2 handleVote 方法中已詳細介紹,在這裏我以截圖的方式再展現其核心點: 在這裏插入圖片描述 在這裏插入圖片描述

從上面能夠得知,若是發起投票節點的複製進度比本身小的話,會投拒絕票。

在這裏插入圖片描述

在這裏插入圖片描述

必須獲得集羣內超過半數節點承認,即最終選舉出來的主節點的當前複製進度必定是比絕大多數的從節點要大,而且也會等於承偌給客戶端的已提交偏移量。故得出的結論是不會丟消息。

本文的介紹就到此爲止了,最後拋出一個思考題與你們相互交流學習,也算是對 DLedger 多副本即主從切換一個總結回顧。答案我會以留言的方式或在下一篇文章中給出。

四、思考題

例如一個集羣內有5個節點的 DLedgr 集羣。 Leader Node: n0-broker-a folloer Node: n1-broker-a,n2-broker-a,n3-broker-a,n4-broker-a

從節點的複製進度可能不一致,例如: n1-broker-a複製進度爲 100 n2-broker-a複製進度爲 120 n3-broker-a複製進度爲 90 n4-broker-a負載進度爲 90

若是此時 n0-broker-a 節點宕機,觸發選主,若是 n1率先發起投票,因爲 n1,的複製進度大於 n3,n4,再加上本身一票,是有可能成爲leader的,此時消息會丟失嗎?爲何?

歡迎你們以留言的方式進行交流,也能夠加我微信號 dingwpmz 與我進行探討,最後若是這篇文章對你們有所幫助的話,麻煩點個贊,謝謝你們。


> 做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,重點關注JAVA集合、JAVA併發包、Netty、Dubbo、RocketMQ、Mybatis、Elasticsearch、Netty。可掃描以下二維碼與做者進行互動。

相關文章
相關標籤/搜索