zookeeper(16)源碼分析-ZAB協議

Zookeeper使用了Zookeeper Atomic Broadcast(ZAB,Zookeeper原子消息廣播協議)的協議做爲其數據一致性的核心算法。ZAB協議是爲Zookeeper專門設計的一種支持崩潰恢復的原子廣播協議。算法

ZAB理論

ZAB協議的核心是定義了對於那些會改變Zookeeper服務器數據狀態的事務請求的處理方式,即:全部事務請求必須由一個全局惟一的服務器來協調處理,這樣的服務器被稱爲Leader服務器,餘下的服務器則稱爲Follower服務器,Leader服務器負責將一個客戶端事務請求轉化成一個事務Proposal(提議),並將該Proposal分發給集羣中全部的Follower服務器,以後Leader服務器須要等待全部Follower服務器的反饋,一旦超過半數的Follower服務器進行了正確的反饋後,那麼Leader就會再次向全部的Follower服務器分發Commit消息,要求其將前一個Proposal進行提交。數據庫

ZAB一些包括兩種基本的模式:崩潰恢復消息廣播服務器

一、當整個服務框架啓動過程當中或Leader服務器出現網絡中斷、崩潰退出與重啓等異常狀況時,ZAB協議就會進入恢復模式並選舉產生新的Leader服務器。當選舉產生了新的Leader服務器,同時集羣中已經有過半的機器與該Leader服務器完成了狀態同步以後,ZAB協議就會退出恢復模式,狀態同步是指數據同步,用來保證集羣在過半的機器可以和Leader服務器的數據狀態保持一致。網絡

二、當集羣中已經有過半的Follower服務器完成了和Leader服務器的狀態同步,那麼整個服務框架就能夠進入消息廣播模式,當一臺一樣遵照ZAB協議的服務器啓動後加入到集羣中,若是此時集羣中已經存在一個Leader服務器在負責進行消息廣播,那麼加入的服務器就會自覺地進入數據恢復模式:找到Leader所在的服務器,並與其進行數據同步,而後一塊兒參與到消息廣播流程中去。Zookeeper只容許惟一的一個Leader服務器來進行事務請求的處理,Leader服務器在接收到客戶端的事務請求後,會生成對應的事務提議併發起一輪廣播協議,而若是集羣中的其餘機器收到客戶端的事務請求後,那麼這些非Leader服務器會首先將這個事務請求轉發給Leader服務器。併發

三、當Leader服務器出現崩潰或者機器重啓、集羣中已經不存在過半的服務器與Leader服務器保持正常通訊時,那麼在從新開始新的一輪的原子廣播事務操做以前,全部進程首先會使用崩潰恢復協議來使彼此到達一致狀態,因而整個ZAB流程就會從消息廣播模式進入到崩潰恢復模式。一個機器要成爲新的Leader,必須得到過半機器的支持,同時因爲每一個機器都有可能會崩潰,所以,ZAB協議運行過程當中,先後會出現多個Leader,而且每臺機器也有可能會屢次成爲Leader,進入崩潰恢復模式後,只要集羣中存在過半的服務器可以彼此進行正常通訊,那麼就能夠產生一個新的Leader並再次進入消息廣播模式。如一個由三臺機器組成的ZAB服務,一般由一個Leader、2個Follower服務器組成,某一個時刻,加入其中一個Follower掛了,整個ZAB集羣是不會中斷服務的。app

① 消息廣播

Zab協議消息廣播有如下4個步驟組成:框架

  1. Leader發送PROPOSAL給集羣中全部的節點。
  2. 節點在收到PROPOSAL以後,把PROPOSAL落盤,發送一個ACK給Leader。
  3. Leader在收到大多數節點的ACK以後,發送COMMIT給集羣中全部的Follower節點。
  4. 若是存在Observer節點,Leader同時發送INFORM信息給Observer服務節點同步數據,Observer只接收Leader的INFORM消息同步數據,不參與Leader選舉和事務提交。

