ZooKeeper服務端在收到客戶端請求以後,根據leader或follower角色不一樣,執行一系列不一樣的操做。ZooKeeper設計者將這些操做抽象成了接口叫processRequest,並提供了該接口的多個實現類。leader或follower將這些實現類串聯在一塊兒構成一個執行序列,leader或follower的執行序列不盡相同。經過這種接口抽象的方式提升了代碼的複用性。session
服務端處理的客戶端消息叫Request,Request消息的處理分散在LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer中。app
ZooKeeperServer收到客戶端的操做請求後生成Request,而後是經過調用鏈的方式來處理Request。調用鏈定義了處理單元的鏈式執行。每一個處理單元繼承了Processor接口,而且可設置nextProcessor。異步
服務端調用鏈都是從firstProcessor開始往下順序執行,每一個調用模塊執行完成以後沿着nextProcessor的方向依次執行下一個調用處理單元。函數
不一樣類型的ZooKeeperServer的調用鏈是不一樣的。主要有三種不一樣的調用鏈方式:oop
A. LeaderZooKeeperServer的調用鏈,它的firstProcessor是LeaderRequestProcessor。this
B. FollowerZooKeeperServer的調用鏈,它的firstProcessor是FollowerRequestProcessor。線程
C. Observer ZooKeeperServer的調用鏈,它的firstProcessor是ObserverRequestProcessor。設計
ZooKeeperServer中定義的調用單元有如下這些類,它們都實現了Processor接口的processRequest方法。日誌
調用單元列表:code
Ø LeaderRequesrProcessor:Leader的第一個processor
Ø FollowerRequesrProcessor: Follower的第一個processor
Ø ObserverRequesrProcessor:Observer的第一個processor
Ø ProposalProcessor:命令提議,羣發Proposal到Follower
Ø CommitProcessor:事務提交,
Ø SyncRequestProcessor:持久化Proposal到磁盤
Ø AckRequestProcessor:Leader給本身ACK消息
Ø SendAckRequestProcessor:Learner回覆ACK給Leader
Ø toBeAppliedProcessor:等待執行
Ø PrepRequestProcessor:Leader使用,定義在LeaderRequestProcessor和ProposalRequestProcessor之間。將寫操做記錄到outstandingChanges,在FinalRequestProcessor中確保按順序執行Request,防止後續讀操做在前一個寫操做還未完成的狀況下就被調用。
Ø FinalRequestProcessor:最後的processor,調用ZkDatabase方法執行最終讀寫操做
下面咱們來具體看看各個Processor的實現邏輯。
一、ProposalProcessor
用於Leader,處理寫操做命令,生成PROPOSAL包通知Follower同步寫操做。
兩段主要代碼:
1) ProposalRequestProcessor()
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
建立SyncProcessor,當生成PROPOSAL包時,首先經過SyncProcessor持久化到磁盤,而後經過AckRequestProcessor回覆確認(ACK包)。
2)processRequest
nextProcessor.processRequest(request); if (request.getHdr() != null) { try { zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } syncProcessor.processRequest(request); }
這裏的nextProcessor在Leader中的定義是CommitProcessor,等待COMMIT。
而後調用Leader的proposal方法來處理Request。
最後調用SyncProcessor將PROPOSAL持久化,並給本身發ACK。
二、 CommitProcessor
CommitProcessor是一個很是重要的Processor,一句話歸納其做用就是:判斷請求處理是否能夠往下執行?
對於讀請求,通常CommitProcessor直接將請求包往下執行就能夠了;而對於寫請求,CommitProcessor則須要等待leader發送COMMIT請求才能往下執行,那麼leader什麼狀況下會發送COMMIT包呢?
leader在超過半數的follower已經成功寫WAL日誌,收到這些超過半數的follower的ACK包時,纔會給這些follower發送COMMIT消息。
CommitProcessor是異步工做模式,所以設計了Queue來保存和處理Request。
對Leader發送過來的Proposal進行確認,而且確保Leader發送過來的多條寫事務請求按順序依次執行,只有前一條寫事務執行完畢才繼續執行下一條寫事務。
CommitProcessor是處理須要確認的寫事務請求,讀操做不須要確認會直接忽略掉。它的nextProcessor是FinalRequestProcessor,執行具體的寫事務。
定義了3個隊列:
1) LinkedBlockingQueue<Request> queuedRequests ;
請求隊列,全部請求經過processPacket方法首先放入該隊列,等待後續處理。
2) LinkedBlockingQueue<Request> committedRequests ;
能夠commit的請求包,這裏的數據是能夠往下走的。
3) HashMap<Long, LinkedList<Request>> pendingRequests ;
當FollowerZooKeeperServer收到Leader發來的COMMIT包時,會觸發commit方法被執行,此時請求包會放入commitedRequests隊列等待下一步的處理。
是否須要commit
needCommit方法判斷Request是否須要Commit。只有寫操做纔會須要Commit過程,不須要Commit的讀操做直接進入nextProcessor處理。
protected boolean needCommit(Request request) { switch (request.type) { case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.multi: case OpCode.setACL: return true; case OpCode.sync: return matchSyncs; case OpCode.createSession: case OpCode.closeSession: return !request.isLocalSession(); default: return false; } }
請求包處理
有Request到達時,先調用processRequest方法將請求放入queuedRequest隊列。
CommitProcessor有個run方法。run方法是個循環,不斷處理這三個隊列。對於queuedRequest隊列,則將請求包放入peddingRequest隊列。
若是commit方法被觸發,則會將入參的請求包放入committedRequests隊列,等待下一步FinalRequestProcessor的處理。
run的主要邏輯代碼歸納以下:
public void run() { try { int requestsToProcess = 0; boolean commitIsWaiting = false; do { commitIsWaiting = !committedRequests.isEmpty(); requestsToProcess = queuedRequests.size(); // Avoid sync if we have something to do if (requestsToProcess == 0 && !commitIsWaiting){ // Waiting for requests to process synchronized (this) { while (!stopped && requestsToProcess == 0 && !commitIsWaiting) { wait(); commitIsWaiting = !committedRequests.isEmpty(); requestsToProcess = queuedRequests.size(); } } } Request request = null; while (!stopped && requestsToProcess > 0 && (request = queuedRequests.poll()) != null) { requestsToProcess--; if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) { // Add request to pending LinkedList<Request> requests =pendingRequests.get(request.sessionId); if (requests == null) { requests = new LinkedList<Request>(); pendingRequests.put(request.sessionId, requests); } requests.addLast(request); } else { sendToNextProcessor(request); } if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){ commitIsWaiting = true; break; } } // Handle a single committed request if (commitIsWaiting && !stopped){ waitForEmptyPool(); if (stopped){ return; } // Process committed head if ((request = committedRequests.poll()) == null) { throw new IOException("Error: committed head is null"); } LinkedList<Request> sessionQueue = pendingRequests .get(request.sessionId); if (sessionQueue != null) { // If session queue != null, then it is also not empty. Request topPending = sessionQueue.poll(); topPending.setHdr(request.getHdr()); topPending.setTxn(request.getTxn()); topPending.zxid = request.zxid; request = topPending; } sendToNextProcessor(request); waitForEmptyPool(); if (sessionQueue != null) { while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) { sendToNextProcessor(sessionQueue.poll()); } // Remove empty queues if (sessionQueue.isEmpty()) { pendingRequests.remove(request.sessionId); } } } } while (!stoppedMainLoop); } catch (Throwable e) { handleException(this.getName(), e); } LOG.info("CommitProcessor exited loop!"); }
1) 首先等待Request的到來
2) 而後判斷是否須要COMMIT,若是須要則將Request放入pendingRequests等待被commit;不然直接送給FinalRequestProcessor處理。
3) 若是pendingRequests和committedRequests都不爲空,則跳出小循環準備執行committedRequests中的請求。
4) 根據committedRequests中的Request找到該Requests對應的Session,並從該Session的pendingRequests中找出第一個Request執行,這樣保證Request是按照Session的順序執行的,而不會出現後續Request反而先被執行的場景。
5) 最後再按Session中的順序執行讀操做,防止後續讀操做在寫操做以前被執行,從而破壞了事務的順序性。
三、 SyncRequestProcessor
將寫事務請求持久化到磁盤,防止事務丟失。它的nextProcessor分紅3種場景設置:
(1)Leader
這時候nextProcessor設置爲AckRequestProcessor,AckRequestProcessor直接調用Leader的processAck方法。
(2)Follower
這時候nextProcessor設置爲SendAckRequestProcessor,SendAckRequestProcessor向Leader發送Ack確認包。
(3)Observer
這時候nextProcessor設置爲null,表示不用回覆ACK給Leader。
SyncRequestProcessor類的processRequest方法定義以下:
public void processRequest(Request request) { queuedRequests.add(request); }
它的邏輯是將Request放入queueRequests隊列,等待run主線程處理:
run線程主要邏輯:
while (true) { Request si = null; if (toFlush.isEmpty()) { si = queuedRequests.take(); } else { si = queuedRequests.poll(); if (si == null) { flush(toFlush); continue; } } if (si == requestOfDeath) { break; } if (si != null) { if (zks.getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; snapInProcess.start(); } logCount = 0; } } else if (toFlush.isEmpty()) { if (nextProcessor != null) { nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } } continue; } toFlush.add(si); if (toFlush.size() > 1000) { flush(toFlush); } }
將Request寫到事務日誌文件中,根據條件決定是否重啓新的日誌滾動文件;日誌文件寫入完成後調用nextProcessor處理Request。
四、 AckRequestProcessor
專門給Leader使用的ACK處理單元,當Follower收到Proposal時都是經過SendAckRequestProcessor發送ACK包給Leader,而Leader在發出Proposal命令時,會調用Sync過程將Proposal寫到Leader的磁盤,而後Sync過程調用AckRequestProcessor直接通知Leader收到Leader本身的ACK包,直接調用Leader的processAck方法。
public void processRequest(Request request) { QuorumPeer self = leader.self; if(self != null) leader.processAck(self.getId(), request.zxid, null); else LOG.error("Null QuorumPeer"); }
五、SendAckRequestProcessor
SyncRequestProcessor的nextProcessor是SendAckRequestProcessor。
當Follower持久化寫事務請求到磁盤後,會當即調用SendAckRequestProcessor的processRequest方法給Leader發送ACK消息,等待Leader的COMMIT消息。
SendAckRequestProcessor的代碼示例:
public void processRequest(Request si) { if(si.type != OpCode.sync){ QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); try { learner.writePacket(qp, false); } catch (IOException e) { } } }
六、 toBeAppliedProcessor
最簡單的一個處理單元,只是維護一個toBeApplied列表,而後每次將列表中一條記錄發送到FinalRequestProcess or處理,同時刪除該記錄。
七、 PrepRequestProcessor
主要目的是將寫操做請求記錄在ZooKeeperServer的outstandingChanges隊列中, outstandingChanges主要用於FinalRequestProcessor的processRequest方法中,用來確保FinalRequestProcessor是按照outstandingChanges的順序來執行Request。
八、FinalRequestProcessor
最終的一個調用鏈處理單元。
該類用於執行最終的命令,經過ZkDatabase相關接口完成操做。
不論是Leader仍是Follower,最後的處理單元必定是FinalRequestProcessor。
主函數是processRequest,真正處理客戶端請求的代碼。
主要代碼可分紅兩部分。
第一部分是寫操做,代碼邏輯片斷以下:
synchronized (zks.outstandingChanges) { rc = zks.processTxn(request); }
該方法主要是調用ZooKeeperServer的processTxn方法來處理。具體邏輯可參考ZooKeeperServer一節。
第二部分是讀操做,直接經過ZKDatabase等接口實現,好比getData和getChildren兩個讀操做的代碼邏輯以下:
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath()); List<String> children = zks.getZKDatabase().getChildren( getChildrenRequest.getPath(), null, getChildrenRequest .getWatch() ? cnxn : null); rsp = new GetChildrenResponse(children); break; }