投票選舉能夠說是ZooKeeper中的難點所在,也是精華所在了。數據庫
ZooKeeper服務端集羣在正常工做時須要一個Leader和無數個Follower,他們都對外提供統一的服務接口,客戶端鏈接任意一臺服務端發送命令其結果都是相同的,而且結果都是嚴格按照客戶端提交讀寫命令的順序來執行。服務器
其中Leader就負責維持整個集羣數據的一致性和寫操做的順序執行。網絡
固然集羣中老是可能發生故障,當某個Follower發生故障時,該Follower會從新啓動選舉流程並從新鏈接到Leader,成功鏈接後會同步Leader的數據,這樣就算以前有寫操做失敗了,經過同步過程最後會恢復完整的數據庫。Leader/Follower的數據同步過程會在後面詳細論述。數據結構
集羣經過投票選舉過程確認同一時刻集羣最多隻會存在一個Leader。當有新節點加入集羣時,新節點會啓動FastLeaderElection過程來從新選舉,尋找新的Leader。框架
1、QuorumPeerui
每臺服務端主機都會啓動一個QuorumPeer進程,它負責投票選舉,從QuorumPeerMain類中啓動QuorumPeer。spa
QuorumPeer共有4種工做模式,不一樣的模式啓動不一樣的代碼:線程
QuorumPeer剛啓動時首先設置爲LOOKING工做模式,它會啓動投票選舉過程,調用FastLeadElection類,啓動Messenger接收選舉信息。code
它的幾種工做狀態的切換流程是這樣的:server
QuorumPeer知道集羣各個機器的IP地址,當經過FastLeaderElection完成選舉過程後會肯定本身是Leader仍是Follower,若是是Leader則調用lead方法進入領導者工做流程;若是是Follower則調用followLeader方法進入追隨者工做流程。
QuorumPeer維持在4種狀態之間不停切換。在LOOKING狀態時有一個細節要關注一下,若是shuttingDownLE爲true,則表示要關閉FastLeaderElection線程從新啓動,這時候會經過Messenger來關閉消息接收,再也不接收集羣的選舉信息。
startLeaderElection方法是在LOOKING狀態啓動選舉過程的。四種狀態的切換過程代碼以下:
while (running) { switch (getPeerState()) { case LOOKING: try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { setPeerState(ServerState.LOOKING); } break; case OBSERVING: try { setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); updateServerState(); } break; case FOLLOWING: try { setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } }
2、QuorumCnxManager
QuorumCnxManager是專門用於選舉信息交換的Socket框架,Messenger接收的報文就是從QuorumCnxManager的接收隊列中獲取的。
QuorumCnxManager一般採用Netty框架負責底層Socket鏈接管理,這是一種非阻塞的Socket方式,提供Select方法在多個Socket之間輪詢數據,以先到先得的方式處理來自不一樣QuorumPeer的選舉交換消息。
這套東西和客戶端服務端的工做模式是同樣的,區別只是這裏是專門給選舉過程使用的。
發送線程定義爲SendWorker,接收線程定義爲RecvWorker。對於每個QuorumPeer鏈接都會建立一個SendWorkder和一個RecvWorker。
發送隊列是一個HashMap,定義以下:
ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
接收隊列定義爲:
ArrayBlockingQueue<Message> recvQueue;
Messenger發送消息時調用QuorumCnxManager的toSend方法,將發送消息添加到queueSendMap中,這裏咱們看到只有發送給遠方的消息纔會真正放入queueSendMap,發給本身的消息直接放到接收隊列recvQueue中。功能很清晰,具體代碼這裏就不看了。
3、Messenger
Messenger定義在FastLeaderElection類中,專用於QuorumPeer之間選舉消息的傳遞,交換toSend/Notification消息體。
它由FastLeaderElection負責啓動,負責選舉過程當中QuorumPeer間傳遞選舉消息。Messenger包括WorkerReceiver和WorkerSender兩個子線程,一個負責接收QuorumPeer消息,另外一個負責發送QuorumPeer消息。
WorkerSender使用ToSend結構發送消息,而WorkerReceiver使用Notification結構接收消息。toSend和Notification消息體的屬性基本是一致性的。
Messenger底層採用QuorumCnxManager做爲Socket鏈接池,每一個QuorumPeer都會建立一個QuorumCnxManager,用於與集羣其餘全部的QuorumPeer的信息交互。
Notification類用於接收Vote信息的消息接收體,在選舉過程當中集羣交換的包都是Notification類型。
ToSend類和Nofitication相似,發送Vote信息的消息定義體。
Notification和ToSend只在Messenger中處理,除此以外的地方看不到這兩種數據結構。
3.1 WorkerReceiver
WorkerReceiver是接收線程,專門接收參與選舉的服務器返回的通知,進行相應的處理。
從QuorumCnxManager獲取網絡接收到的Message包,而後組裝成Notification消息,放入recvqueue隊列,等待後續處理。
若是消息來自觀察者或者追隨者,則將當前的領導者的zxid和sid當即回覆給對方;
若是消息來自另外一個LOOKING狀態的參與選舉者,則將對方發來包裏的zxid和sid等信息組裝Nofitication消息,而後判斷:
其餘的包則忽略。
主要代碼以下:
while (!stop) { response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ recvqueue.offer(n); if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){ Vote v = getVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } else { Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } }
3.2 WorkerSender
WorkerSender是發送線程,專門負責發送ToSend消息給參與選舉的其餘機器。
WorkerSender的流程比較簡單,就是將ToSend消息放入發送隊列,發送線程循環發送。
主要代碼以下:
public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); manager.toSend(m.sid, requestBuffer); }
manager就是指QuorumCnxManager。