舒適提示:《RocketMQ技術內幕》做者傾力打造的全新專欄:RocketMQ 多副本(主從切換): 一、《RocketMQ 多副本前置篇:初探raft協議》java
本文將按照《RocketMQ 多副本前置篇:初探raft協議》的思路來學習RocketMQ選主邏輯。首先先回顧一下關於Leader的一些思考:服務器
- 節點狀態 須要引入3種節點狀態:Follower(跟隨者)、Candidate(候選者),該狀態下的節點會發起投票請求,Leader(主節點)。
- 選舉計時器 Follower、Candidate兩個狀態時,須要維護一個定時器,每次定時時間從150ms-300ms直接進行隨機,即每一個節點的定時過時不同,Follower狀態時,定時器到點後,觸發一輪投票。節點在收到投票請求、Leader的心跳請求並做出響應後,須要重置定時器。
- 投票輪次Team Candidate狀態的節點,每發起一輪投票,Team加一。
- 投票機制 每一輪一個節點只能爲一個節點投同意票,例如節點A中維護的輪次爲3,而且已經爲節點B投了同意票,若是收到其餘節點,投票輪次爲3,則會投反對票,若是收到輪次爲4的節點,是又能夠投同意票的。
- 成爲Leader的條件 必須獲得集羣中初始數量的大多數,例如若是集羣中有3臺,則必須獲得兩票,若是其中一臺服務器宕機,剩下的兩個節點,還能進行選主嗎?答案是能夠的,由於能夠獲得2票,超過初始集羣中3的一半,因此一般集羣中的機器各位儘可能爲奇數,由於4臺的可用性與3臺的同樣。
舒適提示:本文是從源碼的角度分析 DLedger 選主實現原理,可能比較鼓譟,文末給出了選主流程圖。網絡
@TOC架構
一、DLedger關於選主的核心類圖
1.1 DLedgerConfig
多副本模塊相關的配置信息,例如集羣節點信息。併發
1.2 MemberState
節點狀態機,即raft協議中的follower、candidate、leader三種狀態的狀態機實現。app
1.3 raft協議相關
1.3.1 DLedgerClientProtocol
DLedger客戶端協議,主要定義以下三個方法,在後面的日誌複製部分會重點闡述。dom
- CompletableFuture< GetEntriesResponse> get(GetEntriesRequest request) 客戶端從服務器獲取日誌條目(獲取數據)
- CompletableFuture< AppendEntryResponse> append(AppendEntryRequest request) 客戶端向服務器追加日誌(存儲數據)
- CompletableFuture< MetadataResponse> metadata(MetadataRequest request) 獲取元數據。
1.3.2 DLedgerProtocol
DLedger服務端協議,主要定義以下三個方法。異步
- CompletableFuture< VoteResponse> vote(VoteRequest request) 發起投票請求。
- CompletableFuture< HeartBeatResponse> heartBeat(HeartBeatRequest request) Leader向從節點發送心跳包。
- CompletableFuture< PullEntriesResponse> pull(PullEntriesRequest request) 拉取日誌條目,在日誌複製部分會詳細介紹。
- CompletableFuture< PushEntryResponse> push(PushEntryRequest request) 推送日誌條件,在日誌複製部分會詳細介紹。
1.3.3 協議處理Handler
DLedgerClientProtocolHandler、DLedgerProtocolHander協議處理器。async
1.4 DLedgerRpcService
DLedger Server(節點)之間的網絡通訊,默認基於Netty實現,其實現類爲:DLedgerRpcNettyService。分佈式
1.5 DLedgerLeaderElector
Leader選舉實現器。
1.6 DLedgerServer
Dledger Server,Dledger節點的封裝類。
接下來將從DLedgerLeaderElector開始剖析DLedger是如何實現Leader選舉的。(基於raft協議)。
二、源碼分析Leader選舉
2.1 DLedgerLeaderElector 類圖
咱們先一一來介紹其屬性的含義:
- Random random 隨機數生成器,對應raft協議中選舉超時時間是一隨機數。
- DLedgerConfig dLedgerConfig 配置參數。
- MemberState memberState 節點狀態機。
- DLedgerRpcService dLedgerRpcService rpc服務,實現向集羣內的節點發送心跳包、投票的RPC實現。 l- ong lastLeaderHeartBeatTime 上次收到心跳包的時間戳。
- long lastSendHeartBeatTime 上次發送心跳包的時間戳。
- long lastSuccHeartBeatTime 上次成功收到心跳包的時間戳。
- int heartBeatTimeIntervalMs 一個心跳包的週期,默認爲2s。
- int maxHeartBeatLeak 容許最大的N個心跳週期內未收到心跳包,狀態爲Follower的節點只有超過 maxHeartBeatLeak * heartBeatTimeIntervalMs 的時間內未收到主節點的心跳包,纔會從新進入 Candidate 狀態,從新下一輪的選舉。
- long nextTimeToRequestVote 發送下一個心跳包的時間戳。
- boolean needIncreaseTermImmediately 是否應該當即發起投票。
- int minVoteIntervalMs 最小的發送投票間隔時間,默認爲300ms。
- int maxVoteIntervalMs 最大的發送投票的間隔,默認爲1000ms。
- List< RoleChangeHandler> roleChangeHandlers 註冊的節點狀態處理器,經過 addRoleChangeHandler 方法添加。
- long lastVoteCost 上一次投票的開銷。
- StateMaintainer stateMaintainer 狀態機管理器。
2.2 啓動選舉狀態管理器
經過 DLedgerLeaderElector 的 startup 方法啓動狀態管理機,代碼以下: DLedgerLeaderElector#startup
public void startup() { stateMaintainer.start(); // @1 for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) { // @2 roleChangeHandler.startup(); } }
代碼@1:啓動狀態維護管理器。
代碼@2:遍歷狀態改變監聽器並啓動它,可經過DLedgerLeaderElector 的 addRoleChangeHandler 方法增長狀態變化監聽器。
其中的是啓動狀態管理器線程,其run方法實現:
public void run() { while (running.get()) { try { doWork(); } catch (Throwable t) { if (logger != null) { logger.error("Unexpected Error in running {} ", getName(), t); } } } latch.countDown(); }
從上面來看,主要是循環調用doWork方法,接下來重點看其doWork的實現:
public void doWork() { try { if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { // @1 DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // @2 DLedgerLeaderElector.this.maintainState(); // @3 } sleep(10); // @4 } catch (Throwable t) { DLedgerLeaderElector.logger.error("Error in heartbeat", t); } }
代碼@1:若是該節點參與Leader選舉,則首先調用@2重置定時器,而後驅動狀態機(@3),是接下來重點須要剖析的。
代碼@4:沒執行一次選主,休息10ms。
DLedgerLeaderElector#maintainState
private void maintainState() throws Exception { if (memberState.isLeader()) { maintainAsLeader(); } else if (memberState.isFollower()) { maintainAsFollower(); } else { maintainAsCandidate(); } }
根據當前的狀態機狀態,執行對應的操做,從raft協議中可知,總共存在3種狀態:
- leader 領導者,主節點,該狀態下,須要定時向從節點發送心跳包,用來傳播數據、確保其領導地位。
- follower 從節點,該狀態下,會開啓定時器,嘗試進入到candidate狀態,以便發起投票選舉,同時一旦收到主節點的心跳包,則重置定時器。
- candidate 候選者,該狀態下的節點會發起投票,嘗試選擇本身爲主節點,選舉成功後,不會存在該狀態下的節點。
咱們在繼續往下看以前,須要知道 memberState 的初始值是什麼?咱們追溯到建立 MemberState 的地方,發現其初始狀態爲 CANDIDATE。那咱們接下從 maintainAsCandidate 方法開始跟進。
舒適提示:在raft協議中,節點的狀態默認爲follower,DLedger的實現從candidate開始,一開始,集羣內的全部節點都會嘗試發起投票,這樣第一輪要達成選舉幾乎不太可能。
2.3 選舉狀態機狀態流轉
整個狀態機的驅動,由線程反覆執行maintainState方法。下面重點來分析其狀態的驅動。
2.3.1 maintainAsCandidate 方法
DLedgerLeaderElector#maintainAsCandidate
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) { return; } long term; long ledgerEndTerm; long ledgerEndIndex;
Step1:首先先介紹幾個變量的含義:
- nextTimeToRequestVote 下一次發發起的投票的時間,若是當前時間小於該值,說明計時器未過時,此時無需發起投票。
- needIncreaseTermImmediately 是否應該當即發起投票。若是爲true,則忽略計時器,該值默認爲false,當收到從主節點的心跳包而且當前狀態機的輪次大於主節點的輪次,說明集羣中Leader的投票輪次小於從幾點的輪次,應該當即發起新的投票。
- term 投票輪次。
- ledgerEndTerm Leader節點當前的投票輪次。
- ledgerEndIndex 當前日誌的最大序列,即下一條日誌的開始index,在日誌複製部分會詳細介紹。
DLedgerLeaderElector#maintainAsCandidate
synchronized (memberState) { if (!memberState.isCandidate()) { return; } if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) { long prevTerm = memberState.currTerm(); term = memberState.nextTerm(); logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term); lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; } else { term = memberState.currTerm(); } ledgerEndIndex = memberState.getLedgerEndIndex(); ledgerEndTerm = memberState.getLedgerEndTerm(); }
Step2:初始化team、ledgerEndIndex 、ledgerEndTerm 屬性,其實現關鍵點以下:
- 若是上一次的投票結果爲待下一次投票或應該當即開啓投票,而且根據當前狀態機獲取下一輪的投票輪次,稍後會着重講解一下狀態機輪次的維護機制。
- 若是上一次的投票結果不是WAIT_TO_VOTE_NEXT(等待下一輪投票),則投票輪次依然爲狀態機內部維護的輪次。
DLedgerLeaderElector#maintainAsCandidate
if (needIncreaseTermImmediately) { nextTimeToRequestVote = getNextTimeToRequestVote(); needIncreaseTermImmediately = false; return; }
Step3:若是needIncreaseTermImmediately爲true,則重置該標記位爲false,並從新設置下一次投票超時時間,其實現代碼以下:
private long getNextTimeToRequestVote() { return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs); }
下一次倒計時:當前時間戳 + 上次投票的開銷 + 最小投票間隔(300ms) + (1000- 300 )之間的隨機值。
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
Step4:向集羣內的其餘節點發起投票請,並返回投票結果列表,稍後會重點分析其投票過程。能夠預見,接下來就是根據各投票結果進行仲裁。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1); final AtomicInteger allNum = new AtomicInteger(0); final AtomicInteger validNum = new AtomicInteger(0); final AtomicInteger acceptedNum = new AtomicInteger(0); final AtomicInteger notReadyTermNum = new AtomicInteger(0); final AtomicInteger biggerLedgerNum = new AtomicInteger(0); final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
Step5:在進行投票結果仲裁以前,先來介紹幾個局部變量的含義:
- knownMaxTermInGroup 已知的最大投票輪次。
- allNum 全部投票票數。
- validNum 有效投票數。
- acceptedNum 得到的投票數。
- notReadyTermNum 未準備投票的節點數量,若是對端節點的投票輪次小於發起投票的輪次,則認爲對端未準備好,對端節點使用本次的輪次進入 - Candidate 狀態。
- biggerLedgerNum 發起投票的節點的ledgerEndTerm小於對端節點的個數。
- alreadyHasLeader 是否已經存在Leader。
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) { // 省略部分代碼 }
Step5:遍歷投票結果,收集投票結果,接下來重點看其內部實現。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) { validNum.incrementAndGet(); }
Step6:若是投票結果不是UNKNOW,則有效投票數量增1。
synchronized (knownMaxTermInGroup) { switch (x.getVoteResult()) { case ACCEPT: acceptedNum.incrementAndGet(); break; case REJECT_ALREADY_VOTED: break; case REJECT_ALREADY_HAS_LEADER: alreadyHasLeader.compareAndSet(false, true); break; case REJECT_TERM_SMALL_THAN_LEDGER: case REJECT_EXPIRED_VOTE_TERM: if (x.getTerm() > knownMaxTermInGroup.get()) { knownMaxTermInGroup.set(x.getTerm()); } break; case REJECT_EXPIRED_LEDGER_TERM: case REJECT_SMALL_LEDGER_END_INDEX: biggerLedgerNum.incrementAndGet(); break; case REJECT_TERM_NOT_READY: notReadyTermNum.incrementAndGet(); break; default: break; } }
Step7:統計投票結構,幾個關鍵點以下:
- ACCEPT 同意票,acceptedNum加一,只有獲得的同意票超過集羣節點數量的一半才能成爲Leader。
- REJECT_ALREADY_VOTED 拒絕票,緣由是已經投了其餘節點的票。
- REJECT_ALREADY_HAS_LEADER 拒絕票,緣由是由於集羣中已經存在Leaer了。alreadyHasLeader設置爲true,無需在判斷其餘投票結果了,結束本輪投票。
- REJECT_TERM_SMALL_THAN_LEDGER 拒絕票,若是本身維護的term小於遠端維護的ledgerEndTerm,則返回該結果,若是對端的team大於本身的team,須要記錄對端最大的投票輪次,以便更新本身的投票輪次。
- REJECT_EXPIRED_VOTE_TERM 拒絕票,若是本身維護的term小於遠端維護的term,更新本身維護的投票輪次。
- REJECT_EXPIRED_LEDGER_TERM 拒絕票,若是本身維護的 ledgerTerm小於對端維護的ledgerTerm,則返回該結果。若是是此種狀況,增長計數器- biggerLedgerNum的值。
- REJECT_SMALL_LEDGER_END_INDEX 拒絕票,若是對端的ledgerTeam與本身維護的ledgerTeam相等,可是本身維護的dedgerEndIndex小於對端維護的值,返回該值,增長biggerLedgerNum計數器的值。
- REJECT_TERM_NOT_READY 拒絕票,對端的投票輪次小於本身的team,則認爲對端還未準備好投票,對端使用本身的投票輪次,是本身進入到Candidate狀態。
try { voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS); } catch (Throwable ignore) { }
Step8:等待收集投票結果,並設置超時時間。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs); VoteResponse.ParseResult parseResult; if (knownMaxTermInGroup.get() > term) { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote(); changeRoleToCandidate(knownMaxTermInGroup.get()); } else if (alreadyHasLeader.get()) { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak; } else if (!memberState.isQuorum(validNum.get())) { parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; nextTimeToRequestVote = getNextTimeToRequestVote(); } else if (memberState.isQuorum(acceptedNum.get())) { parseResult = VoteResponse.ParseResult.PASSED; } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) { parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY; } else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) { parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; nextTimeToRequestVote = getNextTimeToRequestVote(); } else { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote(); }
Step9:根據收集的投票結果判斷是否能成爲Leader。
舒適提示:在講解關鍵點以前,咱們先定義先將(當前時間戳 + 上次投票的開銷 + 最小投票間隔(300ms) + (1000- 300 )之間的隨機值)定義爲「 1個常規計時器」。
其關鍵點以下:
- 若是對端的投票輪次大於發起投票的節點,則該節點使用對端的輪次,從新進入到Candidate狀態,而且重置投票計時器,其值爲「1個常規計時器」
- 若是已經存在Leader,該節點從新進入到Candidate,並重置定時器,該定時器的時間: 「1個常規計時器」 + heartBeatTimeIntervalMs * maxHeartBeatLeak ,其中 heartBeatTimeIntervalMs 爲一次心跳間隔時間, maxHeartBeatLeak 爲 容許最大丟失的心跳包,即若是Flower節點在多少個心跳週期內未收到心跳包,則認爲Leader已下線。
- 若是收到的有效票數未超過半數,則重置計時器爲「 1個常規計時器」,而後等待從新投票,注意狀態爲WAIT_TO_REVOTE,該狀態下的特徵是下次投票時不增長投票輪次。
- 若是獲得的贊同票超過半數,則成爲Leader。
- 若是獲得的同意票加上未準備投票的節點數超過半數,則應該當即發起投票,故其結果爲REVOTE_IMMEDIATELY。
- 若是獲得的同意票加上對端維護的ledgerEndIndex超過半數,則重置計時器,繼續本輪次的選舉。
- 其餘狀況,開啓下一輪投票。
if (parseResult == VoteResponse.ParseResult.PASSED) { logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term); changeRoleToLeader(term); }
Step10:若是投票成功,則狀態機狀態設置爲Leader,而後狀態管理在驅動狀態時會調用DLedgerLeaderElector#maintainState時,將進入到maintainAsLeader方法。
2.3.2 maintainAsLeader 方法
通過maintainAsCandidate 投票選舉後,被其餘節點選舉成爲領導後,會執行該方法,其餘節點的狀態仍是Candidate,並在計時器過時後,又嘗試去發起選舉。接下來重點分析成爲Leader節點後,該節點會作些什麼?
DLedgerLeaderElector#maintainAsLeader
private void maintainAsLeader() throws Exception { if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1 long term; String leaderId; synchronized (memberState) { if (!memberState.isLeader()) { // @2 //stop sending return; } term = memberState.currTerm(); leaderId = memberState.getLeaderId(); lastSendHeartBeatTime = System.currentTimeMillis(); // @3 } sendHeartbeats(term, leaderId); // @4 } }
代碼@1:首先判斷上一次發送心跳的時間與當前時間的差值是否大於心跳包發送間隔,若是超過,則說明須要發送心跳包。
代碼@2:若是當前不是leader節點,則直接返回,主要是爲了二次判斷。
代碼@3:重置心跳包發送計時器。
代碼@4:向集羣內的全部節點發送心跳包,稍後會詳細介紹心跳包的發送。
2.3.3 maintainAsFollower方法
當 Candidate 狀態的節點在收到主節點發送的心跳包後,會將狀態變動爲follower,那咱們先來看一下在follower狀態下,節點會作些什麼事情?
private void maintainAsFollower() { if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) { synchronized (memberState) { if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) { logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId()); changeRoleToCandidate(memberState.currTerm()); } } } }
若是maxHeartBeatLeak (默認爲3)個心跳包週期內未收到心跳,則將狀態變動爲Candidate。
狀態機的驅動就介紹到這裏,在上面的流程中,其實咱們忽略了兩個重要的過程,一個是發起投票請求與投票請求響應、發送心跳包與心跳包響應,那咱們接下來將重點介紹這兩個過程。
2.4 投票與投票請求
節點的狀態爲 Candidate 時會向集羣內的其餘節點發起投票請求(我的以爲理解爲拉票更好),向對方詢問是否願意選舉我爲Leader,對端節點會根據本身的狀況對其投同意票、拒絕票,若是是拒絕票,還會給出拒絕緣由,具體由voteForQuorumResponses、handleVote 這兩個方法來實現,接下來咱們分別對這兩個方法進行詳細分析。
2.4.1 voteForQuorumResponses
發起投票請求。
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm, long ledgerEndIndex) throws Exception { // @1 List<CompletableFuture<VoteResponse>> responses = new ArrayList<>(); for (String id : memberState.getPeerMap().keySet()) { // @2 VoteRequest voteRequest = new VoteRequest(); // @3 start voteRequest.setGroup(memberState.getGroup()); voteRequest.setLedgerEndIndex(ledgerEndIndex); voteRequest.setLedgerEndTerm(ledgerEndTerm); voteRequest.setLeaderId(memberState.getSelfId()); voteRequest.setTerm(term); voteRequest.setRemoteId(id); CompletableFuture<VoteResponse> voteResponse; // @3 end if (memberState.getSelfId().equals(id)) { // @4 voteResponse = handleVote(voteRequest, true); } else { //async voteResponse = dLedgerRpcService.vote(voteRequest); // @5 } responses.add(voteResponse); } return responses; }
代碼@1:首先先解釋一下參數的含義:
- long term 發起投票的節點當前的投票輪次。
- long ledgerEndTerm 發起投票節點維護的已知的最大投票輪次。
- long ledgerEndIndex 發起投票節點維護的已知的最大日誌條目索引。
代碼@2:遍歷集羣內的節點集合,準備異步發起投票請求。這個集合在啓動的時候指定,不能修改。
代碼@3:構建投票請求。
代碼@4:若是是發送給本身的,則直接調用handleVote進行投票請求響應,若是是發送給集羣內的其餘節點,則經過網絡發送投票請求,對端節點調用各自的handleVote對集羣進行響應。
接下來重點關注 handleVote 方法,重點探討其投票處理邏輯。
2.4.2 handleVote 方法
因爲handleVote 方法會併發被調用,由於可能同時收到多個節點的投票請求,故本方法都被synchronized方法包含,鎖定的對象爲狀態機 memberState 對象。
if (!memberState.isPeerMember(request.getLeaderId())) { logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId()); return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER)); } if (!self && memberState.getSelfId().equals(request.getLeaderId())) { logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId()); return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER)); }
Step1:爲了邏輯的完整性對其請求進行檢驗,除非有BUG存在,不然是不會出現上述問題的。
if (request.getTerm() < memberState.currTerm()) { // @1 return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM)); } else if (request.getTerm() == memberState.currTerm()) { // @2 if (memberState.currVoteFor() == null) { //let it go } else if (memberState.currVoteFor().equals(request.getLeaderId())) { //repeat just let it go } else { if (memberState.getLeaderId() != null) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER)); } else { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED)); } } } else { // @3 //stepped down by larger term changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; //only can handleVote when the term is consistent return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY)); }
Step2:判斷髮起節點、響應節點維護的team進行投票「仲裁」,分以下3種狀況討論:
- 若是發起投票節點的 term 小於當前節點的 term 此種狀況下投拒絕票,也就是說在 raft 協議的世界中,誰的 term 越大,越有話語權。
- 若是發起投票節點的 term 等於當前節點的 term 若是二者的 term 相等,說明二者都處在同一個投票輪次中,地位平等,接下來看該節點是否已經投過票。
- 若是未投票、或已投票給請求節點,則繼續後面的邏輯(請看step3)。
- 若是該節點已存在的Leader節點,則拒絕並告知已存在Leader節點。
- 若是該節點還未有Leader節點,但已經投了其餘節點的票,則拒絕請求節點,並告知已投票。
- 若是發起投票節點的 term 大於當前節點的 term 拒絕請求節點的投票請求,並告知自身還未準備投票,自身會使用請求節點的投票輪次當即進入到Candidate狀態。
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM)); } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX)); } if (request.getTerm() < memberState.getLedgerEndTerm()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER)); }
Step3:判斷請求節點的 ledgerEndTerm 與當前節點的 ledgerEndTerm,這裏主要是判斷日誌的複製進度。
- 若是請求節點的 ledgerEndTerm 小於當前節點的 ledgerEndTerm 則拒絕,其緣由是請求節點的日誌複製進度比當前節點低,這種狀況是不能成爲主節點的。
- 若是 ledgerEndTerm 相等,可是 ledgerEndIndex 比當前節點小,則拒絕,緣由與上一條相同。
- 若是請求的 term 小於 ledgerEndTerm 以一樣的理由拒絕。
memberState.setCurrVoteFor(request.getLeaderId()); return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
Step4:通過層層條件帥選,將寶貴的同意票投給請求節點。
通過幾輪投票,最終一個節點能成功被推舉出來,選爲主節點。主節點爲了維持其領導地位,須要定時向從節點發送心跳包,接下來咱們重點看一下心跳包的發送與響應。
2.5 心跳包與心跳包響應
2.5.1 sendHeartbeats
Step1:遍歷集羣中的節點,異步發送心跳包。
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest); future.whenComplete((HeartBeatResponse x, Throwable ex) -> { try { if (ex != null) { throw ex; } switch (DLedgerResponseCode.valueOf(x.getCode())) { case SUCCESS: succNum.incrementAndGet(); break; case EXPIRED_TERM: maxTerm.set(x.getTerm()); break; case INCONSISTENT_LEADER: inconsistLeader.compareAndSet(false, true); break; case TERM_NOT_READY: notReadyNum.incrementAndGet(); break; default: break; } if (memberState.isQuorum(succNum.get()) || memberState.isQuorum(succNum.get() + notReadyNum.get())) { beatLatch.countDown(); } } catch (Throwable t) { logger.error("Parse heartbeat response failed", t); } finally { allNum.incrementAndGet(); if (allNum.get() == memberState.peerSize()) { beatLatch.countDown(); } } }); }
Step2:統計心跳包發送響應結果,關鍵點以下:
- SUCCESS 心跳包成功響應。
- EXPIRED_TERM 主節點的投票 term 小於從節點的投票輪次。
- INCONSISTENT_LEADER 從節點已經有了新的主節點。
- TERM_NOT_READY 從節點未準備好。
這些響應值,咱們在處理心跳包時重點探討。
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS); if (memberState.isQuorum(succNum.get())) { // @1 lastSuccHeartBeatTime = System.currentTimeMillis(); } else { logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}", memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime)); if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { // @2 lastSendHeartBeatTime = -1; } else if (maxTerm.get() > term) { // @3 changeRoleToCandidate(maxTerm.get()); } else if (inconsistLeader.get()) { // @4 changeRoleToCandidate(term); } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) { changeRoleToCandidate(term); } }
對收集的響應結果作仲裁,其實現關鍵點:
- 若是成功的票數大於進羣內的半數,則表示集羣狀態正常,正常按照心跳包間隔發送心跳包(見代碼@1)。
- 若是成功的票數加上未準備的投票的節點數量超過集羣內的半數,則當即發送心跳包(見代碼@2)。
- 若是從節點的投票輪次比主節點的大,則使用從節點的投票輪次,或從節點已經有了另外的主節點,節點狀態從 Leader 轉換爲 Candidate。
接下來咱們重點看一下心跳包的處理邏輯。
2.5.2 handleHeartBeat
if (request.getTerm() < memberState.currTerm()) { return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); } else if (request.getTerm() == memberState.currTerm()) { if (request.getLeaderId().equals(memberState.getLeaderId())) { lastLeaderHeartBeatTime = System.currentTimeMillis(); return CompletableFuture.completedFuture(new HeartBeatResponse()); } }
Step1:若是主節點的 term 小於 從節點的term,發送反饋給主節點,告知主節點的 term 已過期;若是投票輪次相同,而且發送心跳包的節點是該節點的主節點,則返回成功。
下面重點討論主節點的 term 大於從節點的狀況。
synchronized (memberState) { if (request.getTerm() < memberState.currTerm()) { // @1 return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); } else if (request.getTerm() == memberState.currTerm()) { // @2 if (memberState.getLeaderId() == null) { changeRoleToFollower(request.getTerm(), request.getLeaderId()); return CompletableFuture.completedFuture(new HeartBeatResponse()); } else if (request.getLeaderId().equals(memberState.getLeaderId())) { lastLeaderHeartBeatTime = System.currentTimeMillis(); return CompletableFuture.completedFuture(new HeartBeatResponse()); } else { //this should not happen, but if happened logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId()); return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode())); } } else { //To make it simple, for larger term, do not change to follower immediately //first change to candidate, and notify the state-maintainer thread changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; //TOOD notify return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode())); } }
Step2:加鎖來處理(這裏更多的是從節點第一次收到主節點的心跳包)
代碼@1:若是主節的投票輪次小於當前投票輪次,則返回主節點投票輪次過時。
代碼@2:若是投票輪次相同。
- 若是當前節點的主節點字段爲空,則使用主節點的ID,並返回成功。
- 若是當前節點的主節點就是發送心跳包的節點,則更新上一次收到心跳包的時間戳,並返回成功。
- 若是從節點的主節點與發送心跳包的節點ID不一樣,說明有另一個Leaer,按道理來講是不會發送的,若是發生,則返回已存在- 主節點,標記該心跳包處理結束。
代碼@3:若是主節點的投票輪次大於從節點的投票輪次,則認爲從節點併爲準備好,則從節點進入Candidate 狀態,並當即發起一次投票。
心跳包的處理就介紹到這裏。
RocketMQ 多副本之 Leader 選舉的源碼分析就介紹到這裏了,爲了增強對源碼的理解,先梳理流程圖以下:
本文就介紹到這裏了,若是對您有必定的幫助,麻煩幫忙點個贊,謝謝。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。