舒適提示:《RocketMQ技術內幕》做者傾力打造的全新專欄:RocketMQ 多副本(主從切換): 一、《RocketMQ 多副本前置篇:初探raft協議》java
本文將按照《RocketMQ 多副本前置篇:初探raft協議》的思路來學習RocketMQ選主邏輯。首先先回顧一下關於Leader的一些思考:服務器
舒適提示:本文是從源碼的角度分析 DLedger 選主實現原理,可能比較鼓譟,文末給出了選主流程圖。網絡
@TOC併發
多副本模塊相關的配置信息,例如集羣節點信息。app
節點狀態機,即raft協議中的follower、candidate、leader三種狀態的狀態機實現。dom
DLedger客戶端協議,主要定義以下三個方法,在後面的日誌複製部分會重點闡述。異步
DLedger服務端協議,主要定義以下三個方法。async
DLedgerClientProtocolHandler、DLedgerProtocolHander協議處理器。源碼分析
DLedger Server(節點)之間的網絡通訊,默認基於Netty實現,其實現類爲:DLedgerRpcNettyService。post
Leader選舉實現器。
Dledger Server,Dledger節點的封裝類。
接下來將從DLedgerLeaderElector開始剖析DLedger是如何實現Leader選舉的。(基於raft協議)。
經過 DLedgerLeaderElector 的 startup 方法啓動狀態管理機,代碼以下: DLedgerLeaderElector#startup
public void startup() {
stateMaintainer.start(); // @1
for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) { // @2
roleChangeHandler.startup();
}
}
複製代碼
代碼@1:啓動狀態維護管理器。
代碼@2:遍歷狀態改變監聽器並啓動它,可經過DLedgerLeaderElector 的 addRoleChangeHandler 方法增長狀態變化監聽器。
其中的是啓動狀態管理器線程,其run方法實現:
public void run() {
while (running.get()) {
try {
doWork();
} catch (Throwable t) {
if (logger != null) {
logger.error("Unexpected Error in running {} ", getName(), t);
}
}
}
latch.countDown();
}
複製代碼
從上面來看,主要是循環調用doWork方法,接下來重點看其doWork的實現:
public void doWork() {
try {
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { // @1
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // @2
DLedgerLeaderElector.this.maintainState(); // @3
}
sleep(10); // @4
} catch (Throwable t) {
DLedgerLeaderElector.logger.error("Error in heartbeat", t);
}
}
複製代碼
代碼@1:若是該節點參與Leader選舉,則首先調用@2重置定時器,而後驅動狀態機(@3),是接下來重點須要剖析的。
代碼@4:沒執行一次選主,休息10ms。
DLedgerLeaderElector#maintainState
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
複製代碼
根據當前的狀態機狀態,執行對應的操做,從raft協議中可知,總共存在3種狀態:
咱們在繼續往下看以前,須要知道 memberState 的初始值是什麼?咱們追溯到建立 MemberState 的地方,發現其初始狀態爲 CANDIDATE。那咱們接下從 maintainAsCandidate 方法開始跟進。
舒適提示:在raft協議中,節點的狀態默認爲follower,DLedger的實現從candidate開始,一開始,集羣內的全部節點都會嘗試發起投票,這樣第一輪要達成選舉幾乎不太可能。
整個狀態機的驅動,由線程反覆執行maintainState方法。下面重點來分析其狀態的驅動。
DLedgerLeaderElector#maintainAsCandidate
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
long term;
long ledgerEndTerm;
long ledgerEndIndex;
複製代碼
Step1:首先先介紹幾個變量的含義:
DLedgerLeaderElector#maintainAsCandidate
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();
}
複製代碼
Step2:初始化team、ledgerEndIndex 、ledgerEndTerm 屬性,其實現關鍵點以下:
DLedgerLeaderElector#maintainAsCandidate
if (needIncreaseTermImmediately) {
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
複製代碼
Step3:若是needIncreaseTermImmediately爲true,則重置該標記位爲false,並從新設置下一次投票超時時間,其實現代碼以下:
private long getNextTimeToRequestVote() {
return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
}
複製代碼
下一次倒計時:當前時間戳 + 上次投票的開銷 + 最小投票間隔(300ms) + (1000- 300 )之間的隨機值。
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
複製代碼
Step4:向集羣內的其餘節點發起投票請,並返回投票結果列表,稍後會重點分析其投票過程。能夠預見,接下來就是根據各投票結果進行仲裁。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
final AtomicInteger allNum = new AtomicInteger(0);
final AtomicInteger validNum = new AtomicInteger(0);
final AtomicInteger acceptedNum = new AtomicInteger(0);
final AtomicInteger notReadyTermNum = new AtomicInteger(0);
final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
複製代碼
Step5:在進行投票結果仲裁以前,先來介紹幾個局部變量的含義:
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
// 省略部分代碼
}
複製代碼
Step5:遍歷投票結果,收集投票結果,接下來重點看其內部實現。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
複製代碼
Step6:若是投票結果不是UNKNOW,則有效投票數量增1。
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
case ACCEPT:
acceptedNum.incrementAndGet();
break;
case REJECT_ALREADY_VOTED:
break;
case REJECT_ALREADY_HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
break;
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()) {
knownMaxTermInGroup.set(x.getTerm());
}
break;
case REJECT_EXPIRED_LEDGER_TERM:
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
default:
break;
}
}
複製代碼
Step7:統計投票結構,幾個關鍵點以下:
try {
voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
複製代碼
Step8:等待收集投票結果,並設置超時時間。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
複製代碼
Step9:根據收集的投票結果判斷是否能成爲Leader。
舒適提示:在講解關鍵點以前,咱們先定義先將(當前時間戳 + 上次投票的開銷 + 最小投票間隔(300ms) + (1000- 300 )之間的隨機值)定義爲「 1個常規計時器」。
其關鍵點以下:
if (parseResult == VoteResponse.ParseResult.PASSED) {
logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
changeRoleToLeader(term);
}
複製代碼
Step10:若是投票成功,則狀態機狀態設置爲Leader,而後狀態管理在驅動狀態時會調用DLedgerLeaderElector#maintainState時,將進入到maintainAsLeader方法。
通過maintainAsCandidate 投票選舉後,被其餘節點選舉成爲領導後,會執行該方法,其餘節點的狀態仍是Candidate,並在計時器過時後,又嘗試去發起選舉。接下來重點分析成爲Leader節點後,該節點會作些什麼?
DLedgerLeaderElector#maintainAsLeader
private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1
long term;
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) { // @2
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis(); // @3
}
sendHeartbeats(term, leaderId); // @4
}
}
複製代碼
代碼@1:首先判斷上一次發送心跳的時間與當前時間的差值是否大於心跳包發送間隔,若是超過,則說明須要發送心跳包。
代碼@2:若是當前不是leader節點,則直接返回,主要是爲了二次判斷。
代碼@3:重置心跳包發送計時器。
代碼@4:向集羣內的全部節點發送心跳包,稍後會詳細介紹心跳包的發送。
當 Candidate 狀態的節點在收到主節點發送的心跳包後,會將狀態變動爲follower,那咱們先來看一下在follower狀態下,節點會作些什麼事情?
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
複製代碼
若是maxHeartBeatLeak (默認爲3)個心跳包週期內未收到心跳,則將狀態變動爲Candidate。
狀態機的驅動就介紹到這裏,在上面的流程中,其實咱們忽略了兩個重要的過程,一個是發起投票請求與投票請求響應、發送心跳包與心跳包響應,那咱們接下來將重點介紹這兩個過程。
節點的狀態爲 Candidate 時會向集羣內的其餘節點發起投票請求(我的以爲理解爲拉票更好),向對方詢問是否願意選舉我爲Leader,對端節點會根據本身的狀況對其投同意票、拒絕票,若是是拒絕票,還會給出拒絕緣由,具體由voteForQuorumResponses、handleVote 這兩個方法來實現,接下來咱們分別對這兩個方法進行詳細分析。
發起投票請求。
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
long ledgerEndIndex) throws Exception { // @1
List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
for (String id : memberState.getPeerMap().keySet()) { // @2
VoteRequest voteRequest = new VoteRequest(); // @3 start
voteRequest.setGroup(memberState.getGroup());
voteRequest.setLedgerEndIndex(ledgerEndIndex);
voteRequest.setLedgerEndTerm(ledgerEndTerm);
voteRequest.setLeaderId(memberState.getSelfId());
voteRequest.setTerm(term);
voteRequest.setRemoteId(id);
CompletableFuture<VoteResponse> voteResponse; // @3 end
if (memberState.getSelfId().equals(id)) { // @4
voteResponse = handleVote(voteRequest, true);
} else {
//async
voteResponse = dLedgerRpcService.vote(voteRequest); // @5
}
responses.add(voteResponse);
}
return responses;
}
複製代碼
代碼@1:首先先解釋一下參數的含義:
代碼@2:遍歷集羣內的節點集合,準備異步發起投票請求。這個集合在啓動的時候指定,不能修改。
代碼@3:構建投票請求。
代碼@4:若是是發送給本身的,則直接調用handleVote進行投票請求響應,若是是發送給集羣內的其餘節點,則經過網絡發送投票請求,對端節點調用各自的handleVote對集羣進行響應。
接下來重點關注 handleVote 方法,重點探討其投票處理邏輯。
因爲handleVote 方法會併發被調用,由於可能同時收到多個節點的投票請求,故本方法都被synchronized方法包含,鎖定的對象爲狀態機 memberState 對象。
if (!memberState.isPeerMember(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
}
if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
}
複製代碼
Step1:爲了邏輯的完整性對其請求進行檢驗,除非有BUG存在,不然是不會出現上述問題的。
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.currVoteFor() == null) {
//let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
//repeat just let it go
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
}
} else { // @3
//stepped down by larger term
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//only can handleVote when the term is consistent
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}
複製代碼
Step2:判斷髮起節點、響應節點維護的team進行投票「仲裁」,分以下3種狀況討論:
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}
if (request.getTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}
複製代碼
Step3:判斷請求節點的 ledgerEndTerm 與當前節點的 ledgerEndTerm,這裏主要是判斷日誌的複製進度。
memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
複製代碼
Step4:通過層層條件帥選,將寶貴的同意票投給請求節點。
通過幾輪投票,最終一個節點能成功被推舉出來,選爲主節點。主節點爲了維持其領導地位,須要定時向從節點發送心跳包,接下來咱們重點看一下心跳包的發送與響應。
Step1:遍歷集羣中的節點,異步發送心跳包。
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;
}
switch (DLedgerResponseCode.valueOf(x.getCode())) {
case SUCCESS:
succNum.incrementAndGet();
break;
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
if (memberState.isQuorum(succNum.get())
|| memberState.isQuorum(succNum.get() + notReadyNum.get())) {
beatLatch.countDown();
}
} catch (Throwable t) {
logger.error("Parse heartbeat response failed", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
}
複製代碼
Step2:統計心跳包發送響應結果,關鍵點以下:
這些響應值,咱們在處理心跳包時重點探討。
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
if (memberState.isQuorum(succNum.get())) { // @1
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { // @2
lastSendHeartBeatTime = -1;
} else if (maxTerm.get() > term) { // @3
changeRoleToCandidate(maxTerm.get());
} else if (inconsistLeader.get()) { // @4
changeRoleToCandidate(term);
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
複製代碼
對收集的響應結果作仲裁,其實現關鍵點:
接下來咱們重點看一下心跳包的處理邏輯。
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
}
複製代碼
Step1:若是主節點的 term 小於 從節點的term,發送反饋給主節點,告知主節點的 term 已過期;若是投票輪次相同,而且發送心跳包的節點是該節點的主節點,則返回成功。
下面重點討論主節點的 term 大於從節點的狀況。
synchronized (memberState) {
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.getLeaderId() == null) {
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else {
//this should not happen, but if happened
logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
}
} else {
//To make it simple, for larger term, do not change to follower immediately
//first change to candidate, and notify the state-maintainer thread
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//TOOD notify
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
}
}
複製代碼
Step2:加鎖來處理(這裏更多的是從節點第一次收到主節點的心跳包)
代碼@1:若是主節的投票輪次小於當前投票輪次,則返回主節點投票輪次過時。
代碼@2:若是投票輪次相同。
代碼@3:若是主節點的投票輪次大於從節點的投票輪次,則認爲從節點併爲準備好,則從節點進入Candidate 狀態,並當即發起一次投票。
心跳包的處理就介紹到這裏。
RocketMQ 多副本之 Leader 選舉的源碼分析就介紹到這裏了,爲了增強對源碼的理解,先梳理流程圖以下:
本文就介紹到這裏了,若是對您有必定的幫助,麻煩幫忙點個贊,謝謝。
做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。