在上一篇文章當中,咱們講解了NodeImpl在init方法裏面會初始化話的動做,選舉也是在這個方法裏面進行的,這篇文章來從這個方法裏詳細講一下選舉的過程。html
因爲我這裏介紹的是如何實現的,因此請你們先看一下原理:SOFAJRaft 選舉機制剖析 | SOFAJRaft 實現原理java
文章比較長,我也慢慢的寫了半個月時間~node
我在這裏只把有關選舉的代碼列舉出來,其餘的代碼暫且忽略 NodeImpl#initapp
public boolean init(final NodeOptions opts) {
....
// Init timers
//設置投票計時器
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
//處理投票超時
handleVoteTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在必定範圍內返回一個隨機的時間戳
return randomTimeout(timeoutMs);
}
};
//設置預投票計時器
//當leader在規定的一段時間內沒有與 Follower 艦船進行通訊時,
// Follower 就能夠認爲leader已經不能正常擔任旗艦的職責,則 Follower 能夠去嘗試接替leader的角色。
// 這段通訊超時被稱爲 Election Timeout
//候選者在發起投票以前,先發起預投票
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在必定範圍內返回一個隨機的時間戳
//爲了不同時發起選舉而致使失敗
return randomTimeout(timeoutMs);
}
};
//leader下臺的計時器
//定時檢查是否須要從新選舉leader
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {
@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
....
if (!this.conf.isEmpty()) {
//新啓動的node須要從新選舉
stepDown(this.currTerm, false, new Status());
}
....
}
複製代碼
在這個init方法裏面會初始化三個計時器是和選舉有關的:dom
RepeatedTimer的分析我已經寫好了:2. SOFAJRaft源碼分析—JRaft的定時任務調度器是怎麼作的?ide
咱們先跟着init方法的思路往下看,通常來講this.conf裏面裝的是整個集羣的節點信息,是不會爲空的,因此會調用stepDown,因此先從這個方法看起。源碼分析
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
wakeupCandidate);
//校驗一下當前節點的狀態是否有異常,或正在關閉
if (!this.state.isActive()) {
return;
}
//若是是候選者,那麼中止選舉
if (this.state == State.STATE_CANDIDATE) {
//調用voteTimer的stop方法
stopVoteTimer();
//若是當前狀態是leader或TRANSFERRING
} else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
//讓啓動的stepDownTimer中止運做
stopStepDownTimer();
//清空選票箱中的內容
this.ballotBox.clearPendingTasks();
// signal fsm leader stop immediately
if (this.state == State.STATE_LEADER) {
//發送leader下臺的事件給其餘Follower
onLeaderStop(status);
}
}
// reset leader_id
//重置當前節點的leader
resetLeaderId(PeerId.emptyPeer(), status);
// soft state in memory
this.state = State.STATE_FOLLOWER;
//重置Configuration的上下文
this.confCtx.reset();
updateLastLeaderTimestamp(Utils.monotonicMs());
if (this.snapshotExecutor != null) {
//中止當前的快照生成
this.snapshotExecutor.interruptDownloadingSnapshots(term);
}
//設置任期爲大的那個
// meta state
if (term > this.currTerm) {
this.currTerm = term;
this.votedId = PeerId.emptyPeer();
//重設元數據信息保存到文件中
this.metaStorage.setTermAndVotedFor(term, this.votedId);
}
if (wakeupCandidate) {
this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
if (this.wakingCandidate != null) {
Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
}
} else {
//把replicatorGroup裏面的全部replicator標記爲stop
this.replicatorGroup.stopAll();
}
//leader轉移的時候會用到
if (this.stopTransferArg != null) {
if (this.transferTimer != null) {
this.transferTimer.cancel(true);
}
// There is at most one StopTransferTimer at the same term, it's safe to
// mark stopTransferArg to NULL
this.stopTransferArg = null;
}
//啓動
this.electionTimer.start();
}
複製代碼
一個leader的下臺須要作不少交接的工做:ui
調用stopVoteTimer和stopStepDownTimer方法裏面主要是調用相應的RepeatedTimer的stop方法,在stop方法裏面會將stopped狀態設置爲ture,並將timeout設置爲取消,並將這個timeout加入到cancelledTimeouts集合中去: 若是看了2. SOFAJRaft源碼分析—JRaft的定時任務調度器是怎麼作的?這篇文章的話,那麼下面這段代碼應該一看就明白是怎麼回事了的。this
public void stop() {
this.lock.lock();
try {
if (this.stopped) {
return;
}
this.stopped = true;
if (this.timeout != null) {
this.timeout.cancel();
this.running = false;
this.timeout = null;
}
} finally {
this.lock.unlock();
}
}
複製代碼
在調用NodeImpl的onLeaderStop方法中,其實是調用了FSMCallerImpl的onLeaderStop方法 NodeImpl#onLeaderStopspa
private void onLeaderStop(final Status status) {
this.replicatorGroup.clearFailureReplicators();
this.fsmCaller.onLeaderStop(status);
}
複製代碼
FSMCallerImpl#onLeaderStop
public boolean onLeaderStop(final Status status) {
return enqueueTask((task, sequence) -> {
//設置當前task的狀態爲LEADER_STOP
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
}
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
if (this.shutdownLatch != null) {
// Shutting down
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
//使用Disruptor發佈事件
this.taskQueue.publishEvent(tpl);
return true;
}
複製代碼
這個方法裏像taskQueue隊列裏面發佈了一個LEADER_STOP事件,taskQueue是在FSMCallerImpl的init方法中被初始化的:
public boolean init(final FSMCallerOptions opts) {
.....
this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
.setEventFactory(new ApplyTaskFactory()) //
.setRingBufferSize(opts.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.disruptor.handleEventsWith(new ApplyTaskHandler());
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.taskQueue = this.disruptor.start();
.....
}
複製代碼
在taskQueue中發佈了一個任務以後會交給ApplyTaskHandler進行處理
ApplyTaskHandler
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
// max committed index in current batch, reset to -1 every batch
private long maxCommittedIndex = -1;
@Override
public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
}
}
複製代碼
每當有任務到達taskQueue隊列的時候會調用ApplyTaskHandler的onEvent方法來處理事件,具體的執行邏輯由runApplyTask方法進行處理
FSMCallerImpl#runApplyTask
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
CountDownLatch shutdown = null;
...
switch (task.type) {
...
case LEADER_STOP:
this.currTask = TaskType.LEADER_STOP;
doLeaderStop(task.status);
break;
...
}
....
}
複製代碼
在runApplyTask方法裏會對不少的事件進行處理,咱們這裏只看LEADER_STOP是怎麼作的:
在switch裏會調用doLeaderStop方法,這個方法會調用到FSMCallerImpl裏面封裝的StateMachine狀態機的onLeaderStart方法:
private void doLeaderStop(final Status status) {
this.fsm.onLeaderStop(status);
}
複製代碼
這樣就能夠對leader中止時進行客製化的處理了。
接下來會調用resetLeaderId(PeerId.emptyPeer(), status);方法來重置leader
private void resetLeaderId(final PeerId newLeaderId, final Status status) {
if (newLeaderId.isEmpty()) {
//這個判斷表示若是當前節點是候選者或者是Follower,而且已經有leader了
if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
//向狀態機裝發佈中止跟隨該leader的事件
this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status));
}
//把當前的leader設置爲一個空值
this.leaderId = PeerId.emptyPeer();
} else {
//若是當前節點沒有leader
if (this.leaderId == null || this.leaderId.isEmpty()) {
//那麼發佈要跟隨該leader的事件
this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
}
this.leaderId = newLeaderId.copy();
}
}
複製代碼
這個方法由兩個做用,若是傳入的newLeaderId不是個空的,那麼就會設置一個新的leader,並向狀態機發送一個START_FOLLOWING事件;若是傳入的newLeaderId是空的,那麼就會發送一個STOP_FOLLOWING事件,並把當前的leader置空。
electionTimer是RepeatedTimer的實現類,在這裏我就很少說了,上一篇文章已經介紹過了。
我這裏來看看electionTimer的onTrigger方法是怎麼處理選舉事件的,electionTimer的onTrigger方法會調用NodeImpl的handleElectionTimeout方法,因此直接看這個方法:
NodeImpl#handleElectionTimeout
private void handleElectionTimeout() {
boolean doUnlock = true;
this.writeLock.lock();
try {
if (this.state != State.STATE_FOLLOWER) {
return;
}
//若是當前選舉沒有超時則說明此輪選舉有效
if (isCurrentLeaderValid()) {
return;
}
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
this.leaderId));
doUnlock = false;
//預投票 (pre-vote) 環節
//候選者在發起投票以前,先發起預投票,
// 若是沒有獲得半數以上節點的反饋,則候選者就會識趣的放棄參選
preVote();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
複製代碼
在這個方法裏,首先會加上一個寫鎖,而後進行校驗,最後先發起一個預投票。
校驗的時候會校驗當前的狀態是否是follower,校驗leader和follower上次的通訊時間是否是超過了ElectionTimeoutMs,若是沒有超過,說明leader存活,不必發起選舉;若是通訊超時,那麼會將leader置空,而後調用預選舉。
NodeImpl#isCurrentLeaderValid
private boolean isCurrentLeaderValid() {
return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}
複製代碼
用當前時間和上次leader通訊時間相減,若是小於ElectionTimeoutMs(默認1s),那麼就沒有超時,說明leader有效
咱們在handleElectionTimeout方法中最後調用了preVote方法,接下來重點看一下這個方法。
下面我將preVote拆分紅幾部分來進行講解: NodeImpl#preVote part1
private void preVote() {
long oldTerm;
try {
LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn(
"Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
getNodeId());
return;
}
//conf裏面記錄了集羣節點的信息,若是當前的節點不包含在集羣裏說明是由問題的
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
return;
}
//設置一下當前的任期
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}
....
}
複製代碼
這部分代碼是一開始進到preVote這個方法首先要通過一些校驗,例如當前的節點不能再安裝快照的時候進行選舉;查看一下當前的節點是否是在本身設置的conf裏面,conf這個屬性會包含了集羣的全部節點;最後設置一下當前的任期後解鎖。
NodeImpl#preVote part2
private void preVote() {
....
//返回最新的log實體類
final LogId lastLogId = this.logManager.getLastLogId(true);
boolean doUnlock = true;
this.writeLock.lock();
try {
// pre_vote need defense ABA after unlock&writeLock
//由於在上面沒有從新加鎖的間隙裏可能會被別的線程改變了,因此這裏校驗一下
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
return;
}
//初始化預投票投票箱
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
for (final PeerId peer : this.conf.listPeers()) {
//若是遍歷的節點是當前節點就跳過
if (peer.equals(this.serverId)) {
continue;
}
//失聯的節點也跳過
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
//設置一個回調的類
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
//向被遍歷到的這個節點發送一個預投票的請求
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term,注意這裏發送過去的任期會加一
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
}
//本身也能夠投給本身
this.prevVoteCtx.grant(this.serverId);
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf();
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
複製代碼
這部分代碼:
初始化預投票箱是調用了Ballot的init方法進行初始化,分別傳入新的集羣節點信息,和老的集羣節點信息
public boolean init(Configuration conf, Configuration oldConf) {
this.peers.clear();
this.oldPeers.clear();
quorum = oldQuorum = 0;
int index = 0;
//初始化新的節點
if (conf != null) {
for (PeerId peer : conf) {
this.peers.add(new UnfoundPeerId(peer, index++, false));
}
}
//設置須要多少票數才能成爲leader
this.quorum = this.peers.size() / 2 + 1;
....
return true;
}
複製代碼
我這裏爲了使邏輯更清晰,假設沒有oldConf,省略oldConf相關設置。 這個方法裏會遍歷全部的peer節點,並將peer封裝成UnfoundPeerId插入到peers集合中;而後設置quorum屬性,這個屬性會在每得到一個投票就減1,當減到0如下時說明得到了足夠多的票數,就表明預投票成功。
//設置一個回調的類
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
//向被遍歷到的這個節點發送一個預投票的請求
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term,注意這裏發送過去的任期會加一
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
複製代碼
在構造RequestVoteRequest的時候,會將PreVote屬性設置爲true,表示此次請求是預投票;設置當前節點爲ServerId;傳給對方的任期是當前節點的任期加一。最後在發送成功收到響應以後會回調OnPreVoteRpcDone的run方法。
OnPreVoteRpcDone#run
public void run(final Status status) {
NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
if (!status.isOk()) {
LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
} else {
handlePreVoteResponse(this.peer, this.term, getResponse());
}
}
複製代碼
在這個方法中若是收到正常的響應,那麼會調用handlePreVoteResponse方法處理響應
OnPreVoteRpcDone#handlePreVoteResponse
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//只有follower才能夠嘗試發起選舉
if (this.state != State.STATE_FOLLOWER) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
getNodeId(), peerId, this.state);
return;
}
if (term != this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
peerId, term, this.currTerm);
return;
}
//若是返回的任期大於當前的任期,那麼此次請求也是無效的
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
response.getTerm(), response.getGranted());
// check granted quorum?
if (response.getGranted()) {
this.prevVoteCtx.grant(peerId);
//獲得了半數以上的響應
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
//進行選舉
electSelf();
}
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
複製代碼
這裏作了3重校驗,咱們分別來談談:
校驗完以後響應的節點會返回一個受權,若是受權經過的話則調用Ballot的grant方法,表示給當前的節點投一票
Ballot#grant
public void grant(PeerId peerId) {
this.grant(peerId, new PosHint());
}
public PosHint grant(PeerId peerId, PosHint hint) {
UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0);
if (peer != null) {
if (!peer.found) {
peer.found = true;
this.quorum--;
}
hint.pos0 = peer.index;
} else {
hint.pos0 = -1;
}
....
return hint;
}
複製代碼
grant方法會根據peerId去集羣集合裏面去找被封裝的UnfoundPeerId實例,而後判斷一下,若是沒有被記錄過,那麼就將quorum減一,表示收到一票,而後將found設置爲ture表示已經找過了。
在查找UnfoundPeerId實例的時候方法裏面作了一個頗有趣的設置: 首先在存入到peers集合裏面的時候是這樣的:
int index = 0;
for (PeerId peer : conf) {
this.peers.add(new UnfoundPeerId(peer, index++, false));
}
複製代碼
這裏會遍歷conf,而後會存入index,index從零開始。 而後在查找的時候會傳入peerId和posHint還有peers集合:
private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) {
if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) {
for (UnfoundPeerId ufp : peers) {
if (ufp.peerId.equals(peerId)) {
return ufp;
}
}
return null;
}
return peers.get(posHint);
}
複製代碼
這裏傳入的posHint默認是-1 ,即若是是第一次傳入,那麼會遍歷整個peers集合,而後一個個比對以後返回。
由於PosHint實例會在調用完以後將pos0設置爲peer的index,若是grant方法不是第一次調用,那麼在調用findPeer方法的時候就能夠直接經過get方法獲取,不用再遍歷整個集合了。
這種寫法也能夠運用到平時的代碼中去。
調用了grant方法以後會調用Ballot的isGranted判斷一下是否達到了半數以上的響應。 Ballot#isGranted
public boolean isGranted() {
return this.quorum <= 0 && oldQuorum <= 0;
}
複製代碼
即判斷一下投票箱裏面的票數是否是被減到了0。若是返回是的話,那麼就調用electSelf進行選舉。
選舉的方法暫時先不看,咱們來看看預選舉的請求是怎麼被處理的
RequestVoteRequest請求的處理器是在RaftRpcServerFactory的addRaftRequestProcessors方法中被安置的,具體的處理時RequestVoteRequestProcessor。
具體的處理方法是交由processRequest0方法來處理的。
RequestVoteRequestProcessor#processRequest0
public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) {
//若是是預選舉
if (request.getPreVote()) {
return service.handlePreVoteRequest(request);
} else {
return service.handleRequestVoteRequest(request);
}
}
複製代碼
由於這個處理器能夠處理選舉和預選舉,因此加了個判斷。預選舉的方法交給NodeImpl的handlePreVoteRequest來具體實現的。
NodeImpl#handlePreVoteRequest
public Message handlePreVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//校驗這個節點是否是正常的節點
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
//發送過來的request請求攜帶的ServerId格式不能錯
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
}
boolean granted = false;
// noinspection ConstantConditions
do {
//已經有leader的狀況
if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
LOG.info(
"Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
break;
}
//請求的任期小於當前的任期
if (request.getTerm() < this.currTerm) {
LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
// A follower replicator may not be started when this node become leader, so we must check it.
//那麼這個節點也多是leader,因此校驗一下請求的節點是否是複製節點,從新加入到replicatorGroup中
checkReplicator(candidateId);
break;
} else if (request.getTerm() == this.currTerm + 1) {
// A follower replicator may not be started when this node become leader, so we must check it.
// check replicator state
//由於請求的任期和當前的任期相等,那麼這個節點也多是leader,
// 因此校驗一下請求的節點是否是複製節點,從新加入到replicatorGroup中
checkReplicator(candidateId);
}
doUnlock = false;
this.writeLock.unlock();
//獲取最新的日誌
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
//比較當前節點的日誌完整度和請求節點的日誌完整度
granted = requestLastLogId.compareTo(lastLogId) >= 0;
LOG.info(
"Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
lastLogId);
} while (false);//這個while蠻有意思,爲了用break想盡了辦法
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
.setGranted(granted) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
複製代碼
這個方法裏面也是蠻有意思的,寫的很長,可是邏輯很清楚。
這裏有一個有意思的地方是,由於java中只能在循環中goto,因此這裏使用了do-while(false)只作單次的循環,這樣就能夠do代碼塊裏使用break了。
下面稍微看一下checkReplicator: NodeImpl#checkReplicator
private void checkReplicator(final PeerId candidateId) {
if (this.state == State.STATE_LEADER) {
this.replicatorGroup.checkReplicator(candidateId, false);
}
}
複製代碼
這裏判斷一下是否是leader,而後就會調用ReplicatorGroupImpl的checkReplicator
ReplicatorGroupImpl#checkReplicator
private final ConcurrentMap<PeerId, ThreadId> replicatorMap = new ConcurrentHashMap<>();
private final Set<PeerId> failureReplicators = new ConcurrentHashSet<>();
public void checkReplicator(final PeerId peer, final boolean lockNode) {
//根據傳入的peer獲取相應的ThreadId
final ThreadId rid = this.replicatorMap.get(peer);
// noinspection StatementWithEmptyBody
if (rid == null) {
// Create replicator if it's not found for leader.
final NodeImpl node = this.commonOptions.getNode();
if (lockNode) {
node.writeLock.lock();
}
try {
//若是當前的節點是leader,而且傳入的peer在failureReplicators中,那麼從新添加到replicatorMap
if (node.isLeader() && this.failureReplicators.contains(peer) && addReplicator(peer)) {
this.failureReplicators.remove(peer);
}
} finally {
if (lockNode) {
node.writeLock.unlock();
}
}
} else { // NOPMD
// Unblock it right now.
// Replicator.unBlockAndSendNow(rid);
}
}
複製代碼
checkReplicator會從replicatorMap根據傳入的peer實例校驗一下是否是爲空。由於replicatorMap裏面存放了集羣全部的節點。而後經過ReplicatorGroupImpl的commonOptions獲取當前的Node實例,若是當前的node實例是leader,而且若是存在失敗集合failureReplicators中的話就從新添加進replicatorMap中。
ReplicatorGroupImpl#addReplicator
public boolean addReplicator(final PeerId peer) {
//校驗當前的任期
Requires.requireTrue(this.commonOptions.getTerm() != 0);
//若是replicatorMap裏面已經有這個節點了,那麼將它從failureReplicators集合中移除
if (this.replicatorMap.containsKey(peer)) {
this.failureReplicators.remove(peer);
return true;
}
//賦值一個新的ReplicatorOptions
final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
//新的ReplicatorOptions添加這個PeerId
opts.setPeerId(peer);
final ThreadId rid = Replicator.start(opts, this.raftOptions);
if (rid == null) {
LOG.error("Fail to start replicator to peer={}.", peer);
this.failureReplicators.add(peer);
return false;
}
return this.replicatorMap.put(peer, rid) == null;
}
複製代碼
addReplicator裏面主要是作了兩件事:1. 將要加入的節點從failureReplicators集合裏移除;2. 將要加入的節點放入到replicatorMap集合中去。
private void electSelf() {
long oldTerm;
try {
LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
//1. 若是當前節點不在集羣裏面則不進行選舉
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
return;
}
//2. 大概是由於要進行正式選舉了,把預選舉關掉
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
this.electionTimer.stop();
}
//3. 清空leader
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL as it begins to request_vote."));
this.state = State.STATE_CANDIDATE;
this.currTerm++;
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
//4. 開始發起投票定時器,由於可能投票失敗須要循環發起投票
this.voteTimer.start();
//5. 初始化投票箱
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}
final LogId lastLogId = this.logManager.getLastLogId(true);
this.writeLock.lock();
try {
// vote need defense ABA after unlock&writeLock
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
return;
}
//6. 遍歷全部節點
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(false) // It's not a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
}
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
this.voteCtx.grant(this.serverId);
if (this.voteCtx.isGranted()) {
//7. 投票成功,那麼就晉升爲leader
becomeLeader();
}
} finally {
this.writeLock.unlock();
}
}
複製代碼
不要看這個方法這麼長,其實都是和前面預選舉的方法preVote重複度很高的。方法太長,因此標了號,從上面號碼來一步步講解:
我先來看看RequestVoteRequestProcessor怎麼處理的選舉: 在RequestVoteRequestProcessor的processRequest0會調用NodeImpl的handleRequestVoteRequest來處理具體的邏輯。
NodeImpl#handleRequestVoteRequest
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//是否存活
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
}
// noinspection ConstantConditions
do {
// check term
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
//1. 若是請求的任期大於當前任期
// increase current term, change state to follower
if (request.getTerm() > this.currTerm) {
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term RequestVoteRequest."));
}
} else {
// ignore older term
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
break;
}
doUnlock = false;
this.writeLock.unlock();
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}
//2. 判斷日誌完整度
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
.compareTo(lastLogId) >= 0;
//3. 判斷當前的節點是否是已經投過票了
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
"Raft node votes for some candidate, step down to restart election_timer."));
this.votedId = candidateId.copy();
this.metaStorage.setVotedFor(candidateId);
}
} while (false);
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
//4.贊成投票的條件是當前的任期和請求的任期同樣,而且已經將votedId設置爲請求節點
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
複製代碼
這個方法大體也和handlePreVoteRequest差很少。我這裏只分析一下我標註的。
投票完畢以後若是收到的票數大於一半,那麼就會晉升爲leader,調用becomeLeader方法。
private void becomeLeader() {
Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
this.conf.getConf(), this.conf.getOldConf());
// cancel candidate vote timer
//晉升leader以後就會把選舉的定時器關閉了
stopVoteTimer();
//設置當前的狀態爲leader
this.state = State.STATE_LEADER;
this.leaderId = this.serverId.copy();
//複製集羣中設置新的任期
this.replicatorGroup.resetTerm(this.currTerm);
//遍歷全部的集羣節點
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
LOG.debug("Node {} add replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
//若是成爲leader,那麼須要把本身的日誌信息複製到其餘節點
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add replicator, peer={}.", peer);
}
}
// init commit manager
this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
// Register _conf_ctx to reject configuration changing before the first log
// is committed.
if (this.confCtx.isBusy()) {
throw new IllegalStateException();
}
this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
//若是是leader了,那麼就要定時的檢查不是有資格勝任
this.stepDownTimer.start();
}
複製代碼
這個方法裏面首先會中止選舉定時器,而後設置當前的狀態爲leader,並設值任期,而後遍歷全部的節點將節點加入到複製集羣中,最後將stepDownTimer打開,定時對leader進行校驗是否是又半數以上的節點響應當前的leader。
好了,到這裏就講完了,但願下次還能夠see you again~