Zookeeper啓動和集羣選舉

1. QuorumPeerMain運行

1)判斷是採用單實例模式還是多實例模式啓動QuorumPeerMain

2)解析啓動參數中指定的配置文件;若配置中列出了多臺服務器,則以多實例(集羣)模式啓動

3)啓動QuorumPeer

/**
 * Process entry point of a ZooKeeper server (excerpt; "..." marks elided code).
 *
 * <p>Parses the configuration file given on the command line. When the parsed
 * configuration lists servers, the process is started as one member of a replicated
 * ensemble via {@link #runFromConfig}; otherwise it falls back to standalone mode.
 */
public class QuorumPeerMain {
    ...

    // The peer created and started by runFromConfig.
    protected QuorumPeer quorumPeer;

    /** Command-line entry point; delegates to initializeAndRun. */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        }
        ...
    }

    /**
     * Parses the configuration and chooses between replicated and standalone startup.
     *
     * @param args command-line arguments; when exactly one is given it is passed to
     *             {@code QuorumPeerConfig.parse} as the configuration file
     * @throws ConfigException if the configuration cannot be parsed
     * @throws IOException     on I/O failure while starting the server
     */
    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }
        // Start the periodic purge task for the data directory
        ...
        // A config file was given and it lists servers: run in replicated (multi-instance) mode
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config); // replicated (quorum) mode
        } else {
          // Standalone (single-instance) mode
          ...
        }
    }

    /**
     * Builds a QuorumPeer from the parsed configuration, starts it, and blocks the
     * calling thread until the peer thread terminates.
     *
     * @param config the parsed configuration
     * @throws IOException on I/O failure while creating the client connection factory
     */
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      ...
      try {
          // Client connection factory (NIOServerCnxnFactory by default)
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
          quorumPeer = new QuorumPeer(config.getServers(),
                                      new File(config.getDataDir()),
                                      new File(config.getDataLogDir()),
                                      config.getElectionAlg(), // election algorithm; default 3 = FastLeaderElection
                                      config.getServerId(),
                                      config.getTickTime(),
                                      config.getInitLimit(),
                                      config.getSyncLimit(),
                                      config.getQuorumListenOnAllIPs(),
                                      cnxnFactory,
                                      config.getQuorumVerifier());
          ...
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          ...
          quorumPeer.start(); // start this peer (QuorumPeer.start)
          quorumPeer.join(); // main thread waits for the quorumPeer thread to finish
      } catch (InterruptedException e) {
          ...
      }
    }
}

2. QuorumPeer啓動

1)監聽客戶端連接

2)確認選舉算法,監聽選舉端口

3)啓動QuorumPeer線程(run),開始選舉

4)成爲Leader / Follower / Observer(後續介紹)

/**
 * One member of a ZooKeeper ensemble (excerpt; "..." marks elided code).
 *
 * <p>{@link #start()} loads on-disk state, opens the client port, prepares leader
 * election and then starts this thread. Its {@link #run()} loop dispatches on the
 * current peer state:
 * <ul>
 *   <li>LOOKING runs an election;</li>
 *   <li>OBSERVING, FOLLOWING and LEADING run the matching Observer, Follower or
 *       Leader role.</li>
 * </ul>
 */
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    ServerCnxnFactory cnxnFactory; // client connection factory (NIOServerCnxnFactory by default)
    ...

    /**
     * Starts the peer: load the database, accept client connections, set up leader
     * election, then start the peer thread that executes {@link #run()}.
     */
    @Override
    public synchronized void start() {
        loadDataBase(); // load currentEpoch and acceptedEpoch
        cnxnFactory.start(); // accept client connections (NIOServerCnxnFactory.start)
        startLeaderElection(); // choose the election algorithm and listen on the election port
        super.start(); // start the peer thread (run()), which begins the election
    }
    ...

    /**
     * Sets the initial vote to this server and creates the election algorithm
     * selected by {@code electionType}.
     */
    synchronized public void startLeaderElection() {
        try {
            // Initial vote: this server votes for itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch(IOException e) {
            ...
        }
        ...
        // Choose the election algorithm and start listening on the election port
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    ...

    /**
     * Creates the election implementation for the given algorithm id.
     *
     * <p>For id 3 this creates the QuorumCnxManager, starts its listener and wraps
     * the manager in a FastLeaderElection. {@code le} stays null when the manager has
     * no listener.
     *
     * @param electionAlgorithm election algorithm id from the configuration
     * @return the election implementation, or null if none was created
     */
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
        switch (electionAlgorithm) {
        ...
        case 3:
            qcm = createCnxnManager(); // QuorumCnxManager sends and receives votes
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start(); // start the ServerSocket listening on the election port
                le = new FastLeaderElection(this, qcm);
            }
            ...
            break;
        ...
        }
        return le;
    }
    ...

    /**
     * Peer main loop: while running, dispatch on the current peer state.
     *
     * <p>LOOKING runs an election and records the resulting vote; the election itself
     * sets the new peer state. OBSERVING, FOLLOWING and LEADING create the matching
     * role object and run it.
     */
    @Override
    public void run() {
        ...
        try {
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // Start a ReadOnlyZooKeeperServer to serve read-only requests
                        ...
                    } else {
                        try {
                            setBCVote(null);
                            // Run the election (FastLeaderElection.lookForLeader)
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        }
                        ...
                    }
                    break;
                case OBSERVING:
                    try {
                        setObserver(makeObserver(logFactory)); // create the Observer
                        observer.observeLeader(); // run the observer role
                    }
                    ...
                    break;
                case FOLLOWING:
                    try {
                        setFollower(makeFollower(logFactory)); // create the Follower
                        follower.followLeader(); // run the follower role
                    }
                    ...
                    break;
                case LEADING:
                    try {
                        setLeader(makeLeader(logFactory)); // create the Leader
                        leader.lead(); // run the leader role
                        setLeader(null); // leader role finished: drop the reference
                    }
                    ...
                    break;
                }
            }
        } finally {
            ...
        }
    }
}

 

