領導者就是Leader,是整個集羣的寫事務流程負責人。數據庫
一輪選舉結束時產生新的Leader,而且Epoch加1。同時新的Leader先將本身的zxid設置爲Epoch左移32位(Epoch<32),將是集羣中最大的zxid。session
Leader監聽Socket等待Follower的鏈接請求,每次新的Follower鏈接的時候都會啓動一個LearnerHandler線程專門處理與該Follower的交互。LearnerHandler循環接收Follower的消息包,並交給Leader進行處理。併發
leader啓動流程:函數
1. Leader選舉完成以後,Peer確認了本身是Leader的身份,在QuromPeer的主線程中執行Leader的邏輯this
2. 建立Leader對象,並建立Server綁定在QuorumAddress上,用於和其餘Follower之間相互通訊線程
3. 調用Leader::lead函數,執行Leader的真正的邏輯code
a) 調用ZooKeeperServer::loadData,從磁盤中恢復數據和session列表對象
b) 啓用新的epoch,zookeeper中的zxid是64位,用於惟一標示一個操做,zxid的高32位是epoch,每次Leader切換加1,低32位爲序列號,每次操做加1事務
c) 啓動綁定在QuorumAddress上的Server,爲每一個Follower的鏈接創建一個LearnerHandler,用於和Follower作交互,這裏的邏輯另外單獨論述get
d) 向全部的Follower發送一個NEWLEADER包,宣告本身額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,併發送ACK包,表示Follower已經和Leader完成同步並能夠對外提供服務
e) 這時Leader和Client之間的交互在cnxnFactory的Server中,Leader和Follower之間的交互在LearnerHandler所屬的線程中
f) 而後調用Leader::lead函數的QuromPeer線程在每一個tickTime中都會發送ping消息給其餘的follower,follower在接收到ping消息後會回覆一個ping消息,並附帶上follower的session tracker裏的全部session信息,leader收到follower的ping消息後,根據傳回的session信息更新本身的session信息 。
Leader在接收到Follower的註冊請求以後(Follower調用connectToLeader方法),等待收到FOLLOWERINFO包:
QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); leader.waitForEpochAck(this.getSid(), ss); peerLastZxid = ss.getLastZxid();
1) lastAcceptedEpoch:是Follower的Epoch值。
2) Zxid:是Follower的zxid
3) newEpoch:Leader根據FOLLOWERINF的值計算出新的Epoch
4) newLeaderZxid:根據新的Epoch計算新的Leader的zxid
而後給Follower發送LEADERINFO包,將新的zxid和Epoch告訴Follower,好讓Follower知道應該要同步哪些數據。
Leader而後發送快照包給Follower,Follower根據快照包將本地數據庫恢復到與Leader相同。
若是Follower的事務比Leader少一些(在minCommittedLog 和maxCommittedLog之間),則不需發SNAP包,而是發DIFF包,同時將需補充的事務經過PROPOSAL和COMMIT發給Follower執行。相關邏輯在syncFollower,queueCommittedProposals,startSendingPackets等方法中實現。這部分主要代碼以下所示:
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null); queuedPackets.add(newLeaderQP); if (needSnap) { try { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } finally { snapshot.close(); } } startSendingPackets();
startSendingPackets將須要同步的事務發送給Follower,事務同步完成後,Leader發送NEWLEADER包給Follower。
而後等Follower回覆第一個ACK包。收到ACK以後調用Leader的waitForNewLeaderAck方法告訴Leader該Follower已經完成同步。
當Leader收到足夠多的waitForNewLeaderAck方法調用後(一般超過半數),知道大部分Follower已經註冊到本Leader上來了,這時候Leader才能確保正式發揮Leader的做用了。