ZooKeeper系列之(六):請求調用鏈概述

ZooKeeper服務端在收到客戶端請求以後,根據leader或follower角色不一樣,執行一系列不一樣的操做。ZooKeeper設計者將這些操做抽象成了接口叫processRequest,並提供了該接口的多個實現類。leader或follower將這些實現類串聯在一塊兒構成一個執行序列,leader或follower的執行序列不盡相同。經過這種接口抽象的方式提升了代碼的複用性。session

服務端處理的客戶端消息叫Request,Request消息的處理分散在LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer中。app

ZooKeeperServer收到客戶端的操做請求後生成Request,而後是經過調用鏈的方式來處理Request。調用鏈定義了處理單元的鏈式執行。每一個處理單元繼承了Processor接口,而且可設置nextProcessor。異步

服務端調用鏈都是從firstProcessor開始往下順序執行,每一個調用模塊執行完成以後沿着nextProcessor的方向依次執行下一個調用處理單元。函數

不一樣類型的ZooKeeperServer的調用鏈是不一樣的。主要有三種不一樣的調用鏈方式:oop

A. LeaderZooKeeperServer的調用鏈,它的firstProcessor是LeaderRequestProcessor。this

B. FollowerZooKeeperServer的調用鏈,它的firstProcessor是FollowerRequestProcessor。線程

C. Observer ZooKeeperServer的調用鏈,它的firstProcessor是ObserverRequestProcessor。設計

ZooKeeperServer中定義的調用單元有如下這些類,它們都實現了Processor接口的processRequest方法。日誌

調用單元列表:code

Ø LeaderRequesrProcessor:Leader的第一個processor

Ø FollowerRequesrProcessor: Follower的第一個processor

Ø ObserverRequesrProcessor:Observer的第一個processor

Ø ProposalProcessor:命令提議,羣發Proposal到Follower

Ø CommitProcessor:事務提交,

Ø SyncRequestProcessor:持久化Proposal到磁盤

Ø AckRequestProcessor:Leader給本身ACK消息

Ø SendAckRequestProcessor:Learner回覆ACK給Leader

Ø toBeAppliedProcessor:等待執行

Ø PrepRequestProcessor:Leader使用,定義在LeaderRequestProcessor和ProposalRequestProcessor之間。將寫操做記錄到outstandingChanges,在FinalRequestProcessor中確保按順序執行Request,防止後續讀操做在前一個寫操做還未完成的狀況下就被調用。

Ø FinalRequestProcessor:最後的processor,調用ZkDatabase方法執行最終讀寫操做

下面咱們來具體看看各個Processor的實現邏輯。

一、ProposalProcessor

用於Leader,處理寫操做命令,生成PROPOSAL包通知Follower同步寫操做。

兩段主要代碼:

1) ProposalRequestProcessor()

AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);

建立SyncProcessor,當生成PROPOSAL包時,首先經過SyncProcessor持久化到磁盤,而後經過AckRequestProcessor回覆確認(ACK包)。

2)processRequest

nextProcessor.processRequest(request);
if (request.getHdr() != null) {
      try {
          zks.getLeader().propose(request);
      } catch (XidRolloverException e) {
          throw new RequestProcessorException(e.getMessage(), e);
      }
      syncProcessor.processRequest(request);
}

這裏的nextProcessor在Leader中的定義是CommitProcessor,等待COMMIT。

而後調用Leader的proposal方法來處理Request。

最後調用SyncProcessor將PROPOSAL持久化,並給本身發ACK。

二、 CommitProcessor

CommitProcessor是一個很是重要的Processor,一句話歸納其做用就是:判斷請求處理是否能夠往下執行?

對於讀請求,通常CommitProcessor直接將請求包往下執行就能夠了;而對於寫請求,CommitProcessor則須要等待leader發送COMMIT請求才能往下執行,那麼leader什麼狀況下會發送COMMIT包呢?

leader在超過半數的follower已經成功寫WAL日誌,收到這些超過半數的follower的ACK包時,纔會給這些follower發送COMMIT消息。

CommitProcessor是異步工做模式,所以設計了Queue來保存和處理Request。

對Leader發送過來的Proposal進行確認,而且確保Leader發送過來的多條寫事務請求按順序依次執行,只有前一條寫事務執行完畢才繼續執行下一條寫事務。

CommitProcessor是處理須要確認的寫事務請求,讀操做不須要確認會直接忽略掉。它的nextProcessor是FinalRequestProcessor,執行具體的寫事務。

定義了3個隊列:

1) LinkedBlockingQueue<Request> queuedRequests ;

請求隊列,全部請求經過processPacket方法首先放入該隊列,等待後續處理。

2) LinkedBlockingQueue<Request> committedRequests ;

能夠commit的請求包,這裏的數據是能夠往下走的。

3) HashMap<Long, LinkedList<Request>> pendingRequests ;

