zookeeper-數據同步源碼分析

在上一篇對 zookeeper 選舉實現分析以後,咱們知道 zookeeper 集羣在選舉結束以後,leader 節點將進入 LEADING 狀態,follower 節點將進入 FOLLOWING 狀態;此時集羣中節點將進行數據同步操做,以保證數據一致。 只有數據同步完成以後 zookeeper 集羣才具有對外提供服務的能力。java

LEADING

當節點在選舉后角色確認爲 leader 後將會進入 LEADING 狀態,源碼以下:node

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeer 在節點狀態變動爲 LEADING 以後會建立 leader 實例,並觸發 lead 過程。數據庫

void lead() throws IOException, InterruptedException {
    try {
        // 省略

        /**
         * 開啓線程用於接收 follower 的鏈接請求
         */
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;

        /**
         * 阻塞等待計算新的 epoch 值,並設置 zxid
         */
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());          
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        
        /**
         * 阻塞等待接收過半的 follower 節點發送的 ACKEPOCH 信息; 此時說明已經肯定了本輪選舉後 epoch 值
         */
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            /**
             * 阻塞等待 超過半數的節點 follower 發送了 NEWLEADER ACK 信息;此時說明過半的 follower 節點已經完成數據同步
             */
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            // 省略
        }

        /**
         * 啓動 zk server,此時集羣能夠對外正式提供服務
         */
        startZkServer();

        // 省略
}

lead 方法的實現可得知,leaderfollower 在數據同步過程當中會執行以下過程:緩存

  • 接收 follower 鏈接
  • 計算新的 epoch 值
  • 通知統一 epoch 值
  • 數據同步
  • 啓動 zk server 對外提供服務

FOLLOWING

下面在看下 follower 節點進入 FOLLOWING 狀態後的操做:服務器

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                // 省略
            case OBSERVING:
                // 省略
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeer 在節點狀態變動爲 FOLLOWING 以後會建立 follower 實例,並觸發 followLeader 過程。併發

void followLeader() throws InterruptedException {
    // 省略
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            /**
             * follower 與 leader 創建鏈接
             */
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            /**
             * follower 向 leader 提交節點信息用於計算新的 epoch 值
             */
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            
            /**
             * follower 與 leader 數據同步
             */
            syncWithLeader(newEpochZxid);                
            
             // 省略

        } catch (Exception e) {
             // 省略
        }
    } finally {
        // 省略
    }
}

followLeader 方法的實現可得知,followerleader 在數據同步過程當中會執行以下過程:app

  • 請求鏈接 leader
  • 提交節點信息計算新的 epoch 值
  • 數據同步

下面咱們看下在各個環節的實現細節;less

Leader Follower 創建通訊

follower 請求鏈接
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

follower 會經過選舉後的投票信息確認 leader 節點地址,併發起鏈接(總共有 5 次嘗試鏈接的機會,若鏈接不通則從新進入選舉過程)ide

leader 接收鏈接
class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                    /**
                     * 接收 follower 的鏈接,並開啓 LearnerHandler 線程用於處理兩者之間的通訊
                     */
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    // 省略
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
}

LearnerCnxAcceptor 實現能夠看出 leader 節點在爲每一個 follower 節點鏈接創建以後都會爲之分配一個 LearnerHandler 線程用於處理兩者之間的通訊。oop

計算新的 epoch 值

