ZooKeeper的選舉過程默認使用FastLeaderElection類,FastLeaderElection啓動時啓動Messenger收發選舉信息。選舉完成後選出1個Leader和若干Follower。網絡
首先理解幾個概念: 數據結構
Epoch:投票週期,用於區分每個round,每一次創建一個新的leader-follower關係,都會有一個惟一的epoch值去標識。就好像皇帝登基必須得有一個年號,與以前或以後的皇帝進行區分。剛啓動是從文件讀取保存的currentEpoch和acceptedEpoch值。默認是-1。this
zxid:事務ID,表示Zookeeper當前寫操做的序列號,確保寫操做的按順序執行。有一種場景是當Follower的最大zxid大於Leader的zxid時,Leader會發送TRUNC包給Follower截斷多餘的事務,保證和Leader數據一致。zxid會在ZxDataBase的loadDataBase方法中初始化。每次執行一次寫事務zxid就會加1,默認爲0。線程
選舉規則:先判斷雙方的Epoch,留取大的Epoch;再看雙方的zxid,留取大的zxid一方。最後的結果就是(Epoch<32|zxid)的一方會被推舉爲Leader。debug
集羣間選舉使用Messenger來負責選舉信息的交互,Messenger底層使用QuorumCnxManager管理本機和集羣其餘機器之間的Socket數據傳遞,通訊流程示意圖以下所示。code
集羣中全部LOOKING狀態的機器同時啓動選舉過程。選舉結束則產生一個Leader和多個Follower。而後就是各Follower鏈接到Leader進行事務初始化,實現數據同步。server
Observer不參與選舉,但會接收Leader的信息同步要求。blog
QuorumPeer剛啓動時會設置初始狀態爲LOOKING,而後啓動FastLeaderElection選舉過程,首先將本身做爲Vote羣發給其餘QuorumPeer,羣發的數據結構爲Notification。同時啓動Messenage的收發線程,調用lookForLeader方法發起一輪選舉Leader的過程。接口
在Leader/Follower狀態,若是接收到遠方LOOKING節點尋找Leader而發送的Notification包,則Messenger會自動回覆Leader信息給對方;在LOOKING狀態,則Messenger會回覆當前Vote節點(臨時Leader)信息給對方。進程
默認選舉規則規定當超過半數的QuorumPeer都認同同一個Leader時,選舉過程結束,各QuorumPeer將本身設置爲Leader、Follower或Observer。固然有興趣的讀者也能夠嘗試建立本身的選舉規則類,只要實現QuorumVerifier便可。
進行Fast leader election的先決條件:
一、 每一個QuorumPeer都知道其餘QuorumPeer的ip地址,並知道QuorumPeer的總數。
二、 每一個QuorumPeer一開始都是發起一個vote,選取本身爲leader。向其餘全部的QuorumPeer發送vote的notification,並等待回覆。
三、 根據QuorumPeer的狀態處理vote notification消息。
選舉過程當中產生的臨時Leader成爲Vote,當選舉結束後最後一輪產生的Vote即成爲新的Leader。
QuorumPeer每次發起選舉都調用lookForLeader方法實現,首先將本身設置爲LOOKING狀態。該方法是FastLeaderElection類的主方法,具體的流程以下:
lookForLeader代碼較長,咱們先看看它的主體結構。
public Vote lookForLeader() throws InterruptedException { try { HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); if (self.getCurrentAndNextConfigVoters().contains(n.sid)) { switch (n.state) { case LOOKING: {A代碼} break; case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: {B代碼} break; default: LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)"); break; } } } return null; } }
具體分析以下:
首先經過sendNotification方法告訴集羣我在尋找Leader,集羣中其餘機器會在Messenger進程中接收到Notification,而後都會回覆誰應該是當前Leader。
FastLeaderElection收集到足夠的Notification消息,來判斷到底誰纔是合法的Leader。爲此它對每條Notification消息進行下列判斷:
A. 回覆Notification的發送方也是LOOKING狀態
若是回覆Notification的發送方也是LOOKING狀態,說明它還不知道最終的Leader是誰,這時候FastLeaderElection和發送方比較,看看誰的潛在Leader的Epoch和zxid最大,最大的設置爲當前的候選Leader,而後將這個候選Leader廣播出去,讓發送方也能更改本身的後續Leader。同時判斷本身的收集的Notification可否達到超過半數的條件從而決定最終的Leader,若是能決定則設置最終Leader,退出選舉過程。
case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } } break;
totalOrderPredicate方法用於判斷對方發送過來的Vote是否是更新的Leader候選者,若是是的話則更新本地proposedLeader。主要代碼以下:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
termPredicate用於判斷是否知足選出Leader條件,QuorumVerifier接口的實現,若是知足則設置Leader,並退出FastLeaderElection過程。主要代碼以下:
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) { SyncedLearnerTracker voteSet = new SyncedLearnerTracker(); voteSet.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self .getQuorumVerifier().getVersion()) { voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } for (Map.Entry<Long, Vote> entry : votes.entrySet()) { if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); } } return voteSet.hasAllQuorums(); }
B. 回覆Notification的發送方也是Leader或Follower
若是回覆Notification的發送方是Leader或者Follower,則流程比較簡單,將Notification消息保存到recvset中,並調用termPredicate方法判斷是否能肯定Leader並結束選舉過程。
代碼片斷:
case FOLLOWING: case LEADING: if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, n.electionEpoch)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } }