ZooKeeper系列之(四):跟隨者工做模式

當ZooKeeper集羣啓動以後,須要完成leader和follower之間的數據同步。數據庫

首先leader和observer有一個共同的父類learner,裏面定義了一些公共方法。集羣正常運行後會有一個leader和多個follower(這裏observer就不單獨說了,和follower的行爲是相似的)。緩存

一、 註冊過程oop

follower在提供服務給客戶端以前必須完成註冊到leader的動做。this

 

註冊分爲如下3個主要步驟:設計

a) 調用connectToLeader方法鏈接到Leader。code

b) 調用registerWithLeader方法註冊到Leader,交換各自的sid、zxid和Epoch等信息,Leader以此決定事務同步的方式。server

c) 調用SyncWithLeader跟Leader進行事務數據同步,處理SNAP/DIFF/TRUNC包。blog

這3個方法都定義在父類Learner類中。下面咱們以Follower做爲例子說明註冊到Leader的完整流程。事務

二、connectToLeaderrem

connectToLeader方法功能較簡單,建立Socket鏈接到Leader。該方法定義在Follower的父類Learner中。它加了重試機制,具體的代碼這裏就不給出了。

最多能夠嘗試5次鏈接。鏈接成功後Leader會建立一個LearnerHandler專門處理與該Follower之間的QuorunPacket消息的傳遞。

三、registerWithLeader

Follower鏈接Leader成功以後,立刻調用registerWithLeader方法。

registerWithLeader方法首先發送FOLLOWERINFO包給Leader,告訴Leader本身的身份屬性(Follower的zxid,sid)。而後等待Leader回覆的LEADINFO包,獲取Leader的Epoch和zxid值,並更新Follower的Epoch和zxid值,以Leader信息爲準。

最後給Leader發ACKEPOCH包,告訴Leader此次Follower已經與Leader的zxid同步了。

這裏acceptedEpoch就是Leader的Epoch。

整個resigerWithLeader流程以下圖所示:

 

接下來Follower就要進入syncWithLeader方法來與Leader同步數據了。

四、SyncWithLeader

SyncWithLeader方法同步Leader的事務到Follower,該方法較長,這裏分段介紹其整個過程。

首先讀取同步數據包,主要代碼以下:

QuorumPacket qp = new QuorumPacket();
readPacket(qp);
if (qp.getType() == Leader.SNAP){
  zk.getZKDatabase().deserializeSnapshot(leaderIs);
}else if (qp.getType() == Leader. TRUNC) {     
  boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
      // not able to truncate the log
      LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid()));
      System.exit(13);
   }
}else if (qp.getType() == Leader.DIFF) {
   LOG.info("Getting a diff from the leader 0x{}", ong.toHexString(qp.getZxid()));
   snapshotNeeded = false;
}

同步方式分紅3種:

  1. SNAP:快照模式,這種模式下Leader將整個完整數據庫傳給Follower。
  2. TRUNC:截斷模式,這種模式代表Follower的數據比Leader還多,爲了維持一致性須要將Follower多餘的數據刪除。
  3. DIFF:差別模式,說明Follower比Leader的事務少,須要給Follower補足,這時候Leader會將須要補充的事務生成PROPOSAL包和COMMIT包發給Follower執行。

當前面都執行完成後,還有一段代碼處理後續消息(這裏是QuorumPacket類型),好比:PROPOSAL、COMMIT、NEWLEADER等。例如PROPOSAL是指同步期間收到的leader發送的寫請求信息,緩存在packetsNotCommitted裏,等後續處理,這這部分代碼能夠先無論。

這部分的主要代碼是這樣的:

while (self.isRunning()) {
     readPacket(qp);
     switch(qp.getType()) {
        case Leader.PROPOSAL:                  
             packetsNotCommitted.add(pif);
             break;
        case Leader.COMMIT:
        case Leader.COMMITANDACTIVATE:
             pif = packetsNotCommitted.peekFirst();                   
             if (!writeToTxnLog) {
                 zk.processTxn(pif.hdr, pif.rec);
                 packetsNotCommitted.remove();
             } else {
                 packetsCommitted.add(qp.getZxid());
             }
             break;
        case Leader.INFORM:
        case Leader.INFORMANDACTIVATE: 
             if (!writeToTxnLog) {
               // Apply to db directly if we haven't taken the snapshot
                 zk.processTxn(packet.hdr, packet.rec);
             } else {
                 packetsNotCommitted.add(packet);
                 packetsCommitted.add(qp.getZxid());
             }
             break;                
         case Leader.UPTODATE: 
             if (isPreZAB1_0) {
                 zk.takeSnapshot();
                 self.setCurrentEpoch(newEpoch);
              }
              self.setZooKeeperServer(zk);
              self.adminServer.setZooKeeperServer(zk);
              break outerLoop;
        case Leader.NEWLEADER:                   
             if (snapshotNeeded) {
                zk.takeSnapshot();
              }
              self.setCurrentEpoch(newEpoch);
              writeToTxnLog = true;
              isPreZAB1_0 = false;
              writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
              break;
     }
}

而後Leader會發送NEWLEADER包,Follower收到NEWLEADER包後回覆ACK給Leader。

最後Leader發UPTODATE包表示同步完成,Follower這時啓動服務端並跳出本次循環,準備結束整個註冊過程。

五、 Follower主流程

Follower是Learner的子類,Follower的啓動方法就是followLeader。

followLeader的主要代碼片斷以下:

connectToLeader(addr);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
syncWithLeader(newEpochZxid);                
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
}

啓動時首先與Leader同步數據,而後啓動FollowerZooKeeperServer,在FollowerZooKeeperServer運行的同時,額外啓動while循環等待Peer的QuorumPacket包,調用processPacket方法處理這些包。

processPacket處理QuorumPeer傳送的QuorumPacket,最主要是處理兩種QuorumPacket:PROPOSAL和COMMIT。固然還有PING、COMMITANDACTIVATE等包類型,爲便於簡化梳理代碼設計思路,這裏就再也不詳述了。

該方法在收到Leader發送過來的QuorumPacket時被調用,主要是響應PROPOSAL和COMMIT兩種類型的消息。

PROPOSAL是Leader將要執行的寫事務命令;COMMIT是提交命令。Follower只有在收到COMMIT消息後纔會讓PROPOSAL命令的內容生效。

同一個寫事務命令會在Leader和多個Follower上都執行一次,保證集羣數據的一致性。

代碼片斷:

case Leader.PROPOSAL:           
      TxnHeader hdr = new TxnHeader();
      Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);     
      lastQueued = hdr.getZxid();
      fzk.logRequest(hdr, txn);
      break;
case Leader.COMMIT:
      fzk.commit(qp.getZxid());
      break;

Follower收到PROPOSAL消息後調用FollowerZooKeeperServer的logRequest方法;收到COMMIT消息後調用FollowerZooKeeperServer的commit方法。

  • PROPOSAL包

Leader發送給集羣中全部follower的寫請求包。

Leader執行寫操做時須要告之集羣中的Learner,讓你們也執行寫操做,保證集羣數據的一致性。PROPOSAL是嚴格按照順序執行的,這也是ZOOKEEPER的核心設計思想之一。

  • COMMIT包

當Leader認爲一個Proposal已被大多數Follower持久化並等待執行後會發送COMMIT包,通知各Follower能夠提交執行該Proposal了,最後調用到FinalRequestProcessor執行寫操做,經過這種機制保證寫操做能被大半數集羣機器執行。

六、 Observer主流程

Observer和Follower功能相似,主要的差異就是不參與選舉。

Observer的入口方法是observerLeader。當QuorumPeer的狀態是OBSERVING時會啓動Observer並調用observerLeader方法。

observerLeader同Follower的followLeader方法相似,首先註冊到Leader,事務同步後進入QuorumPacket包循環處理過程,調用processPacket方法處理QuorumPacket。

processPacket比Follower要簡單許多,最主要是處理INFORM包來執行Leader的寫請求命令。

這裏處理的是INFORM消息,Leader羣發寫事務時,給Follower發的是PROPOSAL並要等待Follower確認;而給Observer發的則是INFORM消息而且不須要Obverver回覆ACK消息來確認。

相關文章
相關標籤/搜索