follower 在與 leader 創建鏈接以後,會發出 FOLLOWERINFO 信息
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{
    /**
     * 發送 follower info 信息,包括 last zxid 和 sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    /**
     * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);
    
    // 省略
}

接下來咱們看下 leader 在接收到 FOLLOWERINFO 信息以後作什麼(參考 LearnerHandler)

public void run() {
    try {
        // 省略
        /**
         * leader 接收 follower 發送的 FOLLOWERINFO 信息,包括 follower 節點的 zxid,sid,protocol version
         * @see Learner.registerWithleader()
         */
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            if (learnerInfoData.length == 8) {
                // 省略
            } else {
                /**
                 * 高版本的 learnerInfoData 包括 long 類型的 sid, int 類型的 protocol version 佔用 12 字節
                 */
                LearnerInfo li = new LearnerInfo();
                ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                this.sid = li.getServerid();
                this.version = li.getProtocolVersion();
            }
        }

        /**
         * 經過 follower 發送的 zxid,解析出 foloower 節點的 epoch 值
         */
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        
        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();

        /**
         * 阻塞等待計算新的 epoch 值
         */
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
      
        // 省略
    }

從上述代碼可知,leader 在接收到 follower 發送的 FOLLOWERINFO 信息以後,會解析出 follower 節點的 acceptedEpoch 值並參與到新的 epoch 值計算中。 (具體計算邏輯參考方法 getEpochToPropose

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // epoch 用來記錄計算後的選舉週期值
        // follower 或 leader 的 acceptedEpoch 值與 epoch 比較;若前者大則將其加一
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch+1;
        }
        // connectingFollowers 用來記錄與 leader 已鏈接的 follower
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // 判斷是否已計算出新的 epoch 值的條件是 leader 已經參與了 epoch 值計算,以及超過一半的節點參與了計算
        if (connectingFollowers.contains(self.getId()) && 
                                        verifier.containsQuorum(connectingFollowers)) {
            // 將 waitingForNewEpoch 設置爲 false 說明不須要等待計算新的 epoch 值了
            waitingForNewEpoch = false;
            // 設置 leader 的 acceptedEpoch 值
            self.setAcceptedEpoch(epoch);
            // 喚醒 connectingFollowers wait 的線程
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // 若未完成新的 epoch 值計算則阻塞等待
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");        
            }
        }
        return epoch;
    }
}

從方法 getEpochToPropose 可知 leader 會收集集羣中過半的 follower acceptedEpoch 信息後,選出一個最大值而後加 1 就是 newEpoch 值; 在此過程當中 leader 會進入阻塞狀態直到過半的 follower 參與到計算纔會進入下一階段。

通知新的 epoch 值

leader 在計算出新的 newEpoch 值後,會進入下一階段發送 LEADERINFO 信息 (一樣參考 LearnerHandler

public void run() {
    try {
        // 省略

        /**
         * 阻塞等待計算新的 epoch 值
         */
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            /**
             * 計算出新的 epoch 值後,leader 向 follower 發送 LEADERINFO 信息;包括新的 newEpoch
             */
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();

               // 省略
        }
    }
    // 省略
}
protected long registerWithLeader(int pktType) throws IOException{
    // 省略

    /**
     * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);

    /**
     * follower 接收 leader 發送的 LEADERINFO 信息
     */
    readPacket(qp);

    /**
     * 解析 leader 發送的 new epoch 值
     */        
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // we are connected to a 1.0 server so accept the new epoch and read the next packet
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte epochBytes[] = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

        /**
         * new epoch > current accepted epoch 則更新 acceptedEpoch 值
         */
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {           
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }

        /**
         * follower 向 leader 發送 ACKEPOCH 信息,包括 last zxid
         */
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } 
}

從上述代碼能夠看出在完成 newEpoch 值計算後的 leaderfollower 的交互過程:

  • leaderfollower 發送 LEADERINFO 信息,告知 follower 新的 epoch
  • follower 接收解析 LEADERINFO 信息,若 new epoch 值大於 current accepted epoch 值則更新 acceptedEpoch
  • followerleader 發送 ACKEPOCH 信息,反饋 leader 已收到新的 epoch 值,並附帶 follower 節點的 last zxid

數據同步