當FollowerZooKeeperServer收到Leader發來的COMMIT包時,會觸發commit方法被執行,此時請求包會放入commitedRequests隊列等待下一步的處理。

是否須要commit

needCommit方法判斷Request是否須要Commit。只有寫操做纔會須要Commit過程,不須要Commit的讀操做直接進入nextProcessor處理。

protected boolean needCommit(Request request) {
     switch (request.type) {
         case OpCode.create:
         case OpCode.create2:
         case OpCode.createTTL:
         case OpCode.createContainer:
         case OpCode.delete:
         case OpCode.deleteContainer:
         case OpCode.setData:
         case OpCode.reconfig:
         case OpCode.multi:
         case OpCode.setACL:
             return true;
         case OpCode.sync:
             return matchSyncs;    
         case OpCode.createSession:
         case OpCode.closeSession:
             return !request.isLocalSession();
         default:
             return false;
     }
 }

請求包處理

有Request到達時,先調用processRequest方法將請求放入queuedRequest隊列。

CommitProcessor有個run方法。run方法是個循環,不斷處理這三個隊列。對於queuedRequest隊列,則將請求包放入peddingRequest隊列。

若是commit方法被觸發,則會將入參的請求包放入committedRequests隊列,等待下一步FinalRequestProcessor的處理。

run的主要邏輯代碼歸納以下:

public void run() {
        try {         
            int requestsToProcess = 0;
            boolean commitIsWaiting = false;
	do {              
                commitIsWaiting = !committedRequests.isEmpty();
                requestsToProcess =  queuedRequests.size();
                // Avoid sync if we have something to do
                if (requestsToProcess == 0 && !commitIsWaiting){
                    // Waiting for requests to process
                    synchronized (this) {
                        while (!stopped && requestsToProcess == 0
                                && !commitIsWaiting) {
                            wait();
                            commitIsWaiting = !committedRequests.isEmpty();
                            requestsToProcess = queuedRequests.size();
                        }
                    }
                }
               
                Request request = null;
                while (!stopped && requestsToProcess > 0
                        && (request = queuedRequests.poll()) != null) {
                    requestsToProcess--;
                    if (needCommit(request)
                            || pendingRequests.containsKey(request.sessionId)) {
                        // Add request to pending
                        LinkedList<Request> requests =pendingRequests.get(request.sessionId);
                        if (requests == null) {
                            requests = new LinkedList<Request>();
                            pendingRequests.put(request.sessionId, requests);
                        }
                        requests.addLast(request);
                    }
                    else {
                        sendToNextProcessor(request);
                    }                   
                    if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){                        
                        commitIsWaiting = true;
                        break;
                    }
                }
                // Handle a single committed request
                if (commitIsWaiting && !stopped){
                    waitForEmptyPool();
                    if (stopped){
                        return;
                    }
                    // Process committed head
                    if ((request = committedRequests.poll()) == null) {
                        throw new IOException("Error: committed head is null");
                    }                 
                    LinkedList<Request> sessionQueue = pendingRequests
                            .get(request.sessionId);
                    if (sessionQueue != null) {
                        // If session queue != null, then it is also not empty.
                        Request topPending = sessionQueue.poll(); 
                        topPending.setHdr(request.getHdr());
                        topPending.setTxn(request.getTxn());
                        topPending.zxid = request.zxid;
                        request = topPending;
                    }

                    sendToNextProcessor(request);
                    waitForEmptyPool();
                    if (sessionQueue != null) {
                        while (!stopped && !sessionQueue.isEmpty()
                                && !needCommit(sessionQueue.peek())) {
                            sendToNextProcessor(sessionQueue.poll());
                        }
                        // Remove empty queues
                        if (sessionQueue.isEmpty()) {
                            pendingRequests.remove(request.sessionId);
                        }
                    }
                }
            } while (!stoppedMainLoop);
        } catch (Throwable e) {
            handleException(this.getName(), e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

1) 首先等待Request的到來

2) 而後判斷是否須要COMMIT,若是須要則將Request放入pendingRequests等待被commit;不然直接送給FinalRequestProcessor處理。

3) 若是pendingRequests和committedRequests都不爲空,則跳出小循環準備執行committedRequests中的請求。

4) 根據committedRequests中的Request找到該Requests對應的Session,並從該Session的pendingRequests中找出第一個Request執行,這樣保證Request是按照Session的順序執行的,而不會出現後續Request反而先被執行的場景。

5) 最後再按Session中的順序執行讀操做,防止後續讀操做在寫操做以前被執行,從而破壞了事務的順序性。

三、 SyncRequestProcessor

將寫事務請求持久化到磁盤,防止事務丟失。它的nextProcessor分紅3種場景設置:

(1)Leader

這時候nextProcessor設置爲AckRequestProcessor,AckRequestProcessor直接調用Leader的processAck方法。

(2)Follower

