Follower使用的ZooKeeperServer的子類FollowerZooKeeperServer,同標準的ZooKeeperServer很類似,除了更換了調用鏈。node
首先看下它的調用鏈的定義:緩存
FollowerZooKeeperServer的調用鏈的初始化代碼以下:session
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
從源碼看出LeaderZooKeeperServer的調用鏈順序以下:異步
Follower的調用鏈第一個處理單元,FollowerZooKeeper的firstProcessor指向。FollowerRequestProcessor是異步方式處理請求,因此用本地隊列緩存一下。FollowerRequestProcessor首先將請求包轉發給leader處理,而後等待leader處理以後的返回結果。oop
它的processRequest方法只是將請求放入Queue,真正處理是run方法來執行的。run方法循環不斷的從Queue取出下一條請求,而後交給nextProcessor處理。this
public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (request == Request.requestOfDeath) { break; } nextProcessor.processRequest(request); switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.setACL: case OpCode.multi: case OpCode.check: zks.getFollower().request(request); break; case OpCode.createSession: case OpCode.closeSession: // Don't forward local sessions to the leader. if (!request.isLocalSession()) { zks.getFollower().request(request); } break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); }
從FollowerZooKeeperServer的調用鏈設置方法中能夠看到,這個nextProcessor是CommitProcessor。spa
要注意的是若是請求是寫請求類(好比建立、設置、修改znode數據等),會額外增長一個動做,就是調用Follower類的的request方法(即父類Learner的request方法)處理。request方法會發送REQUEST消息給Leader。Leader收到REQUEST後觸發LeaderZooKeeperServer的submitLearnerRequestrequest方法執行。日誌
若是不是寫請求類,就不須要調用Follower的request方法。code
當客戶端鏈接的是FollowerZooKeeperServer時,寫事務請求首先須要被Follower轉換成REQUEST消息發給Leader,Leader收到REQUEST後從調用鏈的PrepRequestProcessor開始處理,後續流程就和LeaderZooKeeperServer同樣了。blog
全部Follower自己是不直接處理寫請求的!
讀操做不涉及數據和狀態的變動,所以不須要維護集羣數據的一致性,流程相對於寫操做要簡單些。
具體讀操做流程:
1. Request會發送到Follower的firstProcessor處理,這裏是FollowerRequestProcessor
2. FollowerRequestProcessor首先經過nextProcessor處理該請求,這裏nextProcessor是CommitProcessor,這裏CommitProcessor對讀操做不作處理直接將命令發送到下個處理環節(FinalRequestProcessor直接執行)。
3. FinalRequestProcessor執行讀操做請求,返回結果給客戶端,本次客戶端命令結束。
Follower接收到Leader發送的PROPOSAL包時,會觸發logRequest方法,記錄到WAL日誌。
logRequest中主要代碼片斷以下:
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); if ((request.zxid & 0xffffffffL) != 0) { pendingTxns.add(request); } syncProcessor.processRequest(request);
首先將PROPOSAL攜帶的事務添加到pendingTxns隊列等待執行(等待COMMIT消息),而後經過syncProcessor將PROPOSAL持久化到磁盤,接着調用syncProcessor的nextProcessor處理單元,這裏的nextProcessor是SendAckRequestProcessor。
SendAckRequestProcessor的processRequest方法回覆一條Leader.ACK類型的QuorumPacket包給Leader。Leader經過LearnerHandler收到ACK包並調用Leader的processAck方法處理。當Leader收到足夠的ACK後會發送COMMIT消息給Follower,此時Follower執行pendingTxns中等待被執行的proposal。