Zookeeper的選舉算法和腦裂問題深度講解

ZK介紹

ZK = zookeeperhtml

ZK是微服務解決方案中擁有服務註冊發現最爲核心的環境,是微服務的基石。做爲服務註冊發現模塊,並非只有ZK一種產品,目前獲得行業承認的還有:Eureka、Consul。java

這裏咱們只聊ZK,這個工具自己很小zip包就幾兆,安裝很是傻瓜,可以支持集羣部署。git

官網地址:https://zookeeper.apache.org/github

背景

在集羣環境下ZK的leader&follower的概念,已經節點異常ZK面臨的問題以及如何解決。ZK自己是java語言開發,也開源到Github上但官方文檔對內部介紹的不多,零散的博客不少,有些寫的很不錯。算法

提問:apache

ZK節點狀態角色

ZK集羣單節點狀態(每一個節點有且只有一個狀態),ZK的定位必定須要一個leader節點處於lading狀態。服務器

  • looking:尋找leader狀態,當前集羣沒有leader,進入leader選舉流程。
  • following:跟隨者狀態,接受leading節點同步和指揮。
  • leading:領導者狀態。
  • observing:觀察者狀態,表名當前服務器是observer。

ZAB協議(原子廣播)

Zookeeper專門設計了一種名爲原子廣播(ZAB)的支持崩潰恢復的一致性協議。ZK實現了一種主從模式的系統架構來保持集羣中各個副本之間的數據一致性,全部的寫操做都必須經過Leader完成,Leader寫入本地日誌後再複製到全部的Follower節點。一旦Leader節點沒法工做,ZAB協議可以自動從Follower節點中從新選出一個合適的替代者,即新的Leader,該過程即爲領導選舉。markdown

ZK集羣中事務處理是leader負責,follower會轉發到leader來統一處理。簡單理解就是ZK的寫統一leader來作,讀能夠follower處理,這也就是CAP理論中ZK更適合讀多寫少的服務。架構

過半選舉算法

ZK投票處理策略運維

投票信息包含 :所選舉leader的Serverid,Zxid,SelectionEpoch

  • Epoch判斷,自身logicEpoch與SelectionEpoch判斷:大於、小於、等於。
  • 優先檢查ZXID。ZXID比較大的服務器優先做爲Leader。
  • 若是ZXID相同,那麼就比較myid。myid較大的服務器做爲Leader服務器。

ZK中有三種選舉算法,分別是LeaderElection,FastLeaderElection,AuthLeaderElection,FastLeaderElection和AuthLeaderElection是相似的選舉算法,惟一區別是後者加入了認證信息, FastLeaderElection比LeaderElection更高效,後續的版本只保留FastLeaderElection。

理解:

在集羣環境下多個節點啓動,ZK首先須要在多個節點中選出一個節點做爲leader並處於Leading狀態,這樣就面臨一個選舉問題,同時選舉規則是什麼樣的。「過半選舉算法」:投票選舉中得到票數過半的節點勝出,即狀態從looking變爲leading,效率更高。

官網資料描述:Clustered (Multi-Server) Setup,以下圖:

As long as a majority of the ensemble are up, the service will be available. Because Zookeeper requires a majority, it is best to use an odd number of machines. For example, with four machines ZooKeeper can only handle the failure of a single machine; if two machines fail, the remaining two machines do not constitute a majority. However, with five machines ZooKeeper can handle the failure of two machines.

以5臺服務器講解思路:

  1. 服務器1啓動,此時只有它一臺服務器啓動了,它發出去的Vote沒有任何響應,因此它的選舉狀態一直是LOOKING狀態;
  2. 服務器2啓動,它與最開始啓動的服務器1進行通訊,互相交換本身的選舉結果,因爲二者都沒有歷史數據,因此id值較大的服務器2勝出,可是因爲沒有達到超過半數以上的服務器都贊成選舉它(這個例子中的半數以上是3),因此服務器1,2仍是繼續保持LOOKING狀態.
  3. 服務器3啓動,根據前面的理論,分析有三臺服務器選舉了它,服務器3成爲服務器1,2,3中的老大,因此它成爲了此次選舉的leader.
  4. 服務器4啓動,根據前面的分析,理論上服務器4應該是服務器1,2,3,4中最大的,可是因爲前面已經有半數以上的服務器選舉了服務器3,因此它只能接收當小弟的命了.
  5. 服務器5啓動,同4同樣,當小弟.

假設5臺中掛了2臺(三、4),其中leader也掛掉:

leader和follower間有檢查心跳,須要同步數據 Leader節點掛了,整個Zookeeper集羣將暫停對外服務,進入新一輪Leader選舉

1)服務器一、二、5發現與leader失聯,狀態轉爲looking,開始新的投票 2)服務器一、二、5分別開始投票並廣播投票信息,自身Epoch自增; 3) 服務器一、二、5分別處理投票,判斷出leader分別廣播 4)根據投票處理邏輯會選出一臺(2票過半) 5)各自服務器從新變動爲leader、follower狀態 6)從新提供服務

源碼解析:

URL: FastLeaderElection

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                  + ", proposed leader=" + n.leader
                                  + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                  + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
*  as current zxid, but server id is higher.
*/

return ((newEpoch > curEpoch)
	|| ((newEpoch == curEpoch)
		&& ((newZxid > curZxid)
			|| ((newZxid == curZxid)
				&& (newId > curId)))));

腦裂問題

腦裂問題出如今集羣中leader死掉,follower選出了新leader而原leader又復活了的狀況下,由於ZK的過半機制是容許損失必定數量的機器而扔能正常提供給服務,當leader死亡判斷不一致時就會出現多個leader。

方案:

ZK的過半機制必定程度上也減小了腦裂狀況的出現,起碼不會出現三個leader同時。ZK中的Epoch機制(時鐘)每次選舉都是遞增+1,當通訊時須要判斷epoch是否一致,小於本身的則拋棄,大於本身則重置本身,等於則選舉;

// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
            + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
    }
    break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
    sendNotifications();
}

概括

在平常的ZK運維時須要注意以上場景在極端狀況下出現問題,特別是腦裂的出現,能夠採用:

過半選舉策略下部署原則:

  1. 服務器羣部署要單數,如:三、五、七、...,單數是最容易選出leader的配置量。
  2. ZK容許節點最大損失數,原則就是「保證過半選舉正常」,多了就是浪費。

詳細的算法邏輯是很複雜要考慮不少狀況,其中有個Epoch的概念(自增加),分爲:LogicEpoch和ElectionEpoch,每次投票都有判斷每一個投票週期是否一致等等。

在思考ZK策略時常常遇到這樣的問題(上文中兩塊),梳理了一下思路以便於理解也做爲後續回顧,特別感謝下面幾篇博文的支持,感謝分享;

做者:Owen Jia

能夠關注他的博客:Owen Blog

參考博文資料:

zookeeper3.3.5

理解zookeeper選舉機制

Zookeeper選舉算法原理

看完這篇文章你就清楚的知道 ZooKeeper的 概念了

腦裂是什麼?Zookeeper是如何解決的?

zookeeper腦裂問題

相關文章
相關標籤/搜索