這時候nextProcessor設置爲SendAckRequestProcessor,SendAckRequestProcessor向Leader發送Ack確認包。

(3)Observer

這時候nextProcessor設置爲null,表示不用回覆ACK給Leader。

SyncRequestProcessor類的processRequest方法定義以下:

public void processRequest(Request request) {
     queuedRequests.add(request);
}

它的邏輯是將Request放入queueRequests隊列,等待run主線程處理:

run線程主要邏輯:

while (true) {
   Request si = null;
   if (toFlush.isEmpty()) {
        si = queuedRequests.take();
   } else {
       si = queuedRequests.poll();
       if (si == null) {
             flush(toFlush);
             continue;
       }
   }
   if (si == requestOfDeath) {
        break;
   }
   if (si != null) {
      if (zks.getZKDatabase().append(si)) {
          logCount++;
          if (logCount > (snapCount / 2 + randRoll)) {
             randRoll = r.nextInt(snapCount/2);
             // roll the log
             zks.getZKDatabase().rollLog();
             // take a snapshot
             if (snapInProcess != null && snapInProcess.isAlive()) {
                LOG.warn("Too busy to snap, skipping");
             } else {
                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                   public void run() {
                      try {
                           zks.takeSnapshot();
                      } catch(Exception e) {
                           LOG.warn("Unexpected exception", e);
                      }
                   }
            };
            snapInProcess.start();
         }
         logCount = 0;
      }
  } else if (toFlush.isEmpty()) {
        if (nextProcessor != null) {
            nextProcessor.processRequest(si);
            if (nextProcessor instanceof Flushable) {
                 ((Flushable)nextProcessor).flush();
            }
        }
        continue;
  }
  toFlush.add(si);
  if (toFlush.size() > 1000) {
       flush(toFlush);
  }
}

將Request寫到事務日誌文件中,根據條件決定是否重啓新的日誌滾動文件;日誌文件寫入完成後調用nextProcessor處理Request。

四、 AckRequestProcessor

專門給Leader使用的ACK處理單元,當Follower收到Proposal時都是經過SendAckRequestProcessor發送ACK包給Leader,而Leader在發出Proposal命令時,會調用Sync過程將Proposal寫到Leader的磁盤,而後Sync過程調用AckRequestProcessor直接通知Leader收到Leader本身的ACK包,直接調用Leader的processAck方法。

public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if(self != null)
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
}

五、SendAckRequestProcessor

SyncRequestProcessor的nextProcessor是SendAckRequestProcessor。

當Follower持久化寫事務請求到磁盤後,會當即調用SendAckRequestProcessor的processRequest方法給Leader發送ACK消息,等待Leader的COMMIT消息。

SendAckRequestProcessor的代碼示例:

public void processRequest(Request si) {
   if(si.type != OpCode.sync){
       QuorumPacket qp = new QuorumPacket(Leader.ACK, 
                       si.getHdr().getZxid(), null,   null);
       try {
           learner.writePacket(qp, false);
       } catch (IOException e) {
       }
   }
}

六、 toBeAppliedProcessor

最簡單的一個處理單元,只是維護一個toBeApplied列表,而後每次將列表中一條記錄發送到FinalRequestProcess or處理,同時刪除該記錄。

七、 PrepRequestProcessor

主要目的是將寫操做請求記錄在ZooKeeperServer的outstandingChanges隊列中, outstandingChanges主要用於FinalRequestProcessor的processRequest方法中,用來確保FinalRequestProcessor是按照outstandingChanges的順序來執行Request。

八、FinalRequestProcessor

最終的一個調用鏈處理單元。

該類用於執行最終的命令,經過ZkDatabase相關接口完成操做。

不論是Leader仍是Follower,最後的處理單元必定是FinalRequestProcessor。

主函數是processRequest,真正處理客戶端請求的代碼。

主要代碼可分紅兩部分。

第一部分是寫操做,代碼邏輯片斷以下:

synchronized (zks.outstandingChanges) {
      rc = zks.processTxn(request);
}

該方法主要是調用ZooKeeperServer的processTxn方法來處理。具體邏輯可參考ZooKeeperServer一節。

第二部分是讀操做,直接經過ZKDatabase等接口實現,好比getData和getChildren兩個讀操做的代碼邏輯以下:

case OpCode.getData: {
     lastOp = "GETD";
     GetDataRequest getDataRequest = new GetDataRequest();
     ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest);
     DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());    
     Stat stat = new Stat();
     byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                   getDataRequest.getWatch() ? cnxn : null);
     rsp = new GetDataResponse(b, stat);
     break;
}
case OpCode.getChildren: {
     lastOp = "GETC";
     GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
     ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
     DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());     
     List<String> children = zks.getZKDatabase().getChildren(
                        getChildrenRequest.getPath(), null, getChildrenRequest
                                .getWatch() ? cnxn : null);
     rsp = new GetChildrenResponse(children);
     break;
}
相關文章
相關標籤/搜索