ZooKeeper's FastLeaderElection Algorithm Explained

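When a ZooKeeper server starts, the first thing it has to do is elect a leader. ZooKeeper has three leader election algorithms: LeaderElection, AuthFastLeaderElection, and FastLeaderElection. FastLeaderElection is the default, although a different one can be selected through the electionAlg setting in the configuration file.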

1. When the ZooKeeper service starts, the main thread begins in the entry function main of class QuorumPeerMain:

public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();

2. Next, QuorumPeer, which overrides Thread.start, is started:

          quorumPeer.start();
          quorumPeer.join();

In class QuorumPeer:

   @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        cnxnFactory.start();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }
3. As the source above shows, once the quorumPeer thread is started, the first thing it does is recover data by loading what was saved on disk:

 private void loadDataBase() {
        try {
            // restore the db from local files
            zkDb.loadDataBase();

            // load the epochs
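            /*
            Restore the epoch from the latest zxid.
            A zxid is a long: the high 32 bits hold the epoch and the
            low 32 bits hold a per-epoch counter.
            The zxid (ZooKeeper Transaction Id) grows with every update
            ZooKeeper applies, so a larger zxid means newer data.
            */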
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
            	// pick a reasonable epoch number
            	// this should only happen once when moving to a
            	// new code version
            	currentEpoch = epochOfZxid;
                //....
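
The zxid layout described in the comment above can be illustrated with a few lines of bit arithmetic. This is a minimal sketch, not ZooKeeper's own code: the class name ZxidSketch and its method names are hypothetical, and it only assumes the split stated above (epoch in the high 32 bits, counter in the low 32 bits).

```java
/** Illustrative helper (hypothetical name), not ZooKeeper's own class. */
final class ZxidSketch {
    private ZxidSketch() {}

    /** High 32 bits: the epoch. */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Low 32 bits: the counter within that epoch. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Packs an epoch and a counter into one 64-bit zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println("zxid    = 0x" + Long.toHexString(zxid));
        System.out.println("epoch   = " + epochOf(zxid));
        System.out.println("counter = " + counterOf(zxid));
        // A later epoch always compares greater, whatever the counter is.
        System.out.println(makeZxid(6, 0) > makeZxid(5, 1_000_000));
    }
}
```

Because the epoch occupies the high bits, comparing two zxids as plain longs orders them by epoch first and by counter second, which is why a larger zxid means newer data.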

4. Next comes election initialization. The server starts by voting for itself; the default algorithm is FastLeaderElection:

synchronized public void startLeaderElection() {
       try {
            /*
            vote for ourselves first
            */
           if (getPeerState() == ServerState.LOOKING) {
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }

       // if (!getView().containsKey(myid)) {
      //      throw new RuntimeException("My id " + myid + " not in the peer list");
        //}
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
5. Then the election port is bound and FastLeaderElection is initialized:

protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = new QuorumCnxManager(this);
            /*
            bind the election port and wait for the other machines in the ensemble to connect
            */
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                //TCP-based election algorithm
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

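6. The FastLeaderElection instance is set up by its starter method, which creates the send and receive queues and the Messenger: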

private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        /*
        application-level send queue, carrying ToSend objects
        application-level receive queue, carrying Notification objects
        */
        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);

    }
In FastLeaderElection.java:
Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            this.wsThread = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            this.wsThread.setDaemon(true);

            this.wr = new WorkerReceiver(manager);

            this.wrThread = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            this.wrThread.setDaemon(true);
        }
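7. During an election, each ZooKeeper server is in one of four states: LOOKING, FOLLOWING, LEADING, or OBSERVING. A server in the OBSERVING state does not take part in voting; only servers in the LOOKING state vote. Once voting ends, the server's state becomes FOLLOWING or LEADING.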

The leader election process works as follows:

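Step 1: A server in the LOOKING state first checks a value called the logical clock (logicalclock). If the logicalclock carried by a received notification is greater than the server's own, the notification belongs to a newer election round. The server then updates its own logicalclock, clears the votes it had collected from other servers, and decides whether to update its own vote. The comparison looks first at the epoch, then at the zxid, and finally at the server id. Note that a newer logicalclock does not by itself force a vote change: the comparison uses the peer epoch (peerEpoch) rather than the election epoch, so the server may fall back to its own initial proposal, as the else branch in lookForLeader shows. Finally, the server broadcasts its vote to the other servers.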

In FastLeaderElection.java:

 protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }
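
To make the ordering concrete, the comparison can be tried on a few sample votes. The sketch below copies only the boolean expression from totalOrderPredicate and leaves out the quorum-weight check; the class name VoteOrderSketch and the method name beats are hypothetical, chosen for illustration.

```java
/** Illustrative copy of the vote ordering (hypothetical class and method names). */
final class VoteOrderSketch {
    private VoteOrderSketch() {}

    /** True if the new vote (id, zxid, epoch) should replace the current one. */
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // Case 1: higher epoch wins even with a smaller zxid and id.
        System.out.println(beats(1, 10, 2, 3, 99, 1));
        // Case 2: same epoch, higher zxid wins.
        System.out.println(beats(1, 11, 2, 3, 10, 2));
        // Case 3: same epoch and zxid, higher server id wins.
        System.out.println(beats(3, 10, 2, 1, 10, 2));
        // An identical vote never replaces the current one.
        System.out.println(beats(1, 10, 2, 1, 10, 2));
    }
}
```

Since the comparison is strict at every level, two servers that exchange the same pair of votes always agree on which one is better, so the proposals converge instead of oscillating.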
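Step 2: If the server's own logicalclock is greater than the received one, it simply breaks out. If the two are equal, it uses the epoch, zxid, and server id to decide whether to update its vote, and if it does, it broadcasts the new vote to the other servers. Finally, the received vote is added to the set of votes this server has collected.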

 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

In the lookForLeader function of FastLeaderElection.java:

case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            //clear the votes received so far
                            recvset.clear();
                            //decide whether to update our own vote
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break; 
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            //broadcast
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        //add to the set of received votes
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
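
The three-way branch on the logical clock in the LOOKING case can be summarised as a small decision function. This is only a sketch of the control flow shown above: the class LogicalClockSketch, the enum Action, and its constant names are hypothetical and do not exist in ZooKeeper.

```java
/** Illustrative summary of the LOOKING-case branches (hypothetical names). */
final class LogicalClockSketch {
    private LogicalClockSketch() {}

    enum Action {
        /** Notification is from a newer round: adopt its clock, clear recvset, re-evaluate the vote. */
        JOIN_NEWER_ROUND,
        /** Notification is from an older round: ignore it. */
        IGNORE_STALE,
        /** Same round: compare votes with the total order and update only if the new vote wins. */
        COMPARE_VOTES
    }

    static Action classify(long notificationElectionEpoch, long localLogicalClock) {
        if (notificationElectionEpoch > localLogicalClock) {
            return Action.JOIN_NEWER_ROUND;
        } else if (notificationElectionEpoch < localLogicalClock) {
            return Action.IGNORE_STALE;
        }
        return Action.COMPARE_VOTES;
    }

    public static void main(String[] args) {
        System.out.println(classify(7, 5));
        System.out.println(classify(3, 5));
        System.out.println(classify(5, 5));
    }
}
```

Only the first and third outcomes reach the recvset.put call in the listing above; a stale notification breaks out before the vote is recorded.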

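Step 3: The server checks whether voting has finished. The condition is that some leader has the support of more than half of the servers. If so, the server waits a little longer (200 ms) to see whether a better vote arrives. If none does, it sets its own role (follower or leader) and leaves the election; otherwise it continues.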

In FastLeaderElection.java:

//check whether voting has finished
    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums();
    }
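
The idea behind termPredicate can be shown with a plain majority count. The sketch below is a simplification under a stated assumption: it treats the quorum as a simple majority of a fixed-size ensemble, whereas the real code delegates that decision to the configured QuorumVerifier. The class MajoritySketch and its method are hypothetical, and each vote is reduced to the proposed leader id.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative majority check (hypothetical names, simple-majority assumption). */
final class MajoritySketch {
    private MajoritySketch() {}

    /**
     * votes maps the voter's server id to the leader id it proposed.
     * Returns true if strictly more than half of the ensemble backs the candidate.
     */
    static boolean hasMajority(Map<Long, Long> votes, long candidate, int ensembleSize) {
        int acks = 0;
        for (Map.Entry<Long, Long> entry : votes.entrySet()) {
            if (entry.getValue() == candidate) {
                acks++;
            }
        }
        return acks > ensembleSize / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        System.out.println(hasMajority(recvset, 3L, 5)); // 2 of 5 is not enough
        recvset.put(3L, 3L);
        System.out.println(hasMajority(recvset, 3L, 5)); // 3 of 5 is a majority
    }
}
```

Keying the map by voter id, as recvset does, means a later vote from the same server overwrites its earlier one, so no server is counted twice.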

In the lookForLeader function:

 //check whether voting has finished
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            //wait a little longer to see whether any new vote arrives
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            //if no new vote arrived, end the election
                            //and set our own state
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
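
The "wait a little longer" step is a timed poll on a blocking queue. The following sketch isolates that pattern under simplifying assumptions: notifications are reduced to a single long rank, where a higher rank stands for a vote that would win the total order, and the class FinalizeWaitSketch and its method are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative version of the finalizeWait loop (hypothetical names). */
final class FinalizeWaitSketch {
    private FinalizeWaitSketch() {}

    /**
     * Drains the queue until it stays empty for waitMillis.
     * Returns false as soon as a rank above current shows up; that item is
     * put back so the main loop can process it, as recvqueue.put(n) does.
     */
    static boolean isSettled(BlockingQueue<Long> queue, long current, long waitMillis) {
        try {
            Long n;
            while ((n = queue.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
                if (n > current) {
                    queue.offer(n);
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt and treat the election as not settled.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        queue.add(1L);
        queue.add(2L);
        System.out.println(isSettled(queue, 5L, 200)); // nothing better arrived
        queue.add(9L);
        System.out.println(isSettled(queue, 5L, 200)); // a better vote is pending
    }
}
```

The timeout restarts on every poll, so the election only finishes after a full quiet interval with no better vote in the queue.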

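Step 4: So far the sender of the notification was assumed to be in the LOOKING state. If the sender is in the FOLLOWING or LEADING state and its logicalclock equals ours, the vote is stored in recvset. The server then checks whether the leader named in the notification is supported by more than half of the servers; if so, it sets its own state and leaves the election: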

 case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        //our logicalclock equals the sender's
                        if(n.electionEpoch == logicalclock.get()){
                            //add to recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

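Step 5: If the logicalclock of the received notification differs from the server's own, another election round has already produced a result. The vote is added to the outofelection set, and the server checks within outofelection whether that leader has majority support. If so, it adopts the result, updating its logicalclock, and sets its own state: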

 outofelection.put(n.sid, new Vote(n.leader, 
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }

Summary: this is the general flow of ZooKeeper's FastLeaderElection.

Reference blogs:

http://blog.csdn.net/xhh198781/article/details/6619203

http://iwinit.iteye.com/blog/1773531
