ZooKeeper系列之(七):領導者調用鏈

Leader經過啓動LeaderZooKeeperServer來接收客戶端請求。node

首先看下它的處理鏈的定義,從源碼看出LeaderZooKeeperServer的處理鏈順序以下:算法

 

  • LeaderRequestProcessor :Leader調用鏈開始
  • PrepRequestProcessor:寫操做預準備,建立Request內容
  • ProposalRequestProcessor:將寫請求封裝成PROPOSAL包廣播到全部的follower
  • CommitProcessor :判斷寫操做是否能夠提交真正執行
  • toBeAppliedProcessor:將COMMIT後的Request移到,等待發送給FinalRequestProcessor。
  • FinalRequestProcessor:真正的執行Zookeeper命令的地方
  • SyncRequestProcessor:將寫操做持久化到磁盤
  • AckRequestProcessor:當proposal持久化到磁盤,向本機發送ACK包

SyncRequestProcessor和AckRequestProcessor是在ProposalRequestProcessor內部被建立的,當Leader要處理PROPOSAL命令時先本身調用SyncRequestProcessor持久化,而後經過AckRequestProcessor直接告訴Leader處理ACK邏輯(不通過QuorumPacket傳遞)。數據庫

一、 LeaderRequestProcessor

LeaderRequestProcessor是leader處理請求的第一個processor,所以他不須要轉發到其餘機器,只須要往下執行就好了。這裏就是簡單的調用nextProcessor的processRequest方法。session

主要處理代碼以下:this

public void processRequest(Request request) throws RequestProcessorException {       
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {        
        request.setException(ke);
     }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
     }
     nextProcessor.processRequest(request);
}

二、 submitLearnerRequest

Leader收到Follower的REQUEST請求後調用submitLearnerRequest處理該寫事務。spa

該方法就一條語句,調用PrepRequestProcessor的processRequest方法處理Request請求。方法定義以下:日誌

public void submitLearnerRequest(Request request) {      
     prepRequestProcessor.processRequest(request);
}

三、 寫請求

寫請求是要廣播到整個集羣作數據一致性的,因此涉及到多臺集羣的交互。code

這裏咱們以create事務爲例說明。orm

首先客戶端鏈接到Leader,而後Leader發送proposal消息給集羣中全部Follower,proposal消息帶有本次create事務的數據。server

Follower收到proposal後首先保存到磁盤防止proposal丟失,而後回覆ACK給Leader。

Leader收集到足夠多的ACK後再次發送COMMIT給全部Follower,同時Leader也在本地提交proposal執行,Follower收到COMMIT以後也在本地執行proposal事務。

Leader將執行結果返回給客戶端。

經過以上過程集羣中全部機器都會維護同一個完整的數據庫,保證了數據一致性。

這裏須要注意的一點是:寫操做時Zk會調用Sync過程將寫操做持久化到磁盤。

具體流程以下:

  • 1. 首先,客戶端建立Socket鏈接到Leader,發送create請求給LeaderZooKeeperServer。
  • 2. LeaderZooKeeperServer中firstProcessor被觸發,執行processRequest方法。
  • 3. 調用鏈執行到ProposalRequestProcessor,觸發Leader的proposal方法被執行。
  • 4. proposal方法發送PROPOSAL消息給集羣中全部的Follower。
  • 5. Follower的processPacket方法判斷是PROPOSAL消息,則調用FollowerZooKeeperServer的logRequest方法記錄WAL日誌。
  • 6. Follwer接着的nextProcessor是SyncRequestProcessor,SendAckRequestProcessor類發送ACK消息給Leader。
  • 7. Leader端收到ACK消息,調用Leader的processAck方法處理ACK消息。
  • 8. 調用CommitProcessor處理,判斷是否能夠發送COMMIT給Follower,若是收全了Follower的ACK消息,則發送COMMIT消息給Follower;同時Leader執行到最後一個處理即FinalRequestProcessor,準備結束本次請求,Znode修改完成。
  • 9. Follower收到COMMIT消息,執行FollowerZooKeeperServer的commit方法,最終也執行到FinalRequestProcessor。

上述就是leader處理寫Request請求的完整過程。

leader收到REQUEST包時,會調用submitLearnerRequest方法;收到ACK包時,會調用processAck方法,leader接收包的部分代碼以下。

while (true) {
     qp = new QuorumPacket();
     ia.readRecord(qp, "packet");
     ByteBuffer bb;
     long sessionId;
     int cxid;
     int type;
     switch (qp.getType()) {
         case Leader.ACK:
            if (this.learnerType == LearnerType.OBSERVER) {                
            }
            syncLimitCheck.updateAck(qp.getZxid());
            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
            break; 
        case Leader.REQUEST:
            bb = ByteBuffer.wrap(qp.getData());
            sessionId = bb.getLong();
            cxid = bb.getInt();
            type = bb.getInt();
            bb = bb.slice();
            Request si;
            if(type == OpCode.sync){
                si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
            } else {
                si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
            }
          si.setOwner(this);
            leader.zk.submitLearnerRequest(si);
            break;
       default:
            LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
            break;
     }
 }

proposal

當LeaderZooKeeperServer收到客戶端寫事務請求時,會觸發Leader的proposal方法執行,發送PROPOSAL消息給Follower。同時維護一個outstandingProposals字典表保存PROPOSA消息。

proposal方法的主要代碼以下:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
     request.getHdr().serialize(boa, "hdr");
     if (request.getTxn() != null) {
         request.getTxn().serialize(boa, "txn");
     }
     baos.close();
} catch (IOException e) {
     LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
               baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;                
synchronized(this) {
     p.addQuorumVerifier(self.getQuorumVerifier());
    if (request.getHdr().getType() == OpCode.reconfig){
         self.setLastSeenQuorumVerifier(request.qv, true);                       
    }
    lastProposed = p.packet.getZxid();
    outstandingProposals.put(lastProposed, p);
    sendPacket(pp);
}

inform

和proposal相似,只不過是將寫事務發給Observer,而且不須要Observer回覆ACK。

public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, 
proposal.request.zxid, proposal.packet.getData(), null);
    sendObserverPacket(qp);
}

processAck

Follower持久化PROPOSAL寫事務請求到磁盤後會回覆ACK消息給Leader,Leader經過LearnerHandler接收該包,並觸發Leader的processAck方法。

Leader發送到Follower的PROPOSAL消息,需等待全部Follower回覆Ack消息後判斷是否知足COMMIT條件,若是知足COMMIT條件則發送COMMIT消息給全部Follower,Leader再往下執行。

synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
   if (!allowedToCommit) return;        
   if ((zxid & 0xffffffffL) == 0) {           
       return;
   }   
   if (lastCommitted >= zxid) {
      return;
   }
   Proposal p = outstandingProposals.get(zxid);  
   p.addAck(sid);   
   boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
   if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
     long curZxid = zxid;
     while (allowedToCommit && hasCommitted && p!=null){
         curZxid++;
         p = outstandingProposals.get(curZxid);
         if (p !=null) 
             hasCommitted = tryToCommit(p, curZxid, null);             
     }
   }
}

ProcessAck會統計多少Follower回覆了Leader.ACK類型的QuorumPacket包,而後就走到了tryToCommit方法,tryToCommit判斷是否知足COMMIT條件。

下面看看tryToCommit方法執行了哪些操做。

tryToCommit

確保寫操做是按順序被確認的。

嘗試對Proposal按順序進行Commit。Commit過的事務纔是真正有效的事務。

事務確認必須按順序進行,outstandingProposals中記錄了全部等待確認的事務,只要前一條事務還未確認,則以後的事務都禁止確認,以確保事務的按順序進行。

確認過的事務放入toBeApplied隊列中等待下一步處理。

if (outstandingProposals.containsKey(zxid - 1)) 
     return false;
if (!p.hasAllQuorums()) {
     return false;                 
}
outstandingProposals.remove(zxid);
if (p.request != null) {
     toBeApplied.add(p);
}
if (p.request == null) {
     LOG.warn("Going to commmit null: " + p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {                                   
     Long designatedLeader = getDesignatedLeader(p, zxid);
     QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
     self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
     if (designatedLeader != self.getId()) {
          allowedToCommit = false;
     }
     commitAndActivate(zxid, designatedLeader);
     informAndActivate(p, designatedLeader);
} else {
     commit(zxid);
     inform(p);
}
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
     for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
          sendSync(r);
     }               
}

commit

通知Follower提交proposal執行。

一般只有當Leader從Follower收到超過半數(默認算法,固然你也能夠實現本身的判斷邏輯,好比改爲超過2/3人數)的ACK纔會發起commit,發送Leader.COMMIT包給Follower。

public void commit(long zxid) {
     synchronized(this){
         lastCommitted = zxid;
     }
     QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
     sendPacket(qp);
}

四、 讀請求

讀操做不涉及數據和狀態的變動,所以不須要維護集羣數據的一致性,流程相對於寫操做要簡單些。

具體讀操做流程:

1. Request會發送到Leader的firstProcessor處理,這裏是LeaderRequestProcessor

2. LeaderRequestProcessor對讀操做會進入到PrepRequestProcessor

3. PrepRequestProcessor對讀操做不執行事務相關處理,而後處理鏈一樣進入ProposalRequestProcessor

4. ProposalRequestProcessor調用Leader的propose方法,propose方法會直接將讀操做請求送達nextProcessor。

5. nextProcessor到達CommitProcessor。

6. CommitProcessor一樣會直接將讀操做請求送達toBeAppliedProcessor。

7. toBeAppliedProcessor將讀操做請求直接發送到下一步FinalRequestProcessor。

8. FinalRequestProcessor在本機執行最終的create命令,調用ZkDataBase的processTxn方法,通過DataTree完成最終的節點建立。

相關文章
相關標籤/搜索