zookeeper(16)源碼分析-ZAB協議

② 崩潰恢復

在Leader服務器出現崩潰,或者因爲網絡緣由致使Leader服務器失去了與過半Follower的聯繫,那麼就會進入崩潰恢復模式,在ZAB協議中,爲了保證程序的正確運行,整個恢復過程結束後須要選舉出一個新的Leader服務器,所以,ZAB協議須要一個高效且可靠的Leader選舉算法,從而保證可以快速地選舉出新的Leader,同時,Leader選舉算法不只僅須要讓Leader自身知道已經被選舉爲Leader,同時還須要讓集羣中的全部其餘機器也可以快速地感知到選舉產生的新的Leader服務器。ide

③ ZAB基本特性

ZAB協議的基本原則oop

3.一、ZAB協議須要確保那些已經在Leader服務器上提交的事務最終被全部服務器都提交

假設一個事務在Leader服務器上被提交了,而且已經獲得了過半Follower服務器的Ack反饋,可是在它Commit消息發送給全部Follower機器以前,Leader服務掛了。以下圖所示:源碼分析

zookeeper(16)源碼分析-ZAB協議

在集羣正常運行過程當中的某一個時刻,Server1是Leader服務器,其前後廣播了P一、P二、C一、P三、C2(C2是Commit Of Proposal2的縮寫),其中,當Leader服務器發出C2後就當即崩潰退出了,針對這種狀況,ZAB協議就須要確保事務Proposal2最終可以在全部的服務器上都被提交成功,不然將出現不一致。

3.二、ZAB協議須要確保丟棄那些只在Leader服務器上被提出的事務。

若是在崩潰恢復過程當中出現一個須要被丟棄的提議,那麼在崩潰恢復結束後須要跳過該事務Proposal,以下圖所示:

zookeeper(16)源碼分析-ZAB協議

假設初始的Leader服務器Server1在提出一個事務Proposal3以後就崩潰退出了,從而致使集羣中的其餘服務器都沒有收到這個事務Proposal,因而,當Server1恢復過來再次加入到集羣中的時候,ZAB協議須要確保丟棄Proposal3這個事務。

3.三、ZAB協議必須的Leader選舉算法

可以確保提交已經被Leader提交的事務的Proposal,同時丟棄已經被跳過的事務Proposal。若是讓Leader選舉算法可以保證新選舉出來的Leader服務器擁有集羣中全部機器最高編號(ZXID最大)的事務Proposal,那麼就能夠保證這個新選舉出來的Leader必定具備全部已經提交的提議,更爲重要的是若是讓具備最高編號事務的Proposal機器稱爲Leader,就能夠省去Leader服務器查詢Proposal的提交和丟棄工做這一步驟了。

3.四、數據同步,一致性

完成Leader選舉後,在正式開始工做前,Leader服務器首先會確認日誌中的全部Proposal是否都已經被集羣中的過半機器提交了,便是否完成了數據同步。Leader服務器須要確全部的Follower服務器都可以接收到每一條事務Proposal,而且可以正確地將全部已經提交了的事務Proposal應用到內存數據庫中。Leader服務器會爲每一個Follower服務器維護一個隊列,並將那些沒有被各Follower服務器同步的事務以Proposal消息的形式逐個發送給Follower服務器,並在每個Proposal消息後面緊接着再發送一個Commit消息,以表示該事務已經被提交,等到Follower服務器將全部其還沒有同步的事務Proposal都從Leader服務器上同步過來併成功應用到本地數據庫後,Leader服務器就會將該Follower服務器加入到真正的可用Follower列表並開始以後的其餘流程。

④ ZAB總結

一、 發現,選舉產生Leader,產生最新的epoch(每次選舉產生新Leader的同時產生新epoch)。

二、 同步,各Follower和Leader完成數據同步。

