在上一篇對 zookeeper 選舉實現分析以後,咱們知道 zookeeper 集羣在選舉結束以後,leader 節點將進入 LEADING 狀態,follower 節點將進入 FOLLOWING 狀態;此時集羣中節點將進行數據同步操做,以保證數據一致。 只有數據同步完成以後 zookeeper 集羣才具有對外提供服務的能力。java
當節點在選舉后角色確認爲 leader 後將會進入 LEADING 狀態,源碼以下:node
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節點狀態變動爲 LEADING 以後會建立 leader 實例,並觸發 lead 過程。數據庫
void lead() throws IOException, InterruptedException { try { // 省略 /** * 開啓線程用於接收 follower 的鏈接請求 */ cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); readyToStart = true; /** * 阻塞等待計算新的 epoch 值,並設置 zxid */ long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); /** * 阻塞等待接收過半的 follower 節點發送的 ACKEPOCH 信息; 此時說明已經肯定了本輪選舉後 epoch 值 */ waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); try { /** * 阻塞等待 超過半數的節點 follower 發送了 NEWLEADER ACK 信息;此時說明過半的 follower 節點已經完成數據同步 */ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } catch (InterruptedException e) { // 省略 } /** * 啓動 zk server,此時集羣能夠對外正式提供服務 */ startZkServer(); // 省略 }
從 lead 方法的實現可得知,leader 與 follower 在數據同步過程當中會執行以下過程:緩存
下面在看下 follower 節點進入 FOLLOWING 狀態後的操做:服務器
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: // 省略 case OBSERVING: // 省略 case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節點狀態變動爲 FOLLOWING 以後會建立 follower 實例,並觸發 followLeader 過程。併發
void followLeader() throws InterruptedException { // 省略 try { QuorumServer leaderServer = findLeader(); try { /** * follower 與 leader 創建鏈接 */ connectToLeader(leaderServer.addr, leaderServer.hostname); /** * follower 向 leader 提交節點信息用於計算新的 epoch 值 */ long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); /** * follower 與 leader 數據同步 */ syncWithLeader(newEpochZxid); // 省略 } catch (Exception e) { // 省略 } } finally { // 省略 } }
從 followLeader 方法的實現可得知,follower 與 leader 在數據同步過程當中會執行以下過程:app
下面咱們看下在各個環節的實現細節;less
protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; }
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, ConnectException, InterruptedException { sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); for (int tries = 0; tries < 5; tries++) { try { sock.connect(addr, self.tickTime * self.syncLimit); sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { if (tries == 4) { LOG.error("Unexpected exception",e); throw e; } else { LOG.warn("Unexpected exception, tries="+tries+ ", connecting to " + addr,e); sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); } } Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
follower 會經過選舉後的投票信息確認 leader 節點地址,併發起鏈接(總共有 5 次嘗試鏈接的機會,若鏈接不通則從新進入選舉過程)ide
class LearnerCnxAcceptor extends ZooKeeperThread{ private volatile boolean stop = false; public LearnerCnxAcceptor() { super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress()); } @Override public void run() { try { while (!stop) { try{ /** * 接收 follower 的鏈接,並開啓 LearnerHandler 線程用於處理兩者之間的通訊 */ Socket s = ss.accept(); s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream( s.getInputStream()); LearnerHandler fh = new LearnerHandler(s, is, Leader.this); fh.start(); } catch (SocketException e) { // 省略 } catch (SaslException e){ LOG.error("Exception while connecting to quorum learner", e); } } } catch (Exception e) { LOG.warn("Exception while accepting follower", e); } } }
從 LearnerCnxAcceptor 實現能夠看出 leader 節點在爲每一個 follower 節點鏈接創建以後都會爲之分配一個 LearnerHandler 線程用於處理兩者之間的通訊。oop
follower 在與 leader 創建鏈接以後,會發出 FOLLOWERINFO 信息
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{ /** * 發送 follower info 信息,包括 last zxid 和 sid */ long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); /* * Add sid to payload */ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); /** * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); // 省略 }
接下來咱們看下 leader 在接收到 FOLLOWERINFO 信息以後作什麼(參考 LearnerHandler)
public void run() { try { // 省略 /** * leader 接收 follower 發送的 FOLLOWERINFO 信息,包括 follower 節點的 zxid,sid,protocol version * @see Learner.registerWithleader() */ QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { if (learnerInfoData.length == 8) { // 省略 } else { /** * 高版本的 learnerInfoData 包括 long 類型的 sid, int 類型的 protocol version 佔用 12 字節 */ LearnerInfo li = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li); this.sid = li.getServerid(); this.version = li.getProtocolVersion(); } } /** * 經過 follower 發送的 zxid,解析出 foloower 節點的 epoch 值 */ long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); /** * 阻塞等待計算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 省略 }
從上述代碼可知,leader 在接收到 follower 發送的 FOLLOWERINFO 信息以後,會解析出 follower 節點的 acceptedEpoch 值並參與到新的 epoch 值計算中。 (具體計算邏輯參考方法 getEpochToPropose)
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { return epoch; } // epoch 用來記錄計算後的選舉週期值 // follower 或 leader 的 acceptedEpoch 值與 epoch 比較;若前者大則將其加一 if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch+1; } // connectingFollowers 用來記錄與 leader 已鏈接的 follower connectingFollowers.add(sid); QuorumVerifier verifier = self.getQuorumVerifier(); // 判斷是否已計算出新的 epoch 值的條件是 leader 已經參與了 epoch 值計算,以及超過一半的節點參與了計算 if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { // 將 waitingForNewEpoch 設置爲 false 說明不須要等待計算新的 epoch 值了 waitingForNewEpoch = false; // 設置 leader 的 acceptedEpoch 值 self.setAcceptedEpoch(epoch); // 喚醒 connectingFollowers wait 的線程 connectingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(waitingForNewEpoch && cur < end) { // 若未完成新的 epoch 值計算則阻塞等待 connectingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); } } return epoch; } }
從方法 getEpochToPropose 可知 leader 會收集集羣中過半的 follower acceptedEpoch 信息後,選出一個最大值而後加 1 就是 newEpoch 值; 在此過程當中 leader 會進入阻塞狀態直到過半的 follower 參與到計算纔會進入下一階段。
leader 在計算出新的 newEpoch 值後,會進入下一階段發送 LEADERINFO 信息 (一樣參考 LearnerHandler)
public void run() { try { // 省略 /** * 阻塞等待計算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); /** * 計算出新的 epoch 值後,leader 向 follower 發送 LEADERINFO 信息;包括新的 newEpoch */ QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 省略 } } // 省略 }
protected long registerWithLeader(int pktType) throws IOException{ // 省略 /** * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); /** * follower 接收 leader 發送的 LEADERINFO 信息 */ readPacket(qp); /** * 解析 leader 發送的 new epoch 值 */ final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); if (qp.getType() == Leader.LEADERINFO) { // we are connected to a 1.0 server so accept the new epoch and read the next packet leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); byte epochBytes[] = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); /** * new epoch > current accepted epoch 則更新 acceptedEpoch 值 */ if (newEpoch > self.getAcceptedEpoch()) { wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { wrappedEpochBytes.putInt(-1); } else { throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); } /** * follower 向 leader 發送 ACKEPOCH 信息,包括 last zxid */ QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); return ZxidUtils.makeZxid(newEpoch, 0); } }
從上述代碼能夠看出在完成 newEpoch 值計算後的 leader 與 follower 的交互過程:
LearnerHandler 中 leader 在收到過半的 ACKEPOCH 信息以後將進入數據同步階段
public void run() { try { // 省略 // peerLastZxid 爲 follower 的 last zxid peerLastZxid = ss.getLastZxid(); /* the default to send to the follower */ int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid; ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog(); if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { /** * follower 與 leader 的 zxid 相同說明 兩者數據一致;同步方式爲差量同步 DIFF,同步的zxid 爲 peerLastZxid, 也就是不須要同步 */ packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else if (proposals.size() != 0) { // peerLastZxid 介於 minCommittedLog ,maxCommittedLog 中間 if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { /** * 在遍歷 proposals 時,用來記錄上一個 proposal 的 zxid */ long prevProposalZxid = minCommittedLog; boolean firstPacket=true; packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // 跳過 follower 已經存在的提案 if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { if (firstPacket) { firstPacket = false; if (prevProposalZxid < peerLastZxid) { /** * 此時說明有部分 proposals 提案在 leader 節點上不存在,則需告訴 follower 丟棄這部分 proposals * 也就是告訴 follower 先執行回滾 TRUNC ,須要回滾到 prevProposalZxid 處,也就是 follower 須要丟棄 prevProposalZxid ~ peerLastZxid 範圍內的數據 * 剩餘的 proposals 則經過 DIFF 進行同步 */ packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } /** * 將剩餘待 DIFF 同步的提案放入到隊列中,等待發送 */ queuePacket(propose.packet); /** * 每一個提案後對應一個 COMMIT 報文 */ QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } else if (peerLastZxid > maxCommittedLog) { /** * follower 的 zxid 比 leader 大 ,則告訴 follower 執行 TRUNC 回滾 */ packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { } } } finally { rl.unlock(); } QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); if (getVersion() < 0x10000) { oa.writeRecord(newLeaderQP, "packet"); } else { // 數據同步完成以後會發送 NEWLEADER 信息 queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } /** * 發送數據同步方式信息,告訴 follower 按什麼方式進行數據同步 */ oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (packetToSend == Leader.SNAP) { /** * 若是是全量同步的話,則將 leader 本地數據序列化寫入 follower 的輸出流 */ leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); /** * 開啓個線程執行 packet 發送 */ sendPackets(); /** * 接收 follower ack 響應 */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); /** * 阻塞等待過半的 follower ack */ leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); /** * leader 向 follower 發送 UPTODATE,告知其可對外提供服務 */ queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); // 省略 } }
從上述代碼能夠看出 leader 和 follower 在進行數據同步時會經過 peerLastZxid 與 maxCommittedLog, minCommittedLog 兩個值比較最終決定數據同步方式。
此時說明 follower 與 leader 數據一致,採用 DIFF 方式同步,也便是無需同步
此時說明 follower 與 leader 數據存在差別,需對差別的部分進行同步;首先 leader 會向 follower 發送 DIFF 報文告知其同步方式,隨後會發送差別的提案及提案提交報文
交互流程以下:
Leader Follower | DIFF | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設 leader 節點的提案緩存隊列對應的 zxid 依次是:
0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005
而 follower 節點的 peerLastZxid 爲 0x500000003,則須要將 0x500000004, 0x500000005 兩個提案進行同步;那麼數據包發送過程以下表:
報文類型 | ZXID |
---|---|
DIFF | 0x500000005 |
PROPOSAL | 0x500000004 |
COMMIT | 0x500000004 |
PROPOSAL | 0x500000005 |
COMMIT | 0x500000005 |
在上文 DIFF 差別化同步時會存在一個特殊場景就是 雖然 follower 的 peerLastZxid 介於 maxCommittedLog, minCommittedLog 二者之間,可是 follower 的 peerLastZxid 在 leader 節點中不存在; 此時 leader 需告知 follower 先回滾到 peerLastZxid 的前一個 zxid, 回滾後再進行差別化同步。
交互流程以下:
Leader Follower | TRUNC | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設集羣中三臺節點 A, B, C 某一時刻 A 爲 Leader 選舉週期爲 5, zxid 包括: (0x500000004, 0x500000005, 0x500000006); 假設某一時刻 leader A 節點在處理完事務爲 0x500000007 的請求進行廣播時 leader A 節點服務器宕機致使 0x500000007 該事物沒有被同步出去;在集羣進行下一輪選舉以後 B 節點成爲新的 leader,選舉週期爲 6 對外提供服務處理了新的事務請求包括 0x600000001, 0x600000002;
集羣節點 | ZXID 列表 |
---|---|
A | 0x500000004, 0x500000005, 0x500000006, 0x500000007 |
B | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
C | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
此時節點 A 在重啓加入集羣后,在與 leader B 節點進行數據同步時會發現事務 0x500000007 在 leader 節點中並不存在,此時 leader 告知 A 需先回滾事務到 0x500000006,在差別同步事務 0x600000001,0x600000002;那麼數據包發送過程以下表:
報文類型 | ZXID |
---|---|
TRUNC | 0x500000006 |
PROPOSAL | 0x600000001 |
COMMIT | 0x600000001 |
PROPOSAL | 0x600000002 |
COMMIT | 0x600000002 |
若 follower 的 peerLastZxid 大於 leader 的 maxCommittedLog,則告知 follower 回滾至 maxCommittedLog; 該場景能夠認爲是 TRUNC+DIFF 的簡化模式
交互流程以下:
Leader Follower | TRUNC | | --------------------> |
若 follower 的 peerLastZxid 小於 leader 的 minCommittedLog 或者 leader 節點上不存在提案緩存隊列時,將採用 SNAP 全量同步方式。 該模式下 leader 首先會向 follower 發送 SNAP 報文,隨後從內存數據庫中獲取全量數據序列化傳輸給 follower, follower 在接收全量數據後會進行反序列化加載到內存數據庫中。
交互流程以下:
Leader Follower | SNAP | | --------------------> | | DATA | | --------------------> |
leader 在完成數據同步以後,會向 follower 發送 NEWLEADER 報文,在收到過半的 follower 響應的 ACK 以後此時說明過半的節點完成了數據同步,接下來 leader 會向 follower 發送 UPTODATE 報文告知 follower 節點能夠對外提供服務了,此時 leader 會啓動 zk server 開始對外提供服務。
下面咱們在看下數據同步階段 FOLLOWER 是如何處理的,參考 Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); /** * 接收 leader 發送的數據同步方式報文 */ readPacket(qp); synchronized (zk) { if (qp.getType() == Leader.DIFF) { } else if (qp.getType() == Leader.SNAP) { // 執行加載全量數據 } else if (qp.getType() == Leader.TRUNC) { // 執行回滾 } else { } outerLoop: while (self.isRunning()) { readPacket(qp); switch(qp.getType()) { case Leader.PROPOSAL: // 處理提案 break; case Leader.COMMIT: // commit proposal break; case Leader.INFORM: // 忽略 break; case Leader.UPTODATE: // 設置 zk server self.cnxnFactory.setZooKeeperServer(zk); // 退出循環 break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery /** * follower 響應 NEWLEADER ACK */ writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); // 啓動 zk server zk.startup(); }
從上述代碼中能夠看出 follower 在數據同步階段的處理流程以下:
最後用一張圖總結下 zk 在完成選舉後數據同步的過程以下圖所示: