Leader經過啓動LeaderZooKeeperServer來接收客戶端請求。node
首先看下它的處理鏈的定義,從源碼看出LeaderZooKeeperServer的處理鏈順序以下:算法
SyncRequestProcessor和AckRequestProcessor是在ProposalRequestProcessor內部被建立的,當Leader要處理PROPOSAL命令時先本身調用SyncRequestProcessor持久化,而後經過AckRequestProcessor直接告訴Leader處理ACK邏輯(不通過QuorumPacket傳遞)。數據庫
LeaderRequestProcessor是leader處理請求的第一個processor,所以他不須要轉發到其餘機器,只須要往下執行就好了。這裏就是簡單的調用nextProcessor的processRequest方法。session
主要處理代碼以下:this
public void processRequest(Request request) throws RequestProcessorException { Request upgradeRequest = null; try { upgradeRequest = lzks.checkUpgradeSession(request); } catch (KeeperException ke) { request.setException(ke); } if (upgradeRequest != null) { nextProcessor.processRequest(upgradeRequest); } nextProcessor.processRequest(request); }
Leader收到Follower的REQUEST請求後調用submitLearnerRequest處理該寫事務。spa
該方法就一條語句,調用PrepRequestProcessor的processRequest方法處理Request請求。方法定義以下:日誌
public void submitLearnerRequest(Request request) { prepRequestProcessor.processRequest(request); }
寫請求是要廣播到整個集羣作數據一致性的,因此涉及到多臺集羣的交互。code
這裏咱們以create事務爲例說明。orm
首先客戶端鏈接到Leader,而後Leader發送proposal消息給集羣中全部Follower,proposal消息帶有本次create事務的數據。server
Follower收到proposal後首先保存到磁盤防止proposal丟失,而後回覆ACK給Leader。
Leader收集到足夠多的ACK後再次發送COMMIT給全部Follower,同時Leader也在本地提交proposal執行,Follower收到COMMIT以後也在本地執行proposal事務。
Leader將執行結果返回給客戶端。
經過以上過程集羣中全部機器都會維護同一個完整的數據庫,保證了數據一致性。
這裏須要注意的一點是:寫操做時Zk會調用Sync過程將寫操做持久化到磁盤。
具體流程以下:
上述就是leader處理寫Request請求的完整過程。
leader收到REQUEST包時,會調用submitLearnerRequest方法;收到ACK包時,會調用processAck方法,leader接收包的部分代碼以下。
while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); ByteBuffer bb; long sessionId; int cxid; int type; switch (qp.getType()) { case Leader.ACK: if (this.learnerType == LearnerType.OBSERVER) { } syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if(type == OpCode.sync){ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); leader.zk.submitLearnerRequest(si); break; default: LOG.warn("unexpected quorum packet, type: {}", packetToString(qp)); break; } }
proposal
當LeaderZooKeeperServer收到客戶端寫事務請求時,會觸發Leader的proposal方法執行,發送PROPOSAL消息給Follower。同時維護一個outstandingProposals字典表保存PROPOSA消息。
proposal方法的主要代碼以下:
ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.getHdr().serialize(boa, "hdr"); if (request.getTxn() != null) { request.getTxn().serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.warn("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; synchronized(this) { p.addQuorumVerifier(self.getQuorumVerifier()); if (request.getHdr().getType() == OpCode.reconfig){ self.setLastSeenQuorumVerifier(request.qv, true); } lastProposed = p.packet.getZxid(); outstandingProposals.put(lastProposed, p); sendPacket(pp); }
inform
和proposal相似,只不過是將寫事務發給Observer,而且不須要Observer回覆ACK。
public void inform(Proposal proposal) { QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null); sendObserverPacket(qp); }
processAck
Follower持久化PROPOSAL寫事務請求到磁盤後會回覆ACK消息給Leader,Leader經過LearnerHandler接收該包,並觸發Leader的processAck方法。
Leader發送到Follower的PROPOSAL消息,需等待全部Follower回覆Ack消息後判斷是否知足COMMIT條件,若是知足COMMIT條件則發送COMMIT消息給全部Follower,Leader再往下執行。
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { if (!allowedToCommit) return; if ((zxid & 0xffffffffL) == 0) { return; } if (lastCommitted >= zxid) { return; } Proposal p = outstandingProposals.get(zxid); p.addAck(sid); boolean hasCommitted = tryToCommit(p, zxid, followerAddr); if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){ long curZxid = zxid; while (allowedToCommit && hasCommitted && p!=null){ curZxid++; p = outstandingProposals.get(curZxid); if (p !=null) hasCommitted = tryToCommit(p, curZxid, null); } } }
ProcessAck會統計多少Follower回覆了Leader.ACK類型的QuorumPacket包,而後就走到了tryToCommit方法,tryToCommit判斷是否知足COMMIT條件。
下面看看tryToCommit方法執行了哪些操做。
tryToCommit
確保寫操做是按順序被確認的。
嘗試對Proposal按順序進行Commit。Commit過的事務纔是真正有效的事務。
事務確認必須按順序進行,outstandingProposals中記錄了全部等待確認的事務,只要前一條事務還未確認,則以後的事務都禁止確認,以確保事務的按順序進行。
確認過的事務放入toBeApplied隊列中等待下一步處理。
if (outstandingProposals.containsKey(zxid - 1)) return false; if (!p.hasAllQuorums()) { return false; } outstandingProposals.remove(zxid); if (p.request != null) { toBeApplied.add(p); } if (p.request == null) { LOG.warn("Going to commmit null: " + p); } else if (p.request.getHdr().getType() == OpCode.reconfig) { Long designatedLeader = getDesignatedLeader(p, zxid); QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier(); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; } commitAndActivate(zxid, designatedLeader); informAndActivate(p, designatedLeader); } else { commit(zxid); inform(p); } zk.commitProcessor.commit(p.request); if(pendingSyncs.containsKey(zxid)){ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) { sendSync(r); } }
commit
通知Follower提交proposal執行。
一般只有當Leader從Follower收到超過半數(默認算法,固然你也能夠實現本身的判斷邏輯,好比改爲超過2/3人數)的ACK纔會發起commit,發送Leader.COMMIT包給Follower。
public void commit(long zxid) { synchronized(this){ lastCommitted = zxid; } QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); sendPacket(qp); }
讀操做不涉及數據和狀態的變動,所以不須要維護集羣數據的一致性,流程相對於寫操做要簡單些。
具體讀操做流程:
1. Request會發送到Leader的firstProcessor處理,這裏是LeaderRequestProcessor
2. LeaderRequestProcessor對讀操做會進入到PrepRequestProcessor
3. PrepRequestProcessor對讀操做不執行事務相關處理,而後處理鏈一樣進入ProposalRequestProcessor
4. ProposalRequestProcessor調用Leader的propose方法,propose方法會直接將讀操做請求送達nextProcessor。
5. nextProcessor到達CommitProcessor。
6. CommitProcessor一樣會直接將讀操做請求送達toBeAppliedProcessor。
7. toBeAppliedProcessor將讀操做請求直接發送到下一步FinalRequestProcessor。
8. FinalRequestProcessor在本機執行最終的create命令,調用ZkDataBase的processTxn方法,通過DataTree完成最終的節點建立。