ZooKeeper系列之(十):投票選舉(2)

ZooKeeper的選舉過程默認使用FastLeaderElection類,FastLeaderElection啓動時啓動Messenger收發選舉信息。選舉完成後選出1個Leader和若干Follower。網絡

首先理解幾個概念:   數據結構

Epoch:投票週期,用於區分每個round,每一次創建一個新的leader-follower關係,都會有一個惟一的epoch值去標識。就好像皇帝登基必須得有一個年號,與以前或以後的皇帝進行區分。剛啓動是從文件讀取保存的currentEpoch和acceptedEpoch值。默認是-1。this

zxid:事務ID,表示Zookeeper當前寫操做的序列號,確保寫操做的按順序執行。有一種場景是當Follower的最大zxid大於Leader的zxid時,Leader會發送TRUNC包給Follower截斷多餘的事務,保證和Leader數據一致。zxid會在ZxDataBase的loadDataBase方法中初始化。每次執行一次寫事務zxid就會加1,默認爲0。線程

選舉規則:先判斷雙方的Epoch,留取大的Epoch;再看雙方的zxid,留取大的zxid一方。最後的結果就是(Epoch<32|zxid)的一方會被推舉爲Leader。debug

集羣間選舉使用Messenger來負責選舉信息的交互,Messenger底層使用QuorumCnxManager管理本機和集羣其餘機器之間的Socket數據傳遞,通訊流程示意圖以下所示。code

集羣中全部LOOKING狀態的機器同時啓動選舉過程。選舉結束則產生一個Leader和多個Follower。而後就是各Follower鏈接到Leader進行事務初始化,實現數據同步。server

Observer不參與選舉,但會接收Leader的信息同步要求。blog

QuorumPeer剛啓動時會設置初始狀態爲LOOKING,而後啓動FastLeaderElection選舉過程,首先將本身做爲Vote羣發給其餘QuorumPeer,羣發的數據結構爲Notification。同時啓動Messenage的收發線程,調用lookForLeader方法發起一輪選舉Leader的過程。接口

在Leader/Follower狀態,若是接收到遠方LOOKING節點尋找Leader而發送的Notification包,則Messenger會自動回覆Leader信息給對方;在LOOKING狀態,則Messenger會回覆當前Vote節點(臨時Leader)信息給對方。進程

默認選舉規則規定當超過半數的QuorumPeer都認同同一個Leader時,選舉過程結束,各QuorumPeer將本身設置爲Leader、Follower或Observer。固然有興趣的讀者也能夠嘗試建立本身的選舉規則類,只要實現QuorumVerifier便可。

進行Fast leader election的先決條件:

一、 每一個QuorumPeer都知道其餘QuorumPeer的ip地址,並知道QuorumPeer的總數。

二、 每一個QuorumPeer一開始都是發起一個vote,選取本身爲leader。向其餘全部的QuorumPeer發送vote的notification,並等待回覆。

三、 根據QuorumPeer的狀態處理vote notification消息。

選舉過程當中產生的臨時Leader成爲Vote,當選舉結束後最後一輪產生的Vote即成爲新的Leader。

QuorumPeer每次發起選舉都調用lookForLeader方法實現,首先將本身設置爲LOOKING狀態。該方法是FastLeaderElection類的主方法,具體的流程以下:

  • 首先更新選舉週期logicalclock, 並把本身做爲leader做爲投票發給全部其餘的server。
  • 而後進入本輪投票的循環,直到本身再也不是LOOKING狀態。
  1. 從recvqueue獲取一個網絡包(recvqueue的數據來自Messenger),若是沒有收到包則檢查是否要重連和重發本身的投票。
  2. 收到投票後判斷對方投票的狀態。
  1. LOOKING:
    • 若是對方投票的週期(Epoch)大於本身的週期(Epoch),那就清空本身的已經收到的投票集合recvset,並將本身做爲候選和對方投票的leader作比較,選出大的做爲新的投票,而後再發送給全部人。 這裏比較大小是經過比較(zxid,sid)這個二元組來的,zxid大的就大,不然sid大的就大。
    • 若是對方的投票週期小於本身,則忽略對方的投票。
    • 若是週期相等,則比較對方的投票和本身認爲的候選,選出大的做爲新的候選,而後再發送給全部人。
    • 而後判斷當前收到的投票是否能夠得出誰是leader的結論,這裏主要是經過判斷當前的候選leader在收到的投票中是否佔了多數。
    • 若是候選leader在收到的投票中佔了多數,則再等待finalizeWait時鐘,看是否有人修改leader的候選,若是修改了則把投票放到recvqueue中再從新循環。
  2. OBSERVING:若是對方是一個觀察者,因爲它沒有投票權,則無視它
  3. FOLLOWING或LEADING:
  • 若是對方和本身再一個時鐘週期,說明對方已經完成選舉,若是對方說它是leader,那咱們就把它做爲leader,不然就要比較下對方選舉的leader在本身這裏是否佔有多數,而且選舉的leader確認了願意當leader,若是都經過了,就把這個候選做爲本身的leader
  • 若是對方和本身不在一個時鐘週期,說明本身掛掉後又恢復起來,這個時候把別人的投票收集到一個單獨的集合outofelection(從名字能夠看出這個集合不是用在選舉判斷),若是對方的投票在outofelection中佔有大多數,而且leader也確認了本身願意作leader,這個時候更新本身的選舉週期logicalclock,並修改本身的狀態爲FOLLOWING或LEADING

 