3. 快速選舉算法(FastLeaderElection)

1)將推薦的Leader設爲自己並廣播,然後接收其它ZK服務器的投票

2)選舉進行中(對方處於LOOKING狀態)

  1' 若對方推薦的Leader優於自己推薦的Leader(依次比較epoch、zxid、服務器id),則跟隨對方投票並重新廣播

  2' 更新票箱,判斷自己當前推薦的Leader是否已獲得過半投票;若是,再等待200ms,期間沒有收到更優的投票則返回最終投票,否則選舉繼續

3)選舉已經結束(對方處於FOLLOWING/LEADING狀態)

  更新票箱,判斷對方推薦的Leader是否已獲得過半投票且確實處於Leader狀態,是則返回最終投票,否則選舉繼續

/**
 * Fast leader election (election algorithm 3); excerpt, "..." marks elided code.
 *
 * <p>Each server first votes for itself and broadcasts that vote. Votes from peers
 * that are still LOOKING are compared with {@code totalOrderPredicate}; a better vote
 * is adopted and re-broadcast. The election ends in one of two ways:
 * <ul>
 *   <li>the current proposal has quorum support in {@code recvset} and no better vote
 *       arrives within {@code finalizeWait} ms; or</li>
 *   <li>notifications from peers that are already FOLLOWING/LEADING identify an
 *       established leader.</li>
 * </ul>
 */
public class FastLeaderElection implements Election {
    ...
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        ...
        starter(self, manager);
    }

    /** Creates the outgoing/incoming vote queues and the Messenger that drives them. */
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        ...
        sendqueue = new LinkedBlockingQueue<ToSend>(); // votes waiting to be sent
        recvqueue = new LinkedBlockingQueue<Notification>(); // votes received from peers
        // The Messenger relays votes between these queues and the QuorumCnxManager:
        // messages queued on sendqueue go out, received votes are placed on recvqueue
        this.messenger = new Messenger(manager);
    }
    ...

    /**
     * Runs a new election round.
     *
     * <p>Returns the agreed vote once this server has chosen its new state
     * (LEADING or a learner state). Returns null if the loop exits without a result,
     * i.e. the peer state stopped being LOOKING elsewhere or {@code stop} was set.
     *
     * @return the final vote, or null if the election was abandoned
     * @throws InterruptedException if interrupted while waiting for votes
     */
    public Vote lookForLeader() throws InterruptedException {
        ...
        try {
            // Ballot box for the current round: sender sid -> that sender's latest vote
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            // Votes from peers already FOLLOWING/LEADING, e.g. when this server joins an
            // ensemble whose election has already finished and its round is behind
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            ...
            synchronized(this){
                logicalclock++; // start a new election round
                // Initial proposal: vote for this server
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            ...
            sendNotifications(); // broadcast our vote
            // Loop until a leader has been chosen
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
                // Take the next vote from another server, waiting up to notTimeout ms
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                if(n == null){
                    // Nothing received: re-broadcast our vote and increase the timeout
                    ...
                }
                else if(self.getVotingView().containsKey(n.sid)) {
                    switch (n.state) {
                    case LOOKING: // the sender is still electing
                        if (n.electionEpoch > logicalclock) { // our round is behind the sender's
                            logicalclock = n.electionEpoch; // catch up to the sender's round
                            recvset.clear(); // votes from the old round no longer count
                            // Adopt the sender's vote if it beats our initial vote, otherwise keep our own
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch); // follow the sender's vote
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); // keep our vote
                            }
                            sendNotifications(); // re-broadcast our (possibly new) vote
                        } else if (n.electionEpoch < logicalclock) { // the sender's round is behind ours
                            // Only logged; the stale vote is not recorded
                            ...
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            // Same round and the sender's vote is better: follow it
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications(); // re-broadcast our new vote
                        }
                        ...
                        // Record the sender's vote in the ballot box
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // Does our current proposal have quorum support?
                        if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {
                            // Keep receiving for finalizeWait ms in case a better vote shows up
                            while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                // A better vote arrived: put it back and continue the election
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }
                            if (n == null) { // no better vote arrived within finalizeWait
                                // Leave LOOKING: LEADING if we are the proposed leader, otherwise a learner state (FOLLOWING/OBSERVING)
                                self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
                                // Finalize and return the agreed vote
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING: break;
                    case FOLLOWING:
                    case LEADING:
                        // The sender already follows/leads an established leader (e.g. this server joined late)
                        if(n.electionEpoch == logicalclock) {
                            // Same round: count the vote in recvset and check whether its leader is established
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        // Regardless of round, also track the vote in outofelection. A server whose
                        // round differs from the ensemble's (so its recvset never matches) can still
                        // detect the current leader once enough such votes agree.
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch; // adopt the ensemble's round
                                self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    ...
                    }
                }
                ...
            }
            return null;
        }
        ...
    }
}
相關文章
相關標籤/搜索