LearnerHandler 中 leader 在收到過半的 ACKEPOCH 信息以後將進入數據同步階段
public void run() {
        try {
            // 省略
            // peerLastZxid 爲 follower 的 last zxid
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
           
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    /**
                     * follower 與 leader 的 zxid 相同說明 兩者數據一致;同步方式爲差量同步 DIFF,同步的zxid 爲 peerLastZxid, 也就是不須要同步
                     */
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    // peerLastZxid 介於 minCommittedLog ,maxCommittedLog 中間
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        /**
                         * 在遍歷 proposals 時,用來記錄上一個 proposal 的 zxid
                         */
                        long prevProposalZxid = minCommittedLog;

                        boolean firstPacket=true;
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // 跳過 follower 已經存在的提案
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                if (firstPacket) {
                                    firstPacket = false;
                                    if (prevProposalZxid < peerLastZxid) {
                                        /**
                                         * 此時說明有部分 proposals 提案在 leader 節點上不存在,則需告訴 follower 丟棄這部分 proposals
                                         * 也就是告訴 follower 先執行回滾 TRUNC ,須要回滾到 prevProposalZxid 處,也就是 follower 須要丟棄 prevProposalZxid ~ peerLastZxid 範圍內的數據
                                         * 剩餘的 proposals 則經過 DIFF 進行同步
                                         */
                                        packetToSend = Leader.TRUNC;                                        
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }

                                /**
                                 * 將剩餘待 DIFF 同步的提案放入到隊列中,等待發送
                                 */
                                queuePacket(propose.packet);
                                /**
                                 * 每一個提案後對應一個 COMMIT 報文
                                 */
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {                    
                        /**
                         * follower 的 zxid 比 leader 大 ,則告訴 follower 執行 TRUNC 回滾
                         */
                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                    }
                } 

            } finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                 // 數據同步完成以後會發送 NEWLEADER 信息
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            /**
             * 發送數據同步方式信息,告訴 follower 按什麼方式進行數據同步
             */
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                /**
                 * 若是是全量同步的話,則將 leader 本地數據序列化寫入 follower 的輸出流
                 */
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            /**
             * 開啓個線程執行 packet 發送
             */
            sendPackets();
            
            /**
             * 接收 follower ack 響應
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            /**
             * 阻塞等待過半的 follower ack
             */
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            /**
             * leader 向 follower 發送 UPTODATE,告知其可對外提供服務
             */
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            // 省略
        } 
    }

從上述代碼能夠看出 leaderfollower 在進行數據同步時會經過 peerLastZxidmaxCommittedLogminCommittedLog 兩個值比較最終決定數據同步方式。

DIFF(差別化同步)

  • followerpeerLastZxid 等於 leaderpeerLastZxid
此時說明 followerleader 數據一致,採用 DIFF 方式同步,也便是無需同步
  • followerpeerLastZxid 介於 maxCommittedLogminCommittedLog 二者之間
此時說明 followerleader 數據存在差別,需對差別的部分進行同步;首先 leader 會向 follower 發送 DIFF 報文告知其同步方式,隨後會發送差別的提案及提案提交報文

交互流程以下:

Leader                 Follower

      |          DIFF         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |

示例: 假設 leader 節點的提案緩存隊列對應的 zxid 依次是:

0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005

follower 節點的 peerLastZxid0x500000003,則須要將 0x5000000040x500000005 兩個提案進行同步;那麼數據包發送過程以下表:

報文類型 ZXID
DIFF 0x500000005
PROPOSAL 0x500000004
COMMIT 0x500000004
PROPOSAL 0x500000005
COMMIT 0x500000005

TRUNC+DIFF(先回滾再差別化同步)

在上文 DIFF 差別化同步時會存在一個特殊場景就是 雖然 followerpeerLastZxid 介於 maxCommittedLogminCommittedLog 二者之間,可是 followerpeerLastZxidleader 節點中不存在; 此時 leader 需告知 follower 先回滾到 peerLastZxid 的前一個 zxid, 回滾後再進行差別化同步。

交互流程以下:

Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |

