ZK = ZooKeeper
ZK is the most central piece of service registration and discovery in a microservice solution and a cornerstone of microservices. As a service registry/discovery module it is not the only option; Eureka and Consul are also widely recognized in the industry.
Here we only talk about ZK. The tool itself is tiny (the zip package is only a few MB), installation is dead simple, and it supports clustered deployment.
Official site: https://zookeeper.apache.org/
This post covers the leader & follower concepts of a ZK cluster, the problems ZK faces when nodes fail, and how they are solved. ZK itself is written in Java and open-sourced on GitHub, but the official documentation says little about the internals; there are many scattered blog posts, some of them quite good.
Questions:
ZK集羣單節點狀態(每一個節點有且只有一個狀態),ZK的定位必定須要一個leader節點處於lading狀態。服務器
ZooKeeper uses a purpose-built consistency protocol with crash recovery called ZooKeeper Atomic Broadcast (ZAB). ZK implements a primary/backup style architecture to keep the replicas in the cluster consistent: all write operations must go through the leader, which writes to its local log and then replicates to all follower nodes. If the leader becomes unavailable, ZAB automatically selects a suitable replacement from the followers, i.e. a new leader; that process is leader election.
Transactions (writes) in a ZK cluster are handled by the leader; followers forward write requests to the leader. Put simply, writes are centralized on the leader while reads can be served by any follower, which is why ZK is better suited to read-heavy, write-light services.
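To make that read/write split concrete, here is a minimal sketch of a follower forwarding writes to the leader while answering reads locally. Peer and Request are hypothetical types used only for illustration, not ZooKeeper's actual classes:

```java
// Minimal sketch of the read/write split: writes are forwarded to the leader,
// reads are answered from the local replica. Peer and Request are hypothetical.
interface Peer {
    void forwardToLeader(Request writeRequest); // follower -> leader (ZAB proposal path)
    byte[] readLocal(String path);              // served from the local in-memory tree
}

record Request(String path, boolean isWrite) {}

class FollowerRequestHandler {
    private final Peer peer;

    FollowerRequestHandler(Peer peer) {
        this.peer = peer;
    }

    byte[] handle(Request req) {
        if (req.isWrite()) {
            // All state changes go through the leader, which logs the proposal
            // and replicates it to a quorum of followers before committing.
            peer.forwardToLeader(req);
            return null; // the result comes back once the commit is broadcast
        }
        // Reads never leave this node, which is why ZK shines on read-heavy workloads.
        return peer.readLocal(req.path());
    }
}
```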
ZK vote-processing strategy
A vote (notification) contains: the proposed leader's server id, its zxid, and the election epoch.
ZK has had three election algorithms: LeaderElection, FastLeaderElection, and AuthFastLeaderElection. FastLeaderElection and AuthFastLeaderElection are similar; the only difference is that the latter adds authentication information. FastLeaderElection is more efficient than LeaderElection, and later versions keep only FastLeaderElection.
My understanding:
在集羣環境下多個節點啓動,ZK首先須要在多個節點中選出一個節點做爲leader並處於Leading狀態,這樣就面臨一個選舉問題,同時選舉規則是什麼樣的。「過半選舉算法」:投票選舉中得到票數過半的節點勝出,即狀態從looking變爲leading,效率更高。
The official documentation (Clustered (Multi-Server) Setup) describes it as follows:
As long as a majority of the ensemble are up, the service will be available. Because Zookeeper requires a majority, it is best to use an odd number of machines. For example, with four machines ZooKeeper can only handle the failure of a single machine; if two machines fail, the remaining two machines do not constitute a majority. However, with five machines ZooKeeper can handle the failure of two machines.
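Putting numbers on that quote (quorum = floor(N/2) + 1, tolerated failures = N minus the quorum):

- 3 servers: quorum 2, tolerates 1 failure
- 4 servers: quorum 3, tolerates 1 failure
- 5 servers: quorum 3, tolerates 2 failures
- 6 servers: quorum 4, tolerates 2 failures

This is why an even-sized ensemble buys no extra fault tolerance over the odd size just below it.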
Walking through the idea with a 5-server ensemble:
Suppose 2 of the 5 servers go down (servers 3 and 4), and the leader is among them:
The leader and followers exchange heartbeats and need to keep their data synchronized. Once the leader node is gone, the whole ZooKeeper cluster suspends external service and enters a new round of leader election:
1) Servers 1, 2 and 5 detect that they have lost contact with the leader, switch to LOOKING, and start a new vote.
2) Servers 1, 2 and 5 each cast a vote and broadcast their vote information, incrementing their election epoch.
3) Servers 1, 2 and 5 each process the votes they receive, work out the likely leader, and broadcast again.
4) Following the vote-processing logic, one server is chosen once it holds a majority (3 of the 5 configured votes).
5) Each server switches to its new state (leader or follower).
6) The cluster resumes serving clients.
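Below is a toy walk-through of steps 3 and 4 (plain illustrative Java, not ZooKeeper code): the three survivors each pick the best proposal using the same ordering as totalOrderPredicate (epoch, then zxid, then server id), so they converge on one candidate, which then holds a quorum of the 5-node ensemble:

```java
import java.util.List;

// Toy walk-through of steps 3 and 4: the surviving servers 1, 2 and 5 all
// converge on the proposal with the highest (peerEpoch, zxid, sid).
class ElectionWalkthrough {

    record Proposal(long sid, long zxid, long peerEpoch) {}

    // Same ordering as FastLeaderElection.totalOrderPredicate: epoch, then zxid, then sid.
    static boolean beats(Proposal a, Proposal b) {
        if (a.peerEpoch() != b.peerEpoch()) return a.peerEpoch() > b.peerEpoch();
        if (a.zxid() != b.zxid()) return a.zxid() > b.zxid();
        return a.sid() > b.sid();
    }

    public static void main(String[] args) {
        // Servers 1, 2 and 5 are alive; assume server 5 has the most recent zxid.
        List<Proposal> alive = List.of(
                new Proposal(1, 0x100, 1),
                new Proposal(2, 0x101, 1),
                new Proposal(5, 0x102, 1));

        // Every survivor ends up proposing the same "best" candidate...
        Proposal best = alive.stream().reduce((x, y) -> beats(y, x) ? y : x).orElseThrow();

        // ...so that candidate collects 3 votes, a quorum of the 5-node ensemble.
        int votes = alive.size();
        int ensembleSize = 5;
        System.out.printf("leader=%d, votes=%d, quorum reached=%b%n",
                best.sid(), votes, votes > ensembleSize / 2);
    }
}
```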
Source-code walkthrough:
URL: FastLeaderElection
```java
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x"
                                + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
```
The core of totalOrderPredicate is the comparison below: a higher epoch wins, ties on epoch are broken by the higher zxid, and ties on both are broken by the higher server id:

```java
/*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same
 *    as current zxid, but server id is higher.
 */
return ((newEpoch > curEpoch)
        || ((newEpoch == curEpoch)
            && ((newZxid > curZxid)
                || ((newZxid == curZxid) && (newId > curId)))));
```
Split-brain occurs when the leader in the cluster dies, the followers elect a new leader, and then the old leader comes back to life. Because ZK's majority mechanism allows a certain number of machines to be lost while the service keeps running, disagreement about whether the leader is really dead can lead to multiple leaders existing at once.
Mitigation:
ZK's majority (quorum) mechanism itself already limits split-brain: two disjoint majorities cannot exist at the same time, so at most one leader can ever hold a quorum. On top of that, ZK's epoch mechanism (a logical clock) increments by one on every election; when peers exchange notifications they compare epochs: a notification with an epoch smaller than one's own is discarded, a larger epoch makes the node reset its own state and adopt it, and an equal epoch means the votes are compared normally:
```java
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
    }
    break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
    sendNotifications();
}
```
In day-to-day ZK operations you need to watch for the scenarios above going wrong in extreme cases, especially split-brain. The main deployment principle under the majority-election strategy is to run an odd number of voting servers: a 2N+1 node ensemble tolerates N failures, and adding one more (even) node gains nothing, as the official quote above shows.
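For example, a hypothetical zoo.cfg for such a 5-node ensemble (hostnames, paths and ports are placeholders); each server additionally needs a myid file under dataDir containing its own server number:

```
# Hypothetical zoo.cfg for a 5-node ensemble.
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# Odd number of voting members: quorum = 3, so any 2 servers may fail.
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
server.4=zk4.example.com:2888:3888
server.5=zk5.example.com:2888:3888
```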
The detailed algorithm logic is considerably more complex and has to handle many cases. One key concept is the epoch (monotonically increasing), which appears in two forms: the electionEpoch (the logical clock for election rounds) and the peerEpoch (the epoch of the last agreed leader); every vote is checked to make sure the voting rounds are consistent, and so on.
These questions (the two topics above) come up all the time when thinking about ZK's strategy; I have organized my thoughts here both to aid understanding and as a later reference. Special thanks to the blog posts below for their help, and thanks for sharing.
Author: Owen Jia
You can follow his blog: Owen Blog
Reference posts: