We know that ZooKeeper processes requests through chains of processors.
But when exactly are these processor chains created?
A ZooKeeper ensemble has server nodes in three roles: Leader, Follower, and Observer.
Each role carries different responsibilities, so the request-processing logic differs between them. ZooKeeper neatly uses the chain-of-responsibility pattern to build up each node's processing capability.
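To make the pattern concrete, here is a minimal, self-contained sketch of that chain-of-responsibility shape. Every name in it (SimpleProcessor, LoggingProcessor, FinalProcessor, ChainDemo) is invented for illustration; only the structure — each processor holds a reference to the next one and hands the request on — mirrors what ZooKeeper's real RequestProcessor chains do.

// Illustrative sketch only, not ZooKeeper code.
interface SimpleProcessor {
    void process(String request);
}

// Does its own bit of work, then passes the request down the chain.
class LoggingProcessor implements SimpleProcessor {
    private final SimpleProcessor next;
    LoggingProcessor(SimpleProcessor next) { this.next = next; }
    public void process(String request) {
        System.out.println("log: " + request);
        if (next != null) {
            next.process(request);
        }
    }
}

// End of the chain: actually "applies" the request.
class FinalProcessor implements SimpleProcessor {
    public void process(String request) {
        System.out.println("apply: " + request);
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        // The chain is wired back to front, just like setupRequestProcessors() does later on.
        SimpleProcessor chain = new LoggingProcessor(new FinalProcessor());
        chain.process("create /foo");
    }
}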
So when does that creation happen? When the service starts up? Or every time a request comes in?
In fact, a ZooKeeper server's processor chain is only created after its role has been determined! Let's walk through the process.
The QuorumPeer thread loops forever checking the current node's state; once the node's role is settled, its processor chain is settled along with it.
So first we need to look at how it determines and handles the current role.
// org.apache.zookeeper.server.quorum.QuorumPeer#run @Override public void run() { updateThreadName(); // 註冊jmx 度量信息 LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for (QuorumServer s : getView().values()) { ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(this, s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { /* * Main loop */ // 死循環,一直檢測當前節點的狀態,當確認角色後,進行正式工做循環 // 當狀態再次變動時,會拋出相應異常,從而進行從新選舉操做 while (running) { switch (getPeerState()) { // LOOKING 將觸發選舉,很關鍵,但咱們後續再來解析這塊東西 case LOOKING: LOG.info("LOOKING"); ServerMetrics.getMetrics().LOOKING_COUNT.add(1); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. 
roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; // OBSERVING 節點只會接收來自 Leader 的數據 case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); // Add delay jitter before we switch to LOOKING // state to reduce the load of ObserverMaster if (isRunning()) { Observer.waitForObserverElectionDelay(); } } break; // FOLLOWING 節點將參與數據的決策 case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; // LEADING 負責數據的寫入,以及維護各Follower的數據同步等 case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } } } finally { // 最終退出,多是異常退出,也多是主動關閉了(running=false),總之作好記錄、清理工做 LOG.warn("QuorumPeer main thread exited"); MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; } }
In short, there is always a background thread that keeps checking the current node's state and then dispatches work according to that state.
Once the work has been dispatched, the call does not return while nothing changes; the node simply keeps doing its own job. As long as no exception occurs, it stays that way.
There are four states in total: LOOKING, OBSERVING, FOLLOWING and LEADING, and, as the names suggest, each does its own kind of work.
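Stripped of the JMX registration, the read-only-server handling and the error handling, the loop above boils down to the following skeleton (a condensed paraphrase of QuorumPeer#run, not the literal source):

// Condensed paraphrase of org.apache.zookeeper.server.quorum.QuorumPeer#run
while (running) {
    switch (getPeerState()) {
    case LOOKING:      // no role yet: run leader election
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case FOLLOWING:    // role decided: build a Follower and block inside followLeader()
        setFollower(makeFollower(logFactory));
        follower.followLeader();
        break;
    case OBSERVING:
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
        break;
    case LEADING:
        setLeader(makeLeader(logFactory));
        leader.lead();
        break;
    }
}

Each of the follow/observe/lead calls only returns on an exception or a role change, after which the loop re-reads the peer state and, if necessary, drops back to LOOKING.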
// 以下: 1. 建立 Follower 實例; 2. 調用 followLeader() 執行業務邏輯; case FOLLOWING: try { LOG.info("FOLLOWING"); // 先建立 Follower 實例 setFollower(makeFollower(logFactory)); // 而後調用 followLeader() 進行處理 // 固然,它通常會進行循環處理任務,而不是處理完一次後就退出 follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; // org.apache.zookeeper.server.quorum.QuorumPeer#makeFollower protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { // FollowerZooKeeperServer 是個關鍵的實例 // 將上下文傳遞到 Follower 中,備用 return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb)); } // org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#FollowerZooKeeperServer FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); // 使用 ConcurrentLinkedQueue 隊列進行消息傳遞 this.pendingSyncs = new ConcurrentLinkedQueue<Request>(); } // 建立好 follower 實例後,就調用其主要方法 followLeader() // 這裏承擔了全部的 follower 的動做指令 /** * the main method called by the follower to follow the leader * * @throws InterruptedException */ void followLeader() throws InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken); LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); long connectionTime = 0; boolean completedSync = false; // 先作一些準備工做 try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); QuorumServer leaderServer = findLeader(); try { connectToLeader(leaderServer.addr, leaderServer.hostname); connectionTime = System.currentTimeMillis(); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) { throw new Exception("learned about role change"); } //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } long startTime = Time.currentElapsedTime(); try { self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId()); self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); // 與 Leader 創建同步,即發起 Socket 鏈接 syncWithLeader(newEpochZxid); self.setZabState(QuorumPeer.ZabState.BROADCAST); completedSync = true; } finally { long syncTime = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime); } // 若是設置了 observeMasterPort() 則開啓相應處理線程 if (self.getObserverMasterPort() > 0) { LOG.info("Starting ObserverMaster"); om = new ObserverMaster(self, fzk, self.getObserverMasterPort()); om.start(); } else { om = null; } // create a reusable packet to reduce gc impact QuorumPacket qp = new QuorumPacket(); // 會在此處一直循環等待 leader 節點的信息,進行任務處理,直到發生異常退出 while (this.isRunning()) { readPacket(qp); processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when following the leader", e); closeSocket(); // clear pending revalidations 
pendingRevalidations.clear(); } } finally { if (om != null) { om.stop(); } zk.unregisterJMX(this); if (connectionTime != 0) { long connectionDuration = System.currentTimeMillis() - connectionTime; LOG.info( "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}", leaderAddr, connectionDuration, completedSync); messageTracker.dumpToLog(leaderAddr.toString()); } } } // org.apache.zookeeper.server.quorum.Learner#syncWithLeader /** * Finally, synchronize our history with the Leader (if Follower) * or the LearnerMaster (if Observer). * @param newLeaderZxid * @throws IOException * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); QuorumVerifier newLeaderQV = null; // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; boolean syncSnapshot = false; readPacket(qp); Deque<Long> packetsCommitted = new ArrayDeque<>(); Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential // inconsistency of config node content during rolling restart. if (!QuorumPeerConfig.isReconfigEnabled()) { LOG.debug("Reset config node content from local config after deserialization of snapshot."); zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); } String signature = leaderIs.readString("signature"); if (!signature.equals("BenWasHere")) { LOG.error("Missing signature. Got {}", signature); throw new IOException("Missing signature"); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); // immediately persist the latest snapshot when there is txn log gap syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else { LOG.error("Got unexpected packet from leader: {}, exiting ... 
", LearnerHandler.packetToString(qp)); System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. boolean isPreZAB1_0 = true; //If we are not going to take the snapshot be sure the transactions are not applied in memory // but written out to the transaction log boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { readPacket(qp); switch (qp.getType()) { case Leader.PROPOSAL: PacketInFlight pif = new PacketInFlight(); pif.hdr = new TxnHeader(); pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr); if (pif.hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued + 1)); } lastQueued = pif.hdr.getZxid(); if (pif.hdr.getType() == OpCode.reconfig) { SetDataTxn setDataTxn = (SetDataTxn) pif.rec; QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData())); self.setLastSeenQuorumVerifier(qv, true); } packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData())); boolean majorChange = self.processReconfig( qv, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } if (!writeToTxnLog) { if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn( "Committing 0x{}, but next proposal is 0x{}", Long.toHexString(qp.getZxid()), Long.toHexString(pif.hdr.getZxid())); } else { zk.processTxn(pif.hdr, pif.rec); packetsNotCommitted.remove(); } } else { packetsCommitted.add(qp.getZxid()); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); packet.hdr = new TxnHeader(); if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); byte[] remainingdata = new byte[buffer.remaining()]; buffer.get(remainingdata); packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr); QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData())); boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } else { packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr); // Log warning message if txn comes out-of-order if (packet.hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", Long.toHexString(packet.hdr.getZxid()), Long.toHexString(lastQueued + 1)); } lastQueued = packet.hdr.getZxid(); } if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { packetsNotCommitted.add(packet); packetsCommitted.add(qp.getZxid()); } break; // 無需更新數據時,直接退出 while case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); if (newLeaderQV != null) { boolean majorChange = 
self.processReconfig(newLeaderQV, null, null, true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } if (isPreZAB1_0) { zk.takeSnapshot(syncSnapshot); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { QuorumVerifier qv = self.configFromString(new String(qp.getData())); self.setLastSeenQuorumVerifier(qv, true); newLeaderQV = qv; } catch (Exception e) { e.printStackTrace(); } } if (snapshotNeeded) { zk.takeSnapshot(syncSnapshot); } self.setCurrentEpoch(newEpoch); writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); // 將 zk 拉起來 zk.startup(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and * send leader election notifications to the ensemble. * * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 */ self.updateElectionVote(newEpoch); // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { fzk.logRequest(p.hdr, p.rec); } for (Long zxid : packetsCommitted) { fzk.commit(zxid); } } else if (zk instanceof ObserverZooKeeperServer) { // Similar to follower, we need to log requests between the snapshot // and UPTODATE ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { Long zxid = packetsCommitted.peekFirst(); if (p.hdr.getZxid() != zxid) { // log warning message if there is no matching commit // old leader send outstanding proposal to observer LOG.warn( "Committing 0x{}, but next proposal is 0x{}", Long.toHexString(zxid), Long.toHexString(p.hdr.getZxid())); continue; } packetsCommitted.remove(); Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); request.setTxn(p.rec); request.setHdr(p.hdr); ozk.commitRequest(request); } } else { // New server type need to handle in-flight packets throw new UnsupportedOperationException("Unknown server type"); } } // org.apache.zookeeper.server.ZooKeeperServer#startup public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); // 此實現由子類實現,建立處理鏈 setupRequestProcessors(); startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); setState(State.RUNNING); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } // org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors @Override protected void setupRequestProcessors() { //最後一個處理器是 FinalRequestProcessor RequestProcessor finalProcessor = new FinalRequestProcessor(this); // 倒數第二個處理器是 CommitProcessor commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); // 倒數第三個處理器是 
FollowerRequestProcessor // 因此,最終的調用鏈是 FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); // 另外還有一個 SyncRequestProcessor 單獨啓動 syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower())); syncProcessor.start(); }
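Putting the Follower chain together: FollowerRequestProcessor forwards state-changing requests to the Leader and also passes them down the local chain, where CommitProcessor holds them until the matching commit arrives and FinalRequestProcessor applies them. The toy sketch below (invented names, not the real FollowerRequestProcessor implementation) shows that "forward, then continue locally" shape:

// Illustrative only; names are invented.
interface Processor { void process(String request); }

class ToyFollowerProcessor implements Processor {
    private final Processor next;
    ToyFollowerProcessor(Processor next) { this.next = next; }

    public void process(String request) {
        boolean isWrite = request.startsWith("create") || request.startsWith("set");
        if (isWrite) {
            // In the real server this is where the request is relayed to the Leader;
            // the CommitProcessor further down holds it until the matching COMMIT comes back.
            System.out.println("forward to leader: " + request);
        }
        next.process(request);  // reads and writes both continue down the local chain
    }
}

public class ToyFollowerDemo {
    public static void main(String[] args) {
        Processor chain = new ToyFollowerProcessor(r -> System.out.println("commit+apply: " + r));
        chain.process("create /foo");  // forwarded, then handled locally
        chain.process("get /foo");     // handled locally only
    }
}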
// LEADING 時,建立 Leader 實例, 調用 lead() 處理,仍然會存在內部處理,拋出異常後進行從新獲取狀態斷定 case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } // org.apache.zookeeper.server.quorum.QuorumPeer#makeLeader protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { // LeaderZooKeeperServer 負責進行通訊類,數據類操做 // Leader 自己則只進行調度服務 return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb)); } // org.apache.zookeeper.server.quorum.Leader#lead /** * This method is main function that is called to lead * * @throws IOException * @throws InterruptedException */ void lead() throws IOException, InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken); LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; // 環境初始化 zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean); try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); self.tick.set(0); zk.loadData(); leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); // Start thread that waits for connection requests from // new followers. // 開啓服務端接收數據線程, sock 端口已在構造函數中初始化好了 cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); synchronized (this) { lastProposed = zk.getZxid(); } newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) { LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid())); } QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier(); QuorumVerifier curQV = self.getQuorumVerifier(); if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) { // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly // specified by the user; the lack of version in a config file is interpreted as version=0). // As soon as a config is established we would like to increase its version so that it // takes presedence over other initial configs that were not established (such as a config // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE, // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier, // and there's still no agreement on the new version that we'd like to use. Instead, we use // lastSeenQuorumVerifier which is being sent with NEWLEADER message // so its a good way to let followers know about the new version. (The original reason for sending // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs // that it finds before starting to propose operations. 
Here we're reusing the same code path for // reaching consensus on the new version number.) // It is important that this is done before the leader executes waitForEpochAck, // so before LearnerHandlers return from their waitForEpochAck // hence before they construct the NEWLEADER message containing // the last-seen-quorumverifier of the leader, which we change below try { QuorumVerifier newQV = self.configFromString(curQV.toString()); newQV.setVersion(zk.getZxid()); self.setLastSeenQuorumVerifier(newQV, true); } catch (Exception e) { throw new IOException(e); } } newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get // acknowledged waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId()); self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); try { waitForNewLeaderAck(self.getId(), zk.getZxid()); } catch (InterruptedException e) { shutdown("Waiting for a quorum of followers, only synced with sids: [ " + newLeaderProposal.ackSetsToString() + " ]"); HashSet<Long> followerSet = new HashSet<Long>(); for (LearnerHandler f : getLearners()) { if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) { followerSet.add(f.getSid()); } } boolean initTicksShouldBeIncreased = true; for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) { if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) { initTicksShouldBeIncreased = false; break; } } if (initTicksShouldBeIncreased) { LOG.warn("Enough followers present. Perhaps the initTicks need to be increased."); } return; } // 開啓服務器,進入鏈路建立 startZkServer(); /** * WARNING: do not use this for anything other than QA testing * on a real cluster. Specifically to enable verification that quorum * can handle the lower 32bit roll-over issue identified in * ZOOKEEPER-1277. Without this option it would take a very long * time (on order of a month say) to see the 4 billion writes * necessary to cause the roll-over to occur. * * This field allows you to override the zxid of the server. Typically * you'll want to set it to something like 0xfffffff0 and then * start the quorum, run some operations and see the re-election. 
*/ String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid"); if (initialZxid != null) { long zxid = Long.parseLong(initialZxid); zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid); } if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { self.setZooKeeperServer(zk); } self.setZabState(QuorumPeer.ZabState.BROADCAST); self.adminServer.setZooKeeperServer(zk); // Everything is a go, simply start counting the ticks // WARNING: I couldn't find any wait statement on a synchronized // block that would be notified by this notifyAll() call, so // I commented it out //synchronized (this) { // notifyAll(); //} // We ping twice a tick, so we only update the tick every other // iteration boolean tickSkip = true; // If not null then shutdown this leader String shutdownMessage = null; while (true) { synchronized (this) { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.tickTime / 2; while (cur < end) { wait(end - cur); cur = Time.currentElapsedTime(); } if (!tickSkip) { self.tick.incrementAndGet(); } // We use an instance of SyncedLearnerTracker to // track synced learners to make sure we still have a // quorum of current (and potentially next pending) view. SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker(); syncedAckSet.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } syncedAckSet.addAck(self.getId()); for (LearnerHandler f : getLearners()) { if (f.synced()) { syncedAckSet.addAck(f.getSid()); } } // check leader running status if (!this.isRunning()) { // set shutdown flag shutdownMessage = "Unexpected internal error"; break; } if (!tickSkip && !syncedAckSet.hasAllQuorums()) { // Lost quorum of last committed and/or last proposed // config, set shutdown flag shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " + syncedAckSet.ackSetsToString() + " ]"; break; } tickSkip = !tickSkip; } for (LearnerHandler f : getLearners()) { f.ping(); } } if (shutdownMessage != null) { shutdown(shutdownMessage); // leader goes in looking state } } finally { zk.unregisterJMX(this); } } // org.apache.zookeeper.server.quorum.Leader#startZkServer /** * Start up Leader ZooKeeper server and initialize zxid to the new epoch */ private synchronized void startZkServer() { // Update lastCommitted and Db's zxid to a value representing the new epoch lastCommitted = zk.getZxid(); LOG.info("Have quorum of supporters, sids: [{}]; starting up and setting last processed zxid: 0x{}", newLeaderProposal.ackSetsToString(), Long.toHexString(zk.getZxid())); /* * ZOOKEEPER-1324. the leader sends the new config it must complete * to others inside a NEWLEADER message (see LearnerHandler where * the NEWLEADER message is constructed), and once it has enough * acks we must execute the following code so that it applies the * config to itself. 
*/ QuorumVerifier newQV = self.getLastSeenQuorumVerifier(); Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid()); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; } leaderStartTime = Time.currentElapsedTime(); // 仍然調用 zk 進行服務建立 zk.startup(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and * send leader election notifications to the ensemble. * * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 */ self.updateElectionVote(getEpoch()); zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); } // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#startup @Override public synchronized void startup() { super.startup(); if (containerManager != null) { containerManager.start(); } } // org.apache.zookeeper.server.ZooKeeperServer#startup // 父類框架 public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); setState(State.RUNNING); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors @Override protected void setupRequestProcessors() { // 最後一個處理器是 FinalRequestProcessor RequestProcessor finalProcessor = new FinalRequestProcessor(this); // 倒數第二個處理器是 ToBeAppliedRequestProcessor RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); // 倒數第三個處理器是 CommitProcessor commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); // 倒數第四個處理器是 ProposalRequestProcessor ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); // 第二個處理器是 PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); // 第一個處理器是 LeaderRequestProcessor firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); // 因此最終的鏈路是 LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor setupContainerManager(); } private synchronized void setupContainerManager() { containerManager = new ContainerManager( getZKDatabase(), prepRequestProcessor, Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), Integer.getInteger("znode.container.maxPerMinute", 10000)); }
// 建立 Observer, 調用 observeLeader() 方法 case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); // Add delay jitter before we switch to LOOKING // state to reduce the load of ObserverMaster if (isRunning()) { Observer.waitForObserverElectionDelay(); } } break; // org.apache.zookeeper.server.quorum.Observer#observeLeader /** * the main method called by the observer to observe the leader * @throws Exception */ void observeLeader() throws Exception { zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean); long connectTime = 0; boolean completedSync = false; try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); QuorumServer master = findLearnerMaster(); try { connectToLeader(master.addr, master.hostname); connectTime = System.currentTimeMillis(); long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO); if (self.isReconfigStateChange()) { throw new Exception("learned about role change"); } self.setLeaderAddressAndId(master.addr, master.getId()); self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); // 與 Leader 同步時,創建處理鏈路,調用 父類 Learner 的方法 syncWithLeader(newLeaderZxid); self.setZabState(QuorumPeer.ZabState.BROADCAST); completedSync = true; QuorumPacket qp = new QuorumPacket(); // 一直循環處理 while (this.isRunning() && nextLearnerMaster.get() == null) { readPacket(qp); processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when observing the leader", e); closeSocket(); // clear pending revalidations pendingRevalidations.clear(); } } finally { currentLearnerMaster = null; zk.unregisterJMX(this); if (connectTime != 0) { long connectionDuration = System.currentTimeMillis() - connectTime; LOG.info( "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}", leaderAddr, connectionDuration, completedSync); messageTracker.dumpToLog(leaderAddr.toString()); } } } // org.apache.zookeeper.server.quorum.Learner#syncWithLeader /** * Finally, synchronize our history with the Leader (if Follower) * or the LearnerMaster (if Observer). 
* @param newLeaderZxid * @throws IOException * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); QuorumVerifier newLeaderQV = null; // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; boolean syncSnapshot = false; readPacket(qp); Deque<Long> packetsCommitted = new ArrayDeque<>(); Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential // inconsistency of config node content during rolling restart. if (!QuorumPeerConfig.isReconfigEnabled()) { LOG.debug("Reset config node content from local config after deserialization of snapshot."); zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); } String signature = leaderIs.readString("signature"); if (!signature.equals("BenWasHere")) { LOG.error("Missing signature. Got {}", signature); throw new IOException("Missing signature"); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); // immediately persist the latest snapshot when there is txn log gap syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else { LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. 
boolean isPreZAB1_0 = true; //If we are not going to take the snapshot be sure the transactions are not applied in memory // but written out to the transaction log boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { readPacket(qp); switch (qp.getType()) { case Leader.PROPOSAL: PacketInFlight pif = new PacketInFlight(); pif.hdr = new TxnHeader(); pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr); if (pif.hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued + 1)); } lastQueued = pif.hdr.getZxid(); if (pif.hdr.getType() == OpCode.reconfig) { SetDataTxn setDataTxn = (SetDataTxn) pif.rec; QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData())); self.setLastSeenQuorumVerifier(qv, true); } packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData())); boolean majorChange = self.processReconfig( qv, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } if (!writeToTxnLog) { if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn( "Committing 0x{}, but next proposal is 0x{}", Long.toHexString(qp.getZxid()), Long.toHexString(pif.hdr.getZxid())); } else { zk.processTxn(pif.hdr, pif.rec); packetsNotCommitted.remove(); } } else { packetsCommitted.add(qp.getZxid()); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); packet.hdr = new TxnHeader(); if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); byte[] remainingdata = new byte[buffer.remaining()]; buffer.get(remainingdata); packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr); QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData())); boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } else { packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr); // Log warning message if txn comes out-of-order if (packet.hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", Long.toHexString(packet.hdr.getZxid()), Long.toHexString(lastQueued + 1)); } lastQueued = packet.hdr.getZxid(); } if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { packetsNotCommitted.add(packet); packetsCommitted.add(qp.getZxid()); } break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); if (newLeaderQV != null) { boolean majorChange = self.processReconfig(newLeaderQV, null, null, true); if (majorChange) { throw new Exception("changes proposed in reconfig"); } } if (isPreZAB1_0) { zk.takeSnapshot(syncSnapshot); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { 
try { QuorumVerifier qv = self.configFromString(new String(qp.getData())); self.setLastSeenQuorumVerifier(qv, true); newLeaderQV = qv; } catch (Exception e) { e.printStackTrace(); } } if (snapshotNeeded) { zk.takeSnapshot(syncSnapshot); } self.setCurrentEpoch(newEpoch); writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startup(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and * send leader election notifications to the ensemble. * * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 */ self.updateElectionVote(newEpoch); // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { fzk.logRequest(p.hdr, p.rec); } for (Long zxid : packetsCommitted) { fzk.commit(zxid); } } else if (zk instanceof ObserverZooKeeperServer) { // Similar to follower, we need to log requests between the snapshot // and UPTODATE ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { Long zxid = packetsCommitted.peekFirst(); if (p.hdr.getZxid() != zxid) { // log warning message if there is no matching commit // old leader send outstanding proposal to observer LOG.warn( "Committing 0x{}, but next proposal is 0x{}", Long.toHexString(zxid), Long.toHexString(p.hdr.getZxid())); continue; } packetsCommitted.remove(); Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); request.setTxn(p.rec); request.setHdr(p.hdr); ozk.commitRequest(request); } } else { // New server type need to handle in-flight packets throw new UnsupportedOperationException("Unknown server type"); } } // org.apache.zookeeper.server.ZooKeeperServer#startup public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); setState(State.RUNNING); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } // org.apache.zookeeper.server.quorum.ObserverZooKeeperServer#setupRequestProcessors /** * Set up the request processors for an Observer: * firstProcesor->commitProcessor->finalProcessor */ @Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. 
RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); // 最終的處理鏈路是 ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
AckRequestProcessor: forwards the request from the previous stage to the Leader as an ACK.
CommitProcessor: matches incoming committed requests with locally submitted requests; this is needed because a local request that changes system state only completes when it comes back as a committed request.
FinalRequestProcessor: usually the last processor in a request-processing chain.
FollowerRequestProcessor: forwards requests that modify system state to the Leader.
ObserverRequestProcessor: like FollowerRequestProcessor, forwards state-changing requests to the Leader.
PrepRequestProcessor: usually the first processor in a request-processing chain.
ProposalRequestProcessor: forwards requests to AckRequestProcessor and SyncRequestProcessor.
ReadOnlyRequestProcessor: the first processor in the ReadOnlyZooKeeperServer chain; it passes read-only requests to the next processor and drops state-changing requests.
SendAckRequestProcessor: sends the ACK back to the Leader once a request has been handled.
SyncRequestProcessor: writes requests to the on-disk transaction log, batching them for efficient I/O; a request is not passed on until its log entry has been synced.
ToBeAppliedRequestProcessor: maintains the toBeApplied list; its next processor must be the FinalRequestProcessor, and that FinalRequestProcessor must process requests synchronously.
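All of these processors implement the same small interface, which is what lets each role compose them into a different chain. Paraphrased, org.apache.zookeeper.server.RequestProcessor looks roughly like this:

public interface RequestProcessor {

    class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    // Do this processor's work, then (usually) hand the request to the next processor.
    void processRequest(Request request) throws RequestProcessorException;

    // Called when the chain is torn down, e.g. on a role change.
    void shutdown();
}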
In this construction process we can spot several design patterns at work: chain of responsibility, template method, and the state pattern — as well as how requests arriving from the server side are handled promptly.
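The template-method part is easiest to see in ZooKeeperServer.startup(): the startup sequence is fixed in the parent class, while setupRequestProcessors() is left for each role's server subclass to fill in. Below is a stripped-down sketch of that shape with simplified, invented names; it is not the real class hierarchy.

// Illustrative template-method sketch, not the real ZooKeeperServer hierarchy.
abstract class ToyServer {
    // The template method: the startup steps and their order are fixed here...
    public final void startup() {
        createSessionTracker();
        setupRequestProcessors();   // ...but this step is deferred to the role-specific subclass
        System.out.println("server running");
    }
    void createSessionTracker() { System.out.println("session tracker created"); }
    protected abstract void setupRequestProcessors();
}

class ToyFollowerServer extends ToyServer {
    @Override
    protected void setupRequestProcessors() {
        System.out.println("chain: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor");
    }
}

public class ToyStartupDemo {
    public static void main(String[] args) {
        new ToyFollowerServer().startup();
    }
}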
Now that we have seen the overall architecture of the various processors, I will dig into the individual details in follow-up posts; with this big picture in mind, they should be much easier to understand.