1)判斷是採用單實例模式還是多實例模式啓動QuorumPeerMain
2)在多實例模式下,加載啓動參數中指定的配置文件
3)啓動QuorumPeer
/**
 * Excerpt of the ZooKeeper server entry point ("..." marks code elided by the
 * article). It parses the configuration file named on the command line and,
 * when that configuration lists servers, starts a {@code QuorumPeer};
 * otherwise it falls through to the standalone start-up path.
 */
public class QuorumPeerMain {
    ...
    protected QuorumPeer quorumPeer;

    /** Creates a {@code QuorumPeerMain} and delegates to {@link #initializeAndRun}. */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        }
        ...
    }

    /**
     * Parses {@code args[0]} as the configuration file when exactly one argument
     * is given, then chooses between quorum start-up and standalone start-up.
     */
    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }
        // Start the scheduled purge task for the data directory
        ...
        // A config file was given on the command line and it declares multiple servers
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config); // multi-instance (quorum) mode
        } else {
            // Standalone (single-instance) start-up
            ...
        }
    }

    /**
     * Builds the client connection factory and the {@code QuorumPeer} from
     * {@code config}, starts the peer, and blocks until the peer thread ends.
     */
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
        ...
        try {
            // Client connection factory; per the article, NIOServerCnxnFactory by default
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
            quorumPeer = new QuorumPeer(config.getServers(),
                    new File(config.getDataDir()),
                    new File(config.getDataLogDir()),
                    config.getElectionAlg(), // election algorithm; per the article, default is FastLeaderElection (3)
                    config.getServerId(),
                    config.getTickTime(),
                    config.getInitLimit(),
                    config.getSyncLimit(),
                    config.getQuorumListenOnAllIPs(),
                    cnxnFactory,
                    config.getQuorumVerifier());
            ...
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            ...
            quorumPeer.start(); // start this instance
            quorumPeer.join(); // main thread waits for the quorumPeer thread to finish
        } catch (InterruptedException e) {
            ...
        }
    }
}
1)監聽客戶端連接
2)確認選舉算法,監聽選舉端口
3)啓動服務(run),開始選舉線程
4)成爲Leader / Follower / Observer(後續介紹)
/**
 * Excerpt of the quorum member thread ("..." marks code elided by the article).
 * {@link #start()} loads state, starts the client connection factory, prepares
 * the election algorithm and then starts the thread; {@link #run()} loops over
 * the peer state and dispatches to election, observer, follower or leader logic.
 */
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    ...
    ServerCnxnFactory cnxnFactory; // per the article: NIOServerCnxnFactory
    ...

    /** Start-up sequence: load database, accept clients, set up election, start the thread. */
    @Override
    public synchronized void start() {
        loadDataBase(); // per the article: loads currentEpoch and acceptedEpoch
        cnxnFactory.start(); // listen for client connections (NIOServerCnxnFactory.start)
        startLeaderElection(); // choose the election algorithm and listen on the election port
        super.start(); // start the thread (run) and begin the election
    }
    ...

    /** Initialises the current vote to this server and creates the election algorithm. */
    synchronized public void startLeaderElection() {
        try {
            // The initial vote is cast for this server itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch(IOException e) {
            ...
        }
        ...
        // Choose the election algorithm and listen on the election port
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    ...

    /**
     * Creates the {@code Election} implementation for the given algorithm id.
     * Only case 3 (FastLeaderElection) is shown; returns {@code null} when no
     * branch assigned an implementation.
     */
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
        switch (electionAlgorithm) {
        ...
        case 3:
            qcm = createCnxnManager(); // QuorumCnxManager sends and receives votes
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start(); // per the article: starts the ServerSocket listening on the election port
                le = new FastLeaderElection(this, qcm);
            }
            ...
            break;
        ...
        }
        return le;
    }
    ...

    /** Main loop: while running, act according to the current peer state. */
    @Override
    public void run() {
        ...
        try {
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // Start a ReadOnlyZooKeeperServer to serve read-only requests
                        ...
                    } else {
                        try {
                            setBCVote(null);
                            // Begin the election (FastLeaderElection.lookForLeader)
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        }
                        ...
                    }
                    break;
                case OBSERVING:
                    try {
                        setObserver(makeObserver(logFactory)); // Observer
                        observer.observeLeader();
                    }
                    ...
                    break;
                case FOLLOWING:
                    try {
                        setFollower(makeFollower(logFactory)); // Follower
                        follower.followLeader();
                    }
                    ...
                    break;
                case LEADING:
                    try {
                        setLeader(makeLeader(logFactory)); // Leader
                        leader.lead();
                        setLeader(null);
                    }
                    ...
                    break;
                }
            }
        } finally {
            ...
        }
    }
}
3. 快速選舉算法
1)廣播推薦的Leader爲自己,並接收其它ZK服務器的投票
2)選舉進行中
1' 若對方推薦的Leader > 自己推薦的Leader,跟隨對方投票並廣播
2' 更新票箱,判斷對方投票是否過半,是則返回最終投票,否則選舉繼續
3)選舉已經結束
更新票箱,判斷對方投票是否過半且爲Leader,是則返回最終投票,否則選舉繼續
public class FastLeaderElection implements Election { ... public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ ... starter(self, manager); } private void starter(QuorumPeer self, QuorumCnxManager manager) { ... sendqueue = new LinkedBlockingQueue<ToSend>(); // 待發送投票隊列 recvqueue = new LinkedBlockingQueue<Notification>(); // 待接收投票隊列 // 經過QuorumCnxManager往sendqueue添加投票,從recvqueue獲取投票 this.messenger = new Messenger(manager); } ... public Vote lookForLeader() throws InterruptedException { ... try { HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // 投票箱 // 如當前ZK服務器新加入到集羣時,選舉已經結束且當前選舉輪數落後 HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); ... synchronized(this){ logicalclock++; // 選舉輪數 + 1 // 初始推薦Leader爲本身,即初始投票投給本身 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } ... sendNotifications(); // 廣播本身的投票 // 循環直至選舉出Leader while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 獲取一個其它ZK服務器的投票(超時時間爲200毫秒) Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); if(n == null){ // 從新廣播本身的投票,增長超時時間 ... } else if(self.getVotingView().containsKey(n.sid)) { switch (n.state) { case LOOKING: // 選舉進行中 if (n.electionEpoch > logicalclock) { // 當前選舉輪數落後 logicalclock = n.electionEpoch; // 更新選舉輪數 recvset.clear(); // 清空票箱 // 對方投票 > 當前投票(對方推薦的LeaderID > 本身推薦的LeaderID) if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); // 跟隨對方投票 } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); // 堅持當前投票 } sendNotifications(); // 從新廣播本身的投票 } else if (n.electionEpoch < logicalclock) { // 當前選舉輪數領先 // 紀錄日誌,不作其它處理 ... break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); // 跟隨對方投票 sendNotifications(); // 從新廣播本身的投票 } ... 
// 更新票箱 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 對方投票過半 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // 等待200ms接收投票 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { // 對方投票 > 當前投票,則選舉繼續 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } if (n == null) { // 200ms內未接收到新的投票 // 服務器狀態由LOOKING轉爲LEADING/FOLLOWING/OBSERVING self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 確認和返回最終投票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: break; case FOLLOWING: case LEADING: // 如當前ZK服務器新加入到集羣時,選舉已經結束 if(n.electionEpoch == logicalclock) { // ??爲什麼選舉輪數落後則使用outofelection // 更新recvset票箱,並查找當前集羣Leader recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } // 更新outofelection票箱,並查找當前集羣Leader outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? erverState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; ... } } ... } return null; } ... } }