當ZooKeeper集羣啓動以後,須要完成leader和follower之間的數據同步。數據庫
首先leader和observer有一個共同的父類learner,裏面定義了一些公共方法。集羣正常運行後會有一個leader和多個follower(這裏observer就不單獨說了,和follower的行爲是相似的)。緩存
一、 註冊過程oop
follower在提供服務給客戶端以前必須完成註冊到leader的動做。this
註冊分爲如下3個主要步驟:設計
a) 調用connectToLeader方法鏈接到Leader。code
b) 調用registerWithLeader方法註冊到Leader,交換各自的sid、zxid和Epoch等信息,Leader以此決定事務同步的方式。server
c) 調用SyncWithLeader跟Leader進行事務數據同步,處理SNAP/DIFF/TRUNC包。blog
這3個方法都定義在父類Learner類中。下面咱們以Follower做爲例子說明註冊到Leader的完整流程。事務
二、connectToLeaderrem
connectToLeader方法功能較簡單,建立Socket鏈接到Leader。該方法定義在Follower的父類Learner中。它加了重試機制,具體的代碼這裏就不給出了。
最多能夠嘗試5次鏈接。鏈接成功後Leader會建立一個LearnerHandler專門處理與該Follower之間的QuorunPacket消息的傳遞。
三、registerWithLeader
Follower鏈接Leader成功以後,立刻調用registerWithLeader方法。
registerWithLeader方法首先發送FOLLOWERINFO包給Leader,告訴Leader本身的身份屬性(Follower的zxid,sid)。而後等待Leader回覆的LEADINFO包,獲取Leader的Epoch和zxid值,並更新Follower的Epoch和zxid值,以Leader信息爲準。
最後給Leader發ACKEPOCH包,告訴Leader此次Follower已經與Leader的zxid同步了。
這裏acceptedEpoch就是Leader的Epoch。
整個resigerWithLeader流程以下圖所示:
接下來Follower就要進入syncWithLeader方法來與Leader同步數據了。
四、SyncWithLeader
SyncWithLeader方法同步Leader的事務到Follower,該方法較長,這裏分段介紹其整個過程。
首先讀取同步數據包,主要代碼以下:
QuorumPacket qp = new QuorumPacket(); readPacket(qp); if (qp.getType() == Leader.SNAP){ zk.getZKDatabase().deserializeSnapshot(leaderIs); }else if (qp.getType() == Leader. TRUNC) { boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid())); System.exit(13); } }else if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", ong.toHexString(qp.getZxid())); snapshotNeeded = false; }
同步方式分紅3種:
當前面都執行完成後,還有一段代碼處理後續消息(這裏是QuorumPacket類型),好比:PROPOSAL、COMMIT、NEWLEADER等。例如PROPOSAL是指同步期間收到的leader發送的寫請求信息,緩存在packetsNotCommitted裏,等後續處理,這這部分代碼能夠先無論。
這部分的主要代碼是這樣的:
while (self.isRunning()) { readPacket(qp); switch(qp.getType()) { case Leader.PROPOSAL: packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: pif = packetsNotCommitted.peekFirst(); if (!writeToTxnLog) { zk.processTxn(pif.hdr, pif.rec); packetsNotCommitted.remove(); } else { packetsCommitted.add(qp.getZxid()); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { packetsNotCommitted.add(packet); packetsCommitted.add(qp.getZxid()); } break; case Leader.UPTODATE: if (isPreZAB1_0) { zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: if (snapshotNeeded) { zk.takeSnapshot(); } self.setCurrentEpoch(newEpoch); writeToTxnLog = true; isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } }
而後Leader會發送NEWLEADER包,Follower收到NEWLEADER包後回覆ACK給Leader。
最後Leader發UPTODATE包表示同步完成,Follower這時啓動服務端並跳出本次循環,準備結束整個註冊過程。
五、 Follower主流程
Follower是Learner的子類,Follower的啓動方法就是followLeader。
followLeader的主要代碼片斷以下:
connectToLeader(addr); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); }
啓動時首先與Leader同步數據,而後啓動FollowerZooKeeperServer,在FollowerZooKeeperServer運行的同時,額外啓動while循環等待Peer的QuorumPacket包,調用processPacket方法處理這些包。
processPacket處理QuorumPeer傳送的QuorumPacket,最主要是處理兩種QuorumPacket:PROPOSAL和COMMIT。固然還有PING、COMMITANDACTIVATE等包類型,爲便於簡化梳理代碼設計思路,這裏就再也不詳述了。
該方法在收到Leader發送過來的QuorumPacket時被調用,主要是響應PROPOSAL和COMMIT兩種類型的消息。
PROPOSAL是Leader將要執行的寫事務命令;COMMIT是提交命令。Follower只有在收到COMMIT消息後纔會讓PROPOSAL命令的內容生效。
同一個寫事務命令會在Leader和多個Follower上都執行一次,保證集羣數據的一致性。
代碼片斷:
case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); lastQueued = hdr.getZxid(); fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break;
Follower收到PROPOSAL消息後調用FollowerZooKeeperServer的logRequest方法;收到COMMIT消息後調用FollowerZooKeeperServer的commit方法。
Leader發送給集羣中全部follower的寫請求包。
Leader執行寫操做時須要告之集羣中的Learner,讓你們也執行寫操做,保證集羣數據的一致性。PROPOSAL是嚴格按照順序執行的,這也是ZOOKEEPER的核心設計思想之一。
當Leader認爲一個Proposal已被大多數Follower持久化並等待執行後會發送COMMIT包,通知各Follower能夠提交執行該Proposal了,最後調用到FinalRequestProcessor執行寫操做,經過這種機制保證寫操做能被大半數集羣機器執行。
六、 Observer主流程
Observer和Follower功能相似,主要的差異就是不參與選舉。
Observer的入口方法是observerLeader。當QuorumPeer的狀態是OBSERVING時會啓動Observer並調用observerLeader方法。
observerLeader同Follower的followLeader方法相似,首先註冊到Leader,事務同步後進入QuorumPacket包循環處理過程,調用processPacket方法處理QuorumPacket。
processPacket比Follower要簡單許多,最主要是處理INFORM包來執行Leader的寫請求命令。
這裏處理的是INFORM消息,Leader羣發寫事務時,給Follower發的是PROPOSAL並要等待Follower確認;而給Observer發的則是INFORM消息而且不須要Obverver回覆ACK消息來確認。