Zookeeper 源碼機制分析

zkServer.sh 中的main函數: org.apache.zookeeper.server.quorum.QuorumPeerMainjava

zkCli.sh 中的main函數: org.apache.zookeeper.ZooKeeperMain算法

選主

  • HashMap<Long, Vote> outofelection :  只在外部選票狀態爲 FOLLOWING/LEADING 時 存儲推送來的選票, 添加vote;
  • HashMap<Long, Vote> recvset: 存儲外部的選票;
  •  LinkedBlockingQueue<Notification> recvqueue:  接收外部選票的隊列;
  • LinkedBlockingQueue<ToSend> sendqueue:  發送本身選票的隊列;

key:推送過來的服務實例 myid,表明哪一個服務實例的選票;  apache

Value:  Vote對象對應Notification,  標識該服務器推舉的選票信息:
           版本默認爲 version = 0x0  
           leader的myid: leader,
           leader的zxid:  zxid, 
          leader的epoch: peerEpoch,
          發送者的logicalclock: electionEpoch服務器

算法

選票來自OBSERVING,拋棄;ide

選票來自FOLLOWING/LEADING函數

         斷定 外票的選舉輪次electionEpoch與本身自己的logicalclockthis

  1. 相同: 外票歸於<recvset>中。
              斷定 該外票在<recvset>中相同的選票個數符合集羣配置的有效策略(過半有效) 且  在非選主狀態的外票集合<outofelection>中該選票就是leader:
          true:   肯定本身的角色, 清空接收外票的隊列<recvqueue>, 返回該外票,跳出整個選舉方法。
          false:   什麼也不幹。
  2. 外票歸於<outofelection>中。
           斷定 外票在<outofelection>中相同的選票個數符合整個集羣配置的有效策略(過半有效)且  在非選主狀態的選票集合<outofelection>中該選票就是leader; 
             true:   則更新當前logicalclock爲選票electionEpoch, 肯定本身角色, 清空接收外票隊列<recvqueue>, 返回當前選票; 跳出整個選舉方法。 
              false:   什麼也不幹。

當一個新啓動的節點加入集羣時,它對集羣內其餘節點發出投票請求,而其餘節點已不處於LOOKING狀態,此時其餘節點回應選舉結果,該節點收集這些結果到outofelection中,最終在收到合法LEADER消息且這些選票也構成選舉結束條件時,該節點就結束本身的選舉行爲。注意到代碼中會logicalclock = n.electionEpoch;更新選舉輪數。spa

選票來自LOOKING
 比較 選票發送方的選舉序列electionEpoch  與   自己時序logicalclockdebug

  1. 大於本身 :
    1. logicalclock = n.electionEpoch;
    2. 清空已經收到的選票<recvset>; 由於有更高一級的選舉序列了。選舉序列比本身小的被拋棄, 本身又會更新成比本身大的, 因此<recvset>中的選票的都等於本身的當前值。
    3. 與本身初始選票比較出更優的選票, 選出來後,更新本身推舉proposed的選票對象,推送出去。
      1. 推舉的leader的選舉序列<peerEpoch>高的;
      2. 事務zxid 高的;
      3. myid 高的 ; 
  2. 小於本身 :   丟棄;跳出switch,處理下一個Notification。
  3. 相同與本身推舉proposed的選票比較出更優的選票,再更新本身推舉proposed的選票對象,推送出去。

將外票保存在選票集合<recvset>中,若是更新以後的本地推薦選票知足集羣配置的有效策略(過半有效) 則 從隊列recvqueue中循環獲取poll,比較:code

  1. 有比本地推薦選票還優先級高的,再扔回recvqueue中,break出循環;
  2. 其餘的拋棄 --- 其實就是把接收隊列中比本身優先級低的扔掉。

處理完以後recvqueue沒有了對象, 說明接收選票完結了,當前更新以後的本地推薦選票就是leader。肯定本身的角色,清空接收隊列,返回該選票。

switch (n.state) {
   case LOOKING:
       // If notification > current, replace and send messages out
       if (n.electionEpoch > logicalclock) {
           logicalclock = n.electionEpoch;
           recvset.clear();
           if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                   getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
               updateProposal(n.leader, n.zxid, n.peerEpoch);
           } else {
               updateProposal(getInitId(),
                       getInitLastLoggedZxid(),
                       getPeerEpoch());
           }
           sendNotifications();
       } else if (n.electionEpoch < logicalclock) {
           if(LOG.isDebugEnabled()){
               LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                       + Long.toHexString(n.electionEpoch)
                       + ", logicalclock=0x" + Long.toHexString(logicalclock));
           }
           break;
       } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
               proposedLeader, proposedZxid, proposedEpoch)) {
           updateProposal(n.leader, n.zxid, n.peerEpoch);
           sendNotifications();
       }

       if(LOG.isDebugEnabled()){
           LOG.debug("Adding vote: from=" + n.sid +
                   ", proposed leader=" + n.leader +
                   ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                   ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
       }

       recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

       if (termPredicate(recvset,
               new Vote(proposedLeader, proposedZxid,
                       logicalclock, proposedEpoch))) {

           // Verify if there is any change in the proposed leader
           while((n = recvqueue.poll(finalizeWait,
                   TimeUnit.MILLISECONDS)) != null){
               if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                       proposedLeader, proposedZxid, proposedEpoch)){
                   recvqueue.put(n);
                   break;
               }
           }

           /*
            * This predicate is true once we don't read any new
            * relevant message from the reception queue
            */
           if (n == null) {
               self.setPeerState((proposedLeader == self.getId()) ?
                       ServerState.LEADING: learningState());

               Vote endVote = new Vote(proposedLeader,
                                       proposedZxid,
                                       logicalclock,
                                       proposedEpoch);
               leaveInstance(endVote);
               return endVote;
           }
       }
       break;
   case OBSERVING:
       LOG.debug("Notification from observer: " + n.sid);
       break;
   case FOLLOWING:
   case LEADING:
       /*
        * Consider all notifications from the same epoch
        * together.
        */
       if(n.electionEpoch == logicalclock){
           recvset.put(n.sid, new Vote(n.leader,
                                         n.zxid,
                                         n.electionEpoch,
                                         n.peerEpoch));
          
           if(ooePredicate(recvset, outofelection, n)) {
               self.setPeerState((n.leader == self.getId()) ?
                       ServerState.LEADING: learningState());

               Vote endVote = new Vote(n.leader, 
                       n.zxid, 
                       n.electionEpoch, 
                       n.peerEpoch);
               leaveInstance(endVote);
               return endVote;
           }
       }

       /*
        * Before joining an established ensemble, verify
        * a majority is following the same leader.
        */
       outofelection.put(n.sid, new Vote(n.version,
                                           n.leader,
                                           n.zxid,
                                           n.electionEpoch,
                                           n.peerEpoch,
                                           n.state));

       if(ooePredicate(outofelection, outofelection, n)) {
           synchronized(this){
               logicalclock = n.electionEpoch;
               self.setPeerState((n.leader == self.getId()) ?
                       ServerState.LEADING: learningState());
           }
           Vote endVote = new Vote(n.leader,
                                   n.zxid,
                                   n.electionEpoch,
                                   n.peerEpoch);
           leaveInstance(endVote);
           return endVote;
       }
       break;
   default:
       LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
               n.state, n.sid);
       break;
   }
相關文章
相關標籤/搜索