三、廣播,Leader處理客戶端的寫操做,並將狀態變動廣播至Follower,Follower多數經過以後Leader發起將狀態變動落地Commit。

在正常運行過程當中,ZAB協議會一直運行於階段三來反覆進行消息廣播流程,若是出現崩潰或其餘緣由致使Leader缺失,那麼此時ZAB協議會再次進入發現階段,選舉新的Leader。

源碼分析

一、Leader發送PROPOSAL

ProposalRequestProcessor.proce***equest()方法發送PROPOSAL 給每個節點。它調用Leader.propose()方法把PROPOSAL
入隊到各個follower的queuedPackets,而後直接把PROPOSAL提交給leader節點本身的SyncRequestProcessor 。

如下是大概的代碼路徑:

ProposalRequestProcessor.proce***equest(request)
  zks.getLeader().propose(request)
        sendPacket(pp)
            for f in forwardingFollowers
                f.queuePacket(qp) 
                    queuedPackets.add(p)
  syncProcessor.proce***equest(request)

二、Leader處理PROPOSAL

SyncRequestProcessor先處理

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit() 
            while (!toFlush.isEmpty())
                Request i = toFlush.remove()
                if (nextProcessor != null)
                    nextProcessor.proce***equest(i)

而後是Leader的ACK處理器處理,返回給Leader本身ACK結果

AckRequestProcessor.proce***equest()
    proce***equest()
        leader.processAck(self.getId(), request.zxid, null)

zookeeper(16)源碼分析-ZAB協議

三、Follower處理PROPOSAL

Follower. followLeader()方法處理接收到的QuorumPacket, case Leader.PROPOSAL分支處理的就是PROPOSAL。

Follower.followLeader() 
    loop
    readPacket(qp)
      leaderIs.readRecord(pp, "packet")
        processPacket(qp) 
            case Leader.PROPOSAL
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
                fzk.logRequest(hdr, txn)
                    syncProcessor.proce***equest(request) 
            case Leader.COMMIT:
                    fzk.commit(qp.getZxid())
                        commitProcessor.commit(request)

SyncRequestProcessor的處理邏輯

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit()
        while (!toFlush.isEmpty())
            Request i = toFlush.remove() 
            if (nextProcessor != null)
                nextProcessor.proce***equest(i)
                    QuorumPacket qp = new QuorumPacket(Leader.ACK) 
                    learner.writePacket(qp, false)
                         leaderOs.writeRecord(pp, "packet")
         ((Flushable)nextProcessor).flush()
                learner.writePacket(null, true) 
                    bufferedOutput.flush()

zookeeper(16)源碼分析-ZAB協議

四、Leader的ACK處理

Leader的processAck()處理ACK消息,若是收到大多數節點的ACK,發送COMMIT給全部的follower節點,並調用leader本身 的CommitProcessor。 processAck()有兩個調用入口:1. LeaderHandler的run()方法處理來自follower的ACK。2. AckRequestProcessor的proce***equest方法處理leader本身的ACK。

Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()) 
    Proposal p = outstandingProposals.get(zxid)
    p.addAck(sid)
    tryToCommit(p, zxid, followerAddr)
        if !p.hasAllQuorums() 
            return false;
        // Commit on all followers
        commit(zxid)
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
            sendPacket(qp)
        // Commit on Leader 
        zk.commitProcessor.commit(p.request)

五、Leader的COMMIT處理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending)

已經提交的請求,交給ToBeAppliedRequestProcessor準備應用到內存數據庫

ToBeAppliedRequestProcessor.proce***equest()
    next.proce***equest(request)

最後交給FinalRequestProcessor,返回響應結果

zookeeper(16)源碼分析-ZAB協議

六、Follower的COMMIT處理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending) 
//返回響應結果        
FinalRequestProcessor.proce***equest()

zookeeper(16)源碼分析-ZAB協議

相關文章
相關標籤/搜索