lookForLeader代碼較長,咱們先看看它的主體結構。

public Vote lookForLeader() throws InterruptedException {
     try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
       int notTimeout = finalizeWait;
        synchronized(this){
             logicalclock.incrementAndGet();
             updateProposal(getInitId(), getInitLastLoggedZxid(), 
getPeerEpoch());
       }
        sendNotifications();
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){           
            Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
            if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
               switch (n.state) {
                  case LOOKING:
                       {A代碼}
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:                    
                        {B代碼}
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } 
            }
            return null;
        }   
 }

具體分析以下:

首先經過sendNotification方法告訴集羣我在尋找Leader,集羣中其餘機器會在Messenger進程中接收到Notification,而後都會回覆誰應該是當前Leader。

FastLeaderElection收集到足夠的Notification消息,來判斷到底誰纔是合法的Leader。爲此它對每條Notification消息進行下列判斷:

A. 回覆Notification的發送方也是LOOKING狀態

若是回覆Notification的發送方也是LOOKING狀態,說明它還不知道最終的Leader是誰,這時候FastLeaderElection和發送方比較,看看誰的潛在Leader的Epoch和zxid最大,最大的設置爲當前的候選Leader,而後將這個候選Leader廣播出去,讓發送方也能更改本身的後續Leader。同時判斷本身的收集的Notification可否達到超過半數的條件從而決定最終的Leader,若是能決定則設置最終Leader,退出選舉過程。

case LOOKING:   
   // If notification > current, replace and send messages out
   if (n.electionEpoch > logicalclock.get()) {
       logicalclock.set(n.electionEpoch);
       recvset.clear();
       if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                       getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
       } else {
         updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());
       }
       sendNotifications();
   } else if (n.electionEpoch < logicalclock.get()) {
       break;
   } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {
       updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications();
   }
   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, 
n.peerEpoch));
   if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                            logicalclock.get(), proposedEpoch))) {
      while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
         if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)){
             recvqueue.put(n);
             break;
         }
      }
      if (n == null) {
           self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
           Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
           leaveInstance(endVote);
        return endVote;
      }
  }
  break;

totalOrderPredicate方法用於判斷對方發送過來的Vote是否是更新的Leader候選者,若是是的話則更新本地proposedLeader。主要代碼以下:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&  ((newZxid > curZxid) ||
 ((newZxid == curZxid) && (newId > curId)))));
}

termPredicate用於判斷是否知足選出Leader條件,QuorumVerifier接口的實現,若是知足則設置Leader,並退出FastLeaderElection過程。主要代碼以下:

private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }
        return voteSet.hasAllQuorums();
}

 

B. 回覆Notification的發送方也是LeaderFollower

若是回覆Notification的發送方是Leader或者Follower,則流程比較簡單,將Notification消息保存到recvset中,並調用termPredicate方法判斷是否能肯定Leader並結束選舉過程。

代碼片斷:

case FOLLOWING:
case LEADING:
     if(n.electionEpoch == logicalclock.get()){
         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, 
n.peerEpoch));
         if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch,
n.peerEpoch, n.state))  && checkLeader(outofelection, n.leader, 
n.electionEpoch)) {
            self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING: learningState());
            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
         }
    }
相關文章
相關標籤/搜索