對於請求處理鏈而言,全部請求處理器的父接口爲RequestProcessor。node
RequestProcessor內部類RequestProcessorException,用來表示處理過程當中的出現的異常,而proceequest和shutdown方法則是核心方法,是子類必需要實現的方法,處理的主要邏輯在proceequest中,經過proce***equest方法能夠將請求傳遞到下個處理器。而shutdown表示關閉處理器,其意味着該處理器要關閉和其餘處理器的鏈接。服務器
public interface RequestProcessor { @SuppressWarnings("serial") public static class RequestProcessorException extends Exception { public RequestProcessorException(String msg, Throwable t) { super(msg, t); } } void proce***equest(Request request) throws RequestProcessorException; void shutdown(); }
實現RequestProcessor的processor有不少,PrepRequestProcessor,一般是請求處理鏈的第一個處理器。session
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}
PrepRequestProcessor繼承了ZooKeeperCriticalThread類並實現了RequestProcessor接口,表示其能夠做爲線程使用。ide
//已提交的請求隊列 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); //下一個處理器 private final RequestProcessor nextProcessor; // zk服務器 ZooKeeperServer zks;
while (true) { //從隊列獲取請求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } //requestOfDeath類型的請求,表明當前處理器已經關閉,再也不處理請求。 if (Request.requestOfDeath == request) { break; } //調用關鍵函數 pRequest(request); }
pRequest會肯定請求類型,並根據請求類型不一樣生成不一樣的請求對象,咱們以建立節點爲例子分析函數
//設置消息頭和事務爲空 request.setHdr(null); request.setTxn(null); try { switch (request.type) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: //建立節點請求 CreateRequest create2Request = new CreateRequest(); //處理請求 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); break; //省略其餘代碼 //給請求的zxid賦值 request.zxid = zks.getZxid(); //交給下一個處理器繼續處理 nextProcessor.proce***equest(request);
pRequest2Txn函數是實際的處理請求的函數,對於建立方法會調用pRequest2TxnCreate函數this
//設置請求頭 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); switch (type) { case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { pRequest2TxnCreate(type, request, record, deserialize); break; }
pRequest2TxnCreate方法以下:線程
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { //反序列化,將ByteBuffer轉化爲Record ByteBufferInputStream.byteBuffer2Record(request.request, record); } int flags; String path; List<ACL> acl; byte[] data; long ttl; if (type == OpCode.createTTL) { CreateTTLRequest createTtlRequest = (CreateTTLRequest)record; flags = createTtlRequest.getFlags(); path = createTtlRequest.getPath(); acl = createTtlRequest.getAcl(); data = createTtlRequest.getData(); ttl = createTtlRequest.getTtl(); } else { //轉換createRequest對象 CreateRequest createRequest = (CreateRequest)record; flags = createRequest.getFlags(); path = createRequest.getPath(); acl = createRequest.getAcl(); data = createRequest.getData(); ttl = -1; } CreateMode createMode = CreateMode.fromFlag(flags); validateCreateRequest(path, createMode, request, ttl); //獲取父節點路徑 String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, acl); //獲取父節點的record ChangeRecord parentRecord = getRecordForPath(parentPath); //檢查ACL列表 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); //是否建立順序節點 if (createMode.isSequential()) { //子路徑後追加一串數字,順序的 path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL; //父節點不能是臨時節點 if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } //新的子節點版本號 int newCversion = parentRecord.stat.getCversion()+1; //新生事務 if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl)); } else { request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { //是否臨時節點 s.setEphemeralOwner(request.sessionId); } //拷貝 parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); //子節點數量+1 parentRecord.childCount++; //設置新版本號 parentRecord.stat.setCversion(newCversion); //將parentRecord添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(parentRecord); // 將新生成的ChangeRecord(包含了StatPersisted信息)添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); }
addChangeRecord函數將ChangeRecord添加至ZooKeeperServer的outstandingChanges和outstandingChangesForPath中。code
private void addChangeRecord(ChangeRecord c) { synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); zks.outstandingChangesForPath.put(c.path, c); } }
outstandingChanges 位於ZooKeeperServer 中,用於存放剛進行更改尚未同步到ZKDatabase中的節點信息。orm
znode節點會因爲用戶的讀寫操做頻繁發生變化,爲了提高數據的訪問效率,ZooKeeper中有一個三層的數據緩衝層用於存放節點數據。對象
outstandingChanges->ZKDatabase->FileSnap+FileTxnLog