FinalRequestProcessor是請求處理鏈中最後的一個處理器。數據庫
public class FinalRequestProcessor implements RequestProcessor { ZooKeeperServer zks; }
FinalRequestProcessor只實現了RequestProcessor接口,須要實現process Request方法和shutdown方法。服務器
核心屬性爲zks,表示Zookeeper服務器,能夠經過zks訪問到Zookeeper內存數據庫。session
咱們看一下核心方法process Request代碼:less
synchronized (zks.outstandingChanges) { // Need to process local session requests // 當前節點,處理請求,若爲事務性請求,則提交到ZooKeeper內存數據庫中。 // 對於processTxn函數而言,其最終會調用DataTree的processTxn rc = zks.processTxn(request); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. //只有寫請求才會有消息頭 if (request.getHdr() != null) { TxnHeader hdr = request.getHdr(); Record txn = request.getTxn(); long zxid = hdr.getZxid(); //當outstandingChanges不爲空且其首元素的zxid小於等於請求的zxid時, // 就會一直從outstandingChanges中取出首元素,而且對outstandingChangesForPath作相應的操做 while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = zks.outstandingChanges.remove(); if (cr.zxid < zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid); } if (zks.outstandingChangesForPath.get(cr.path) == cr) { zks.outstandingChangesForPath.remove(cr.path); } } } // do not add non quorum packets to the queue. //判斷是否爲事務性請求則是經過調用isQuorum函數 //只將quorum包(事務性請求)添加進隊列 //addCommittedProposal函數將請求添加至ZKDatabase的committedLog結構中 if (request.isQuorum()) { zks.getZKDatabase().addCommittedProposal(request); } }
根據請求的建立時間來更新Zookeeper服務器的延遲,updateLatency函數中會記錄最大延遲、最小延遲、總的延遲和延遲次數。
而後更新響應中的狀態,如請求建立到響應該請求總共花費的時間、最後的操做類型等。而後設置響應後返回ide
case OpCode.ping: { //更新延遲 zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; // 更新響應的狀態 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime()); // 設置響應 cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; }
其餘請求與此相似,
最後會根據其餘請求再次更新服務器的延遲,設置響應的狀態等函數
// 獲取最後處理的zxid long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); // 響應頭 ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); // 更新服務器延遲 zks.serverStats().updateLatency(request.createTime); // 更新狀態 cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
最後使用sendResponse函數將響應發送給請求方。code
try { //返回相應 cnxn.sendResponse(hdr, rsp, "response"); if (request.type == OpCode.closeSession) { //關閉會話 cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); }