示例: 假設集羣中三臺節點 A, B, C 某一時刻 A 爲 Leader 選舉週期爲 5, zxid 包括: (0x500000004, 0x500000005, 0x500000006); 假設某一時刻 leader A 節點在處理完事務爲 0x500000007 的請求進行廣播時 leader A 節點服務器宕機致使 0x500000007 該事物沒有被同步出去;在集羣進行下一輪選舉以後 B 節點成爲新的 leader,選舉週期爲 6 對外提供服務處理了新的事務請求包括 0x600000001, 0x600000002;

集羣節點 ZXID 列表
A 0x500000004, 0x500000005, 0x500000006, 0x500000007
B 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002
C 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002

此時節點 A 在重啓加入集羣后,在與 leader B 節點進行數據同步時會發現事務 0x500000007 在 leader 節點中並不存在,此時 leader 告知 A 需先回滾事務到 0x500000006,在差別同步事務 0x600000001,0x600000002;那麼數據包發送過程以下表:

報文類型 ZXID
TRUNC 0x500000006
PROPOSAL 0x600000001
COMMIT 0x600000001
PROPOSAL 0x600000002
COMMIT 0x600000002

TRUNC(回滾同步)

followerpeerLastZxid 大於 leadermaxCommittedLog,則告知 follower 回滾至 maxCommittedLog; 該場景能夠認爲是 TRUNC+DIFF 的簡化模式

交互流程以下:

Leader                 Follower

      |         TRUNC         |  
      | --------------------> |

SNAP(全量同步)

followerpeerLastZxid 小於 leaderminCommittedLog 或者 leader 節點上不存在提案緩存隊列時,將採用 SNAP 全量同步方式。 該模式下 leader 首先會向 follower 發送 SNAP 報文,隨後從內存數據庫中獲取全量數據序列化傳輸給 followerfollower 在接收全量數據後會進行反序列化加載到內存數據庫中。

交互流程以下:

Leader                 Follower

      |         SNAP          |  
      | --------------------> |
      |         DATA          |  
      | --------------------> |

leader 在完成數據同步以後,會向 follower 發送 NEWLEADER 報文,在收到過半的 follower 響應的 ACK 以後此時說明過半的節點完成了數據同步,接下來 leader 會向 follower 發送 UPTODATE 報文告知 follower 節點能夠對外提供服務了,此時 leader 會啓動 zk server 開始對外提供服務。

FOLLOWER 數據同步

下面咱們在看下數據同步階段 FOLLOWER 是如何處理的,參考 Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        /**
         * 接收 leader 發送的數據同步方式報文
         */
        readPacket(qp);
        
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                
            }
            else if (qp.getType() == Leader.SNAP) {
                // 執行加載全量數據
            } else if (qp.getType() == Leader.TRUNC) {
                // 執行回滾
            }
            else {
            
            }
            
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    // 處理提案
                    break;
                case Leader.COMMIT:
                    // commit proposal
                    break;
                case Leader.INFORM:
                    // 忽略
                    break;
                case Leader.UPTODATE:
                    // 設置 zk server
                    self.cnxnFactory.setZooKeeperServer(zk);
                    // 退出循環                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    /**
                     * follower 響應 NEWLEADER ACK
                     */
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        // 啓動 zk server
        zk.startup();
        
    }

從上述代碼中能夠看出 follower 在數據同步階段的處理流程以下:

  • follower 接收 leader 發送的數據同步方式(DIFF/TRUNC/SANP)報文並進行相應處理
  • follower 收到 leader 發送的 NEWLEADER 報文後,會向 leader 響應 ACK (leader 在收到過半的 ACK 消息以後會發送 UPTODATE)
  • follower 收到 leader 發送的 UPTODATE 報文後,說明此時能夠對外提供服務,此時將啓動 zk server

小結

最後用一張圖總結下 zk 在完成選舉後數據同步的過程以下圖所示:

相關文章
相關標籤/搜索