ZooKeeper系列之(九):投票選舉(1)

投票選舉能夠說是ZooKeeper中的難點所在,也是精華所在了。數據庫

ZooKeeper服務端集羣在正常工做時須要一個Leader和無數個Follower,他們都對外提供統一的服務接口,客戶端鏈接任意一臺服務端發送命令其結果都是相同的,而且結果都是嚴格按照客戶端提交讀寫命令的順序來執行。服務器

其中Leader就負責維持整個集羣數據的一致性和寫操做的順序執行。網絡

固然集羣中老是可能發生故障,當某個Follower發生故障時,該Follower會從新啓動選舉流程並從新鏈接到Leader,成功鏈接後會同步Leader的數據,這樣就算以前有寫操做失敗了,經過同步過程最後會恢復完整的數據庫。Leader/Follower的數據同步過程會在後面詳細論述。數據結構

集羣經過投票選舉過程確認同一時刻集羣最多隻會存在一個Leader。當有新節點加入集羣時,新節點會啓動FastLeaderElection過程來從新選舉,尋找新的Leader。框架

1、QuorumPeerui

每臺服務端主機都會啓動一個QuorumPeer進程,它負責投票選舉,從QuorumPeerMain類中啓動QuorumPeer。spa

QuorumPeer共有4種工做模式,不一樣的模式啓動不一樣的代碼:線程

  1. LOOKING:選舉模式,啓動FastLeaderElection
  2. LEADING:領導者模式,啓動Leader
  3. FOLLOWING:跟隨者模式,啓動Follower
  4. OBSERVING:旁觀者模式,啓動Observer

QuorumPeer剛啓動時首先設置爲LOOKING工做模式,它會啓動投票選舉過程,調用FastLeadElection類,啓動Messenger接收選舉信息。code

它的幾種工做狀態的切換流程是這樣的:server

 

QuorumPeer知道集羣各個機器的IP地址,當經過FastLeaderElection完成選舉過程後會肯定本身是Leader仍是Follower,若是是Leader則調用lead方法進入領導者工做流程;若是是Follower則調用followLeader方法進入追隨者工做流程。

QuorumPeer維持在4種狀態之間不停切換。在LOOKING狀態時有一個細節要關注一下,若是shuttingDownLE爲true,則表示要關閉FastLeaderElection線程從新啓動,這時候會經過Messenger來關閉消息接收,再也不接收集羣的選舉信息。

startLeaderElection方法是在LOOKING狀態啓動選舉過程的。四種狀態的切換過程代碼以下:

while (running) {
       switch (getPeerState()) {
       case LOOKING:       
           try {
              reconfigFlagClear();
              if (shuttingDownLE) {
                    shuttingDownLE = false;
                    startLeaderElection();
              }
              setCurrentVote(makeLEStrategy().lookForLeader());
           } catch (Exception e) {
                  setPeerState(ServerState.LOOKING);
            }
            break;
       case OBSERVING:
            try {
                 setObserver(makeObserver(logFactory));
                 observer.observeLeader();
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e );
            } finally {
                 observer.shutdown();
                 setObserver(null);  
                 updateServerState();
            }
            break;
       case FOLLOWING:
            try {
                 setFollower(makeFollower(logFactory));
                 follower.followLeader();
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e);
            } finally {
                 follower.shutdown();
                 setFollower(null);
                 updateServerState();
            }
            break;
       case LEADING:
            try {
                 setLeader(makeLeader(logFactory));
                 leader.lead();
                 setLeader(null);
            } catch (Exception e) {
                 LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
     }
 }

2、QuorumCnxManager

QuorumCnxManager是專門用於選舉信息交換的Socket框架,Messenger接收的報文就是從QuorumCnxManager的接收隊列中獲取的。

QuorumCnxManager一般採用Netty框架負責底層Socket鏈接管理,這是一種非阻塞的Socket方式,提供Select方法在多個Socket之間輪詢數據,以先到先得的方式處理來自不一樣QuorumPeer的選舉交換消息。

這套東西和客戶端服務端的工做模式是同樣的,區別只是這裏是專門給選舉過程使用的。

發送線程定義爲SendWorker,接收線程定義爲RecvWorker。對於每個QuorumPeer鏈接都會建立一個SendWorkder和一個RecvWorker。

發送隊列是一個HashMap,定義以下:

ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;

接收隊列定義爲:

ArrayBlockingQueue<Message> recvQueue;

Messenger發送消息時調用QuorumCnxManager的toSend方法,將發送消息添加到queueSendMap中,這裏咱們看到只有發送給遠方的消息纔會真正放入queueSendMap,發給本身的消息直接放到接收隊列recvQueue中。功能很清晰,具體代碼這裏就不看了。

3、Messenger

Messenger定義在FastLeaderElection類中,專用於QuorumPeer之間選舉消息的傳遞,交換toSend/Notification消息體。

它由FastLeaderElection負責啓動,負責選舉過程當中QuorumPeer間傳遞選舉消息。Messenger包括WorkerReceiver和WorkerSender兩個子線程,一個負責接收QuorumPeer消息,另外一個負責發送QuorumPeer消息。

WorkerSender使用ToSend結構發送消息,而WorkerReceiver使用Notification結構接收消息。toSend和Notification消息體的屬性基本是一致性的。

Messenger底層採用QuorumCnxManager做爲Socket鏈接池,每一個QuorumPeer都會建立一個QuorumCnxManager,用於與集羣其餘全部的QuorumPeer的信息交互。

Notification類用於接收Vote信息的消息接收體,在選舉過程當中集羣交換的包都是Notification類型。

ToSend類和Nofitication相似,發送Vote信息的消息定義體。

Notification和ToSend只在Messenger中處理,除此以外的地方看不到這兩種數據結構。

3.1 WorkerReceiver

WorkerReceiver是接收線程,專門接收參與選舉的服務器返回的通知,進行相應的處理。

從QuorumCnxManager獲取網絡接收到的Message包,而後組裝成Notification消息,放入recvqueue隊列,等待後續處理。

若是消息來自觀察者或者追隨者,則將當前的領導者的zxid和sid當即回覆給對方;

若是消息來自另外一個LOOKING狀態的參與選舉者,則將對方發來包裏的zxid和sid等信息組裝Nofitication消息,而後判斷:

  1. 若是接收者是Looking狀態,則將Nofitication放入FastLeaderElection的recvqueue等待處理(這種狀況下,若是對方也是Looking狀態,則判斷哪方的Epoch和Zxid最大,大的爲新的Leader候選,小的拋棄)。
  2. 若是接收者不是Looking狀態而發送者是LOOKING,則將當前接收者認爲的Leader發送給對方。

其餘的包則忽略。

主要代碼以下:

while (!stop) {
    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
        if((ackstate == QuorumPeer.ServerState.LOOKING)
                             && (n.electionEpoch < logicalclock.get())){
             Vote v = getVote();
             QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                                        v.getId(),
                                        v.getZxid(),
                                        logicalclock.get(),
                                        self.getPeerState(),
                                        response.sid,
                                        v.getPeerEpoch(),
                                        qv.toString().getBytes());
              sendqueue.offer(notmsg);
          }
    } else {
        Vote current = self.getCurrentVote();
        if(ackstate == QuorumPeer.ServerState.LOOKING){
             QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend( ToSend.mType.notification,
                                         current.getId(),
                                         current.getZxid(),
                                         current.getElectionEpoch(),
                                         self.getPeerState(),
                                         response.sid,
                                         current.getPeerEpoch(),
                                         qv.toString().getBytes());
             sendqueue.offer(notmsg);
          }
     }
}

3.2 WorkerSender

WorkerSender是發送線程,專門負責發送ToSend消息給參與選舉的其餘機器。

WorkerSender的流程比較簡單,就是將ToSend消息放入發送隊列,發送線程循環發送。

主要代碼以下:

public void run() {
      while (!stop) {
           try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
      }
      LOG.info("WorkerSender is down");
}
void process(ToSend m) {
      ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);
      manager.toSend(m.sid, requestBuffer);
}

manager就是指QuorumCnxManager。

相關文章
相關標籤/搜索