ZooKeeper系列之(五):領導者工做模式

領導者就是Leader,是整個集羣的寫事務流程負責人。數據庫

一輪選舉結束時產生新的Leader,而且Epoch加1。同時新的Leader先將本身的zxid設置爲Epoch左移32位(Epoch<32),將是集羣中最大的zxid。session

Leader監聽Socket等待Follower的鏈接請求,每次新的Follower鏈接的時候都會啓動一個LearnerHandler線程專門處理與該Follower的交互。LearnerHandler循環接收Follower的消息包,並交給Leader進行處理。併發

leader啓動流程:函數

1. Leader選舉完成以後,Peer確認了本身是Leader的身份,在QuromPeer的主線程中執行Leader的邏輯this

2. 建立Leader對象,並建立Server綁定在QuorumAddress上,用於和其餘Follower之間相互通訊線程

3. 調用Leader::lead函數,執行Leader的真正的邏輯code

a) 調用ZooKeeperServer::loadData,從磁盤中恢復數據和session列表對象

b) 啓用新的epoch,zookeeper中的zxid是64位,用於惟一標示一個操做,zxid的高32位是epoch,每次Leader切換加1,低32位爲序列號,每次操做加1事務

c) 啓動綁定在QuorumAddress上的Server,爲每一個Follower的鏈接創建一個LearnerHandler,用於和Follower作交互,這裏的邏輯另外單獨論述get

d) 向全部的Follower發送一個NEWLEADER包,宣告本身額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,併發送ACK包,表示Follower已經和Leader完成同步並能夠對外提供服務

e) 這時Leader和Client之間的交互在cnxnFactory的Server中,Leader和Follower之間的交互在LearnerHandler所屬的線程中

f) 而後調用Leader::lead函數的QuromPeer線程在每一個tickTime中都會發送ping消息給其餘的follower,follower在接收到ping消息後會回覆一個ping消息,並附帶上follower的session tracker裏的全部session信息,leader收到follower的ping消息後,根據傳回的session信息更新本身的session信息 。

Leader在接收到Follower的註冊請求以後(Follower調用connectToLeader方法),等待收到FOLLOWERINFO包:

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() == Leader.OBSERVERINFO) {
     learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
peerLastZxid = ss.getLastZxid();

1) lastAcceptedEpoch:是Follower的Epoch值。

2) Zxid:是Follower的zxid

3) newEpoch:Leader根據FOLLOWERINF的值計算出新的Epoch

4) newLeaderZxid:根據新的Epoch計算新的Leader的zxid

而後給Follower發送LEADERINFO包,將新的zxid和Epoch告訴Follower,好讓Follower知道應該要同步哪些數據。

Leader而後發送快照包給Follower,Follower根據快照包將本地數據庫恢復到與Leader相同。

若是Follower的事務比Leader少一些(在minCommittedLog 和maxCommittedLog之間),則不需發SNAP包,而是發DIFF包,同時將需補充的事務經過PROPOSAL和COMMIT發給Follower執行。相關邏輯在syncFollower,queueCommittedProposals,startSendingPackets等方法中實現。這部分主要代碼以下所示:

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, 
leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
if (needSnap) {             
    try {
       long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
       oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
       bufferedOutput.flush();
       leader.zk.getZKDatabase().serializeSnapshot(oa);
       oa.writeString("BenWasHere", "signature");
       bufferedOutput.flush();
     } finally {
       snapshot.close();
     }
} 
startSendingPackets();

startSendingPackets將須要同步的事務發送給Follower,事務同步完成後,Leader發送NEWLEADER包給Follower。

而後等Follower回覆第一個ACK包。收到ACK以後調用Leader的waitForNewLeaderAck方法告訴Leader該Follower已經完成同步。

當Leader收到足夠多的waitForNewLeaderAck方法調用後(一般超過半數),知道大部分Follower已經註冊到本Leader上來了,這時候Leader才能確保正式發揮Leader的做用了。

相關文章
相關標籤/搜索