This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
| Version | Date | Notes |
|---|---|---|
| 1.0 | 2020.5.23 | First published |
When we send a data-update request to ZooKeeper, what processing pipeline does the request go through? And what consensus algorithm does ZooKeeper use to keep replicas consistent? With these questions in mind, let's get into today's topic.
Before diving into the source, I have to give a quick primer on the Chain of Responsibility pattern, because it is closely tied to everything below. In short, the pattern strings several objects together into a chain of responsibility, then walks them in order to find out which one should handle the request.

What do we gain from it? If you think about it, the pattern is essentially an object-oriented version of if...else if...else (which is procedural); because it is object-oriented, the handlers are far easier to reuse and recombine than a procedural cascade.
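To make that concrete, here is a minimal sketch of the pattern in Java. The names (Handler, LoggingHandler, FinalHandler) are made up for illustration and are not ZooKeeper classes, but the RequestProcessor chains we are about to read have exactly this shape:

```java
// A minimal Chain of Responsibility sketch (hypothetical names, not ZooKeeper code).
interface Handler {
    void processRequest(String request);
}

class LoggingHandler implements Handler {
    private final Handler next;            // the next link in the chain, may be null
    LoggingHandler(Handler next) { this.next = next; }

    @Override
    public void processRequest(String request) {
        System.out.println("log: " + request);    // do this handler's share of the work
        if (next != null) {
            next.processRequest(request);          // then hand off to the next handler
        }
    }
}

class FinalHandler implements Handler {
    @Override
    public void processRequest(String request) {
        System.out.println("final: " + request);   // end of the chain
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        // Assemble the chain head -> tail, then only ever talk to the head.
        Handler chain = new LoggingHandler(new FinalHandler());
        chain.processRequest("create /foo");
    }
}
```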
Let's start from the ZooKeeperServer class and look at its subclasses. The ones we care about correspond to the common ZooKeeper server roles:

- LeaderZooKeeperServer
- FollowerZooKeeperServer
- ObserverZooKeeperServer

First up is LeaderZooKeeperServer and its setupRequestProcessors:
```java
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
            new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor =
            new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
```
Putting the order together: a request enters at LeaderRequestProcessor, then passes through PrepRequestProcessor and on to ProposalRequestProcessor.

From there the chain splits into two branches:

- CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
- SyncRequestProcessor -> AckRequestProcessor

Let's go through the processors one by one, starting with LeaderRequestProcessor.processRequest:
```java
@Override
public void processRequest(Request request) throws RequestProcessorException {
    // Check if this is a local session and we are trying to create
    // an ephemeral node, in which case we upgrade the session
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        if (request.getHdr() != null) {
            LOG.debug("Updating header");
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(ke.code().intValue()));
        }
        request.setException(ke);
        LOG.info("Error creating upgrade request " + ke.getMessage());
    } catch (IOException ie) {
        LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }

    nextProcessor.processRequest(request);
}
```
The logic here is clear: if the request comes from a local session and is trying to create an ephemeral node, checkUpgradeSession upgrades the session; if the upgrade fails, the request is marked with the resulting error. Either way, the request is then passed on to the next processor.
Next is PrepRequestProcessor. The class has more than 1,000 lines of code, so I will only pick out the most representative parts. Before that, let's look at its class comment:
This request processor is generally at the start of a RequestProcessor
change. It sets up any transactions associated with requests that change the
state of the system. It counts on ZooKeeperServer to update
outstandingRequests, so that it can take into account transactions that are
in the queue to be applied when generating a transaction.
簡單來講,它通常位於請求處理鏈的頭部,它會設置事務型請求(改變系統狀態的請求)。session
對於建立型請求邏輯大體爲:併發
```java
case OpCode.create2:
    CreateRequest create2Request = new CreateRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
    break;
```
Jump to pRequest2Txn:
```java
protected void pRequest2Txn(int type, long zxid, Request request,
                            Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
            Time.currentWallTime(), type));

    switch (type) {
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer: {
            pRequest2TxnCreate(type, request, record, deserialize);
            break;
        }
        // ... remaining cases omitted
```
Then jump to pRequest2TxnCreate:
```java
private void pRequest2TxnCreate(int type, Request request, Record record,
        boolean deserialize) throws IOException, KeeperException {
    if (deserialize) {
        ByteBufferInputStream.byteBuffer2Record(request.request, record);
    }

    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
        flags = createTtlRequest.getFlags();
        path = createTtlRequest.getPath();
        acl = createTtlRequest.getAcl();
        data = createTtlRequest.getData();
        ttl = createTtlRequest.getTtl();
    } else {
        CreateRequest createRequest = (CreateRequest)record;
        flags = createRequest.getFlags();
        path = createRequest.getPath();
        acl = createRequest.getAcl();
        data = createRequest.getData();
        ttl = -1;
    }
    CreateMode createMode = CreateMode.fromFlag(flags);
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId);

    List<ACL> listACL = fixupACL(path, request.authInfo, acl);
    ChangeRecord parentRecord = getRecordForPath(parentPath);

    checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    try {
        if (getRecordForPath(path) != null) {
            throw new KeeperException.NodeExistsException(path);
        }
    } catch (KeeperException.NoNodeException e) {
        // ignore this one
    }
    boolean ephemeralParent =
            EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    if (ephemeralParent) {
        throw new KeeperException.NoChildrenForEphemeralsException(path);
    }
    int newCversion = parentRecord.stat.getCversion()+1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                newCversion));
    }
    StatPersisted s = new StatPersisted();
    if (createMode.isEphemeral()) {
        s.setEphemeralOwner(request.sessionId);
    }
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    addChangeRecord(parentRecord);
    addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
}
```
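One detail worth calling out from the code above: for sequential nodes, the final node name is the requested path plus a ten-digit, zero-padded counter taken from the parent's cversion. A standalone illustration using the same String.format call (this snippet is mine, not ZooKeeper code):

```java
import java.util.Locale;

public class SequentialNameDemo {
    public static void main(String[] args) {
        String requestedPath = "/locks/lock-";   // the path the client asked for
        int parentCVersion = 42;                 // the parent's current child version

        // Same formatting as pRequest2TxnCreate: zero-padded to 10 digits.
        String finalPath = requestedPath
                + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        System.out.println(finalPath);           // prints /locks/lock-0000000042
    }
}
```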
The logic can be summed up as:

1. Deserialize the request and extract the flags, path, ACL, data and (for TTL nodes) the ttl.
2. Validate everything: the create mode, the path, the ACLs, and a CREATE-permission check against the parent node.
3. For sequential nodes, append the zero-padded counter to the path.
4. Make sure the node does not already exist and that the parent is not an ephemeral node.
5. Build the transaction (CreateTxn / CreateContainerTxn / CreateTTLTxn) and record the pending changes for both the parent and the new node into the outstandingChanges queue via addChangeRecord.

Multi (batched transaction) requests:
```java
case OpCode.multi:
    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
    try {
        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
    } catch(IOException e) {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                Time.currentWallTime(), OpCode.multi));
        throw e;
    }
    List<Txn> txns = new ArrayList<Txn>();
    //Each op in a multi-op must have the same zxid!
    long zxid = zks.getNextZxid();
    KeeperException ke = null;

    //Store off current pending change records in case we need to rollback
    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

    for(Op op: multiRequest) {
        Record subrequest = op.toRequestRecord();
        int type;
        Record txn;

        /* If we've already failed one of the ops, don't bother
         * trying the rest as we know it's going to fail and it
         * would be confusing in the logfiles.
         */
        if (ke != null) {
            type = OpCode.error;
            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
        }

        /* Prep the request and convert to a Txn */
        else {
            try {
                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                type = request.getHdr().getType();
                txn = request.getTxn();
            } catch (KeeperException e) {
                ke = e;
                type = OpCode.error;
                txn = new ErrorTxn(e.code().intValue());

                if (e.code().intValue() > Code.APIERROR.intValue()) {
                    LOG.info("Got user-level KeeperException when processing {} aborting" +
                            " remaining multi ops. Error Path:{} Error:{}",
                            request.toString(), e.getPath(), e.getMessage());
                }

                request.setException(e);

                /* Rollback change records from failed multi-op */
                rollbackPendingChanges(zxid, pendingChanges);
            }
        }

        //FIXME: I don't want to have to serialize it here and then
        //       immediately deserialize in next processor. But I'm
        //       not sure how else to get the txn stored into our list.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        txn.serialize(boa, "request") ;
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

        txns.add(new Txn(type, bb.array()));
    }

    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
            Time.currentWallTime(), request.type));
    request.setTxn(new MultiTxn(txns));

    break;
```
The code looks nasty, but the logic is actually quite simple: every op in the multi shares the same zxid; each op is prepped through pRequest2Txn and turned into a Txn; as soon as one op fails, the remaining ops are marked as errors and the pending change records are rolled back; finally all the sub-transactions are wrapped into a single MultiTxn.
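For reference, this is roughly what such a request looks like from the client side. The sketch uses the public ZooKeeper client API; the connect string and paths are invented for the example and assumed to exist:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MultiDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connect string; watcher omitted for brevity.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, null);

        // Both ops are prepped under one zxid on the server and succeed or fail together.
        List<OpResult> results = zk.multi(Arrays.asList(
                Op.create("/app/config", "v1".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                Op.setData("/app/version", "1".getBytes(), -1)));

        System.out.println("multi returned " + results.size() + " results");
        zk.close();
    }
}
```

Because every op is prepped under one zxid, the whole batch either commits or fails as a unit. Back in PrepRequestProcessor, everything else is far simpler: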
```java
    //All the rest don't need to create a Txn - just verify session
    case OpCode.sync:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        break;
```
Non-transactional requests only need a session check before being handed to the next processor.

In essence, PrepRequestProcessor does the pre-processing for transactional requests: building the transaction header and body, and performing session, ACL and version checks.
Next in the chain is ProposalRequestProcessor. For a transactional request it initiates a Proposal and forwards the request to CommitProcessor; in addition, it hands the transactional request over to SyncRequestProcessor.
```java
public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    //        request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
```
Next, let's look at propose:
```java
/**
 * create a proposal and send it out to all the members
 *
 * @param request
 * @return the proposal that is queued to send to all the members
 */
public Proposal propose(Request request) throws XidRolloverException {
    /**
     * Address the rollover issue. All lower 32bits set indicate a new leader
     * election. Force a re-election instead. See ZOOKEEPER-1277
     */
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    byte[] data = SerializeUtils.serializeRequest(request);
    proposalStats.setLastBufferSize(data.length);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized(this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig){
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
```
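A side note on the rollover check at the top of propose: a zxid is a 64-bit value whose high 32 bits hold the leader epoch and whose low 32 bits hold a per-epoch counter, so when the counter is about to overflow the leader forces a re-election and a new epoch (see ZOOKEEPER-1277 referenced in the comment). A tiny standalone illustration of that split (my code, not ZooKeeper's):

```java
public class ZxidDemo {
    public static void main(String[] args) {
        long zxid = 0x0000000500000007L;          // example: epoch 5, counter 7

        long epoch   = zxid >>> 32;               // high 32 bits: leader epoch
        long counter = zxid & 0xffffffffL;        // low 32 bits: per-epoch counter

        System.out.println("epoch=" + epoch + ", counter=" + counter);
        // Same check as propose(): an all-ones counter means it is about to roll over.
        System.out.println("rollover? " + ((zxid & 0xffffffffL) == 0xffffffffL));
    }
}
```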
What gets sent here is a QuorumPacket, which implements the Record interface, and its type is set to PROPOSAL. Its comment reads:
```java
/**
 * This message type is sent by a leader to propose a mutation.
 */
public final static int PROPOSAL = 2;
```
Clearly, this is a kind of mutation request that only the Leader may initiate. To briefly describe the logic of propose: the request is serialized and wrapped into a PROPOSAL QuorumPacket, the resulting Proposal is stored in the outstandingProposals map keyed by its zxid, and the packet is then broadcast to the followers.

Next comes CommitProcessor. As the name suggests, it is the transaction committer: it only cares about transactional requests, holding them until the cluster's votes on the Proposal allow it to be committed. Thanks to CommitProcessor, every server can apply transactions in a well-defined order.
The code of this part is rather plain, so I won't spend space analyzing it; with the information above you can already follow how the whole request chain fits together.
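To make "wait for the cluster's votes on a Proposal until it can be committed" concrete, here is a minimal sketch of the idea — counting ACKs per zxid and releasing the proposal once a strict majority of voters has acknowledged it. This is my own simplification for illustration, not the actual Leader/CommitProcessor code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified illustration: a proposal becomes committable once a majority of voters ACKs it.
public class QuorumAckTracker {
    private final int voterCount;                                // leader + followers
    private final Map<Long, Set<Long>> acks = new HashMap<>();   // zxid -> ids of voters that ACKed

    public QuorumAckTracker(int voterCount) {
        this.voterCount = voterCount;
    }

    /** Records an ACK; returns true the moment the zxid reaches a strict majority. */
    public synchronized boolean ack(long zxid, long serverId) {
        Set<Long> set = acks.computeIfAbsent(zxid, k -> new HashSet<>());
        set.add(serverId);
        return set.size() > voterCount / 2;       // e.g. 3 out of 5
    }

    public static void main(String[] args) {
        QuorumAckTracker tracker = new QuorumAckTracker(5);
        long zxid = 0x0000000500000007L;
        System.out.println(tracker.ack(zxid, 1)); // false (the leader itself)
        System.out.println(tracker.ack(zxid, 2)); // false
        System.out.println(tracker.ack(zxid, 3)); // true -> commit, wake up CommitProcessor
    }
}
```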
Then there is SyncRequestProcessor. Its logic is very simple: append the request to the transaction log, and try to trigger a snapshot when enough records have accumulated.
```java
public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}

// Core loop of the thread: it drains the queuedRequests queue
@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
```
The heart of ToBeAppliedRequestProcessor is the toBeApplied queue, which holds the committable Proposals that CommitProcessor has already processed; a Proposal is only removed once FinalRequestProcessor is done with it.
```java
/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: " + request);
    }
}
```
FinalRequestProcessor's core logic runs to some 500 lines, so to save space I will just describe it. Being the last processor in the chain, it has to send the response back to the client, and it is also the one that makes transactional requests take effect — that is, it changes the state of the in-memory database.
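Purely as a mental model, here is my own heavily simplified, hypothetical sketch of those two duties — apply committed changes to in-memory state, then answer the client. None of these names are ZooKeeper's:

```java
// A deliberately tiny, hypothetical model of the "last processor" idea; not ZooKeeper code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FinalStepSketch {
    private final Map<String, byte[]> memoryDb = new ConcurrentHashMap<>();

    public String handle(String op, String path, byte[] data) {
        if (op.equals("create") || op.equals("setData")) {
            memoryDb.put(path, data);              // "apply" the committed transaction
            return "OK " + path;                   // response that goes back to the client
        }
        byte[] value = memoryDb.get(path);         // reads are served from memory
        return value == null ? "NONODE" : new String(value);
    }

    public static void main(String[] args) {
        FinalStepSketch f = new FinalStepSketch();
        System.out.println(f.handle("create", "/foo", "bar".getBytes()));
        System.out.println(f.handle("get", "/foo", null));
    }
}
```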
Now let's switch to FollowerZooKeeperServer. First, the code that assembles its processors:
```java
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}
```
As you can see, there are two request chains here:

- FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
- SyncRequestProcessor -> SendAckRequestProcessor
So when a request arrives, which processor handles it? We can roughly trace it:

- FollowerRequestProcessor (the firstProcessor) is driven by ZooKeeperServer and handles requests coming from clients.
- SyncRequestProcessor is entered through logRequest; that path is driven by Learner and handles requests coming from the Leader.

At this point someone will ask: this is clearly a Follower, so why does the call come in through Learner? The answer is in the class signature:
```java
/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share
 * a good deal of code which is moved into Peer to avoid duplication.
 */
public class Learner {
```
To avoid duplicated code, the logic shared by Followers and Observers was pulled up into this common superclass.
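In other words, the hierarchy looks roughly like this (a schematic with the bodies omitted, just to picture the relationship):

```java
// Schematic of the relationship described in the comment above (bodies omitted).
class Learner {
    // code shared by Followers and Observers: connecting to the leader,
    // reading and writing QuorumPackets, syncing with the leader, etc.
}

class Follower extends Learner {
    // follower-specific behaviour: handling PROPOSAL/COMMIT packets, voting
}

class Observer extends Learner {
    // observer-specific behaviour: handling INFORM packets, never voting
}
```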
FollowerRequestProcessor is the Follower's normal processor. It checks whether the request is transactional: transactional requests are forwarded to the Leader, while the rest are handled locally. Here is FollowerRequestProcessor.run:
```java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}
```
As for why the request is queued to CommitProcessor before being shipped to the Leader — the comment in the code says it plainly: this way the follower is ready to receive the response. As mentioned earlier, CommitProcessor waits for the cluster's votes on the Proposal before it can be committed, so at this point the Follower's CommitProcessor is already waiting for that Proposal to be committed.

The other branch ends in SendAckRequestProcessor:
```java
public void processRequest(Request si) {
    if(si.type != OpCode.sync){
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
            null);
        try {
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}
```
Its logic is trivial: it sends an ACK back to the Leader, indicating that this server has finished writing the transaction to its log.

Finally, let's look at ObserverZooKeeperServer and how it sets up its processors:
```java
/**
 * Set up the request processors for an Observer:
 * firstProcesor->commitProcessor->finalProcessor
 */
@Override
protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    /*
     * Observer should write to disk, so that the it won't request
     * too old txn from the leader which may lead to getting an entire
     * snapshot.
     *
     * However, this may degrade performance as it has to write to disk
     * and do periodic snapshot which may double the memory requirements
     */
    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}
```
The logic is clear (probably because this code was added after 3.3.0). The normal request chain is:

- ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor
If syncRequestProcessorEnabled is on (it is on by default), the Observer will also write the transaction log and take snapshots. This costs some performance and raises the memory requirements, but as the comment above explains, it keeps the Observer from having to ask the Leader for transactions so old that a full snapshot transfer would be needed.
Then take a look at ObserverRequestProcessor — it is practically identical to FollowerRequestProcessor. Couldn't they have reused some code?!
```java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this Observer has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getObserver().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getObserver().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("ObserverRequestProcessor exited loop!");
}
```
That wraps up the source-code walkthrough, which is based on version 3.5.7.
Having walked through the source, you should now have a fair picture of how ZooKeeper processes a request. Next, let's piece together the life of a transactional request. Starting from the Leader's ProposalRequestProcessor, it falls roughly into three phases:
- Proposal: driven mainly by ProposalRequestProcessor; every machine taking part in the Proposal (Leader and Followers) writes the transaction to its transaction log.
- ACK: every transactional request must be acknowledged by more than half of the voting members (Leader + Followers); a small illustration of the majority rule follows this list.
- Commit: once a Proposal has enough ACKs, the Leader puts it on the toBeApplied queue (see org.apache.zookeeper.server.quorum.Leader.tryToCommit), then sends COMMIT packets to the Followers and INFORM packets to the Observers, which tells them to commit the Proposal (see org.apache.zookeeper.server.quorum.Leader.commit && inform).

Up to this point, the SyncRequestProcessor -> AckRequestProcessor leg of the chain is complete.
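To illustrate the "more than half" rule from the ACK phase above, here is a standalone sketch of the majority check (it mirrors what a majority quorum verifier does, but is not the actual QuorumMaj code):

```java
import java.util.Set;

public class MajorityDemo {
    // A proposal is committable once the set of ACKing voters is a strict majority.
    static boolean hasQuorum(Set<Long> ackedVoterIds, int totalVoters) {
        return ackedVoterIds.size() > totalVoters / 2;
    }

    public static void main(String[] args) {
        int totalVoters = 5;                                               // e.g. 1 leader + 4 followers
        System.out.println(hasQuorum(Set.of(1L, 2L), totalVoters));       // false: 2 of 5
        System.out.println(hasQuorum(Set.of(1L, 2L, 3L), totalVoters));   // true: 3 of 5
    }
}
```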
接下來說CommitProcessor -> ToBeAppliedRequestProcessor ->FinalRequestProcessor
的過程。
- Proposals that have been committed are placed on CommitProcessor's committedRequests queue and handed one by one to ToBeAppliedRequestProcessor.
- ToBeAppliedRequestProcessor forwards the request to FinalRequestProcessor and then removes the corresponding Proposal from the toBeApplied queue.
- FinalRequestProcessor then checks the outstandingChanges queue, whose elements were added back in PrepRequestProcessor. If a record's zxid is smaller than the current request's, it means the current transaction reached this point ahead of those earlier ones, so they are removed (they may have failed along the way). Finally the transaction is applied to the in-memory database; that database instance maintains a committedLog, capped at 500 entries by default, holding the most recent successful Proposals so that followers can synchronize quickly.
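To see why keeping the last few hundred committed Proposals in memory helps with "fast synchronization", here is a rough sketch of the catch-up decision a leader can make when a learner reconnects. It is a simplification for illustration only, not the actual LearnerHandler logic:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified illustration of why an in-memory committedLog enables cheap catch-up.
public class CommittedLogSketch {
    private static final int CAP = 500;                  // the default cap mentioned above
    private final Deque<Long> committedZxids = new ArrayDeque<>();

    void add(long zxid) {
        if (committedZxids.size() == CAP) {
            committedZxids.pollFirst();                  // drop the oldest entry
        }
        committedZxids.addLast(zxid);
    }

    /** If the learner's last zxid is still inside the log, replay a diff; otherwise ship a snapshot. */
    String syncStrategy(long learnerLastZxid) {
        if (!committedZxids.isEmpty() && learnerLastZxid >= committedZxids.peekFirst()) {
            return "DIFF";       // replay only the missing proposals
        }
        return "SNAP";           // too far behind: transfer a whole snapshot
    }

    public static void main(String[] args) {
        CommittedLogSketch log = new CommittedLogSketch();
        for (long z = 1; z <= 600; z++) log.add(z);
        System.out.println(log.syncStrategy(580)); // DIFF
        System.out.println(log.syncStrategy(50));  // SNAP
    }
}
```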