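When a ZooKeeper service starts, the first thing it has to do is elect a leader. ZooKeeper ships three leader election algorithms: LeaderElection, AuthFastLeaderElection, and FastLeaderElection. FastLeaderElection is the default, although a different one can be selected through the electionAlg option in the configuration file.
1. When the ZooKeeper service starts, the main thread enters through the main function of class QuorumPeerMain: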
public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        // ...
2. Next, QuorumPeer, which overrides Thread.start, is started:
quorumPeer.start();
quorumPeer.join();
In class QuorumPeer:
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    cnxnFactory.start();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}

3. As the source above shows, the first thing QuorumPeer.start() does is recover data: it reads back the data saved on disk:
private void loadDataBase() {
    try {
        // Restore the database from local files
        zkDb.loadDataBase();

        // load the epochs
        /*
         * Recover the epoch from the latest zxid.
         * A zxid (ZooKeeper Transaction Id) is a long: the high 32 bits hold
         * the epoch and the low 32 bits hold a counter.
         * The zxid grows with every update, so a larger zxid means newer data.
         */
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            //....
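The zxid layout described in the comment above can be tried out in isolation. The following is a minimal sketch, assuming only the 32/32 bit split stated there; the class ZxidSketch and its method names are made up for illustration and are not part of ZooKeeper.

```java
// Minimal sketch of the zxid layout: high 32 bits = epoch, low 32 bits = counter.
// ZxidSketch and its methods are hypothetical names, not ZooKeeper classes.
final class ZxidSketch {
    // Extract the epoch stored in the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Extract the per-epoch counter stored in the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Combine an epoch and a counter into one zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println("zxid=0x" + Long.toHexString(zxid));
        System.out.println("epoch=" + epochOf(zxid) + ", counter=" + counterOf(zxid));
    }
}
```

Because the epoch occupies the high bits, any zxid from a later epoch compares greater than every zxid from an earlier one, which is why a plain numeric comparison is enough to tell which data is newer.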
4. Next, startLeaderElection() is called, and the server first votes for itself:
synchronized public void startLeaderElection() {
    try {
        /* Vote for ourselves first */
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    //}
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

5. Then the election port is bound and FastLeaderElection is initialized:
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        /* Bind the election port and wait for the other servers in the ensemble to connect */
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            // TCP-based election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
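6. FastLeaderElection's starter method then sets up the send and receive queues and the Messenger:
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    /*
     * Application-level send queue, holding ToSend objects.
     * Application-level receive queue, holding Notification objects.
     */
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

In FastLeaderElection.java: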
Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

7. During an election, each ZooKeeper server is in one of four states: LOOKING, FOLLOWING, LEADING, or OBSERVING. A server in the OBSERVING state does not take part in voting; only servers in the LOOKING state do. Once voting ends, the server's state changes to FOLLOWING or LEADING.
The leader election process works as follows:
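Step 1: A server in the LOOKING state first compares a value called the logical clock (logicalclock). If the logicalclock carried by a received notification is greater than the server's own logicalclock, the notification belongs to a newer election round. The server then updates its own logicalclock and clears the votes it has previously received from other servers. Next it decides whether to change its own vote by comparing the received vote against its own initial vote: first by epoch, then by zxid, and finally by server id. If the received vote wins, the server adopts it; otherwise it falls back to voting for itself. Either way, it then broadcasts its vote to the other servers.
In FastLeaderElection.java: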
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */

    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

Step 2: If the server's own logicalclock is greater than the received logicalclock, the notification is stale and the server simply breaks out without processing it. If the two are equal, the server uses epoch, zxid, and server id to decide whether to change its vote, and broadcasts its vote to the other servers if it did change. Finally, the received vote is added to the set of votes the current server has received (recvset).
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
In the lookForLeader function of FastLeaderElection.java:
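To see how the epoch, zxid, server id ordering from steps 1 and 2 plays out on concrete values, here is a small standalone sketch. It mirrors only the comparison itself and deliberately leaves out the quorum weight check; VoteOrderSketch and newVoteWins are hypothetical names used for illustration.

```java
// Standalone sketch of the (epoch, zxid, server id) ordering.
// Simplification: the weight check done by the real predicate is omitted.
// VoteOrderSketch and newVoteWins are hypothetical names.
final class VoteOrderSketch {
    // Returns true when the new vote should replace the current one.
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // higher epoch always wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // same epoch: newer data wins
        }
        return newId > curId;             // full tie: higher server id wins
    }

    public static void main(String[] args) {
        // Higher epoch wins even with a smaller zxid and a smaller id.
        System.out.println(newVoteWins(1, 0x100, 2, 3, 0x200, 1));
        // Same epoch and zxid: the higher server id breaks the tie.
        System.out.println(newVoteWins(3, 0x100, 1, 1, 0x100, 1));
        // An identical vote never replaces the current one.
        System.out.println(newVoteWins(1, 0x100, 1, 1, 0x100, 1));
    }
}
```

Because the comparison is strict at every level, an identical vote does not trigger an update, so servers do not keep re-broadcasting the same proposal.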
case LOOKING:
    // If notification > current, replace and send messages out
    if (n.electionEpoch > logicalclock.get()) {
        logicalclock.set(n.electionEpoch);
        // Clear the votes received so far
        recvset.clear();
        // Decide whether our own vote needs to change
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if(LOG.isDebugEnabled()){
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        // Broadcast
        sendNotifications();
    }

    if(LOG.isDebugEnabled()){
        LOG.debug("Adding vote: from=" + n.sid +
                ", proposed leader=" + n.leader +
                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
    }

    // Add the vote to the set of received votes
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
In FastLeaderElection.java:
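The three branches of the LOOKING case (newer round, stale round, same round) can be exercised without any networking. The sketch below is a simplified model under stated assumptions: votes are plain long arrays, broadcasting is reduced to a counter, and the class LookingSketch with all of its members is hypothetical rather than ZooKeeper code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, network-free model of how a LOOKING server handles a notification.
// All names are hypothetical; a broadcast is modelled as a counter increment.
final class LookingSketch {
    long logicalClock = 1;
    final long myId, myZxid, myEpoch;
    long proposedLeader, proposedZxid, proposedEpoch;
    int broadcasts = 0;
    // sender id -> {leader, zxid, electionEpoch, peerEpoch}
    final Map<Long, long[]> recvset = new HashMap<>();

    LookingSketch(long myId, long myZxid, long myEpoch) {
        this.myId = myId;
        this.myZxid = myZxid;
        this.myEpoch = myEpoch;
        propose(myId, myZxid, myEpoch); // vote for ourselves first
    }

    private void propose(long leader, long zxid, long epoch) {
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

    private static boolean wins(long nId, long nZxid, long nEpoch,
                                long cId, long cZxid, long cEpoch) {
        if (nEpoch != cEpoch) return nEpoch > cEpoch;
        if (nZxid != cZxid) return nZxid > cZxid;
        return nId > cId;
    }

    // Returns false when the notification is stale and was ignored.
    boolean onNotification(long sid, long leader, long zxid,
                           long peerEpoch, long electionEpoch) {
        if (electionEpoch > logicalClock) {
            // Newer round: adopt its clock and forget the old votes.
            logicalClock = electionEpoch;
            recvset.clear();
            if (wins(leader, zxid, peerEpoch, myId, myZxid, myEpoch)) {
                propose(leader, zxid, peerEpoch);
            } else {
                propose(myId, myZxid, myEpoch);
            }
            broadcasts++;
        } else if (electionEpoch < logicalClock) {
            return false; // stale round: ignore
        } else if (wins(leader, zxid, peerEpoch,
                        proposedLeader, proposedZxid, proposedEpoch)) {
            propose(leader, zxid, peerEpoch);
            broadcasts++;
        }
        recvset.put(sid, new long[] {leader, zxid, electionEpoch, peerEpoch});
        return true;
    }
}
```

Feeding it a notification with an older election epoch leaves recvset untouched, while one with a newer election epoch clears recvset before the sender's vote is recorded.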
// Check whether voting has finished
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }

    return voteSet.hasAllQuorums();
}
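The idea behind termPredicate can be illustrated with a plain majority count. This is only a sketch under a simplifying assumption: every server has equal weight and a quorum is a strict majority, whereas the real code delegates that decision to the configured QuorumVerifier. MajoritySketch and hasMajority are hypothetical names, and each vote is reduced to the proposed leader id.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a termination check, assuming a simple majority quorum.
// MajoritySketch and hasMajority are hypothetical names.
final class MajoritySketch {
    // votesBySender maps a sender's server id to the leader id it voted for.
    static boolean hasMajority(Map<Long, Long> votesBySender,
                               long proposedLeader, int ensembleSize) {
        int acks = 0;
        for (long leader : votesBySender.values()) {
            if (leader == proposedLeader) {
                acks++; // this sender agrees with the proposal
            }
        }
        return acks > ensembleSize / 2; // strict majority of the ensemble
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // 2 of 5 agree
        votes.put(3L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // 3 of 5 agree
    }
}
```

With five servers, two matching votes are not enough, and the third matching vote tips the proposal over the majority threshold.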
// Check whether voting has finished
if (termPredicate(recvset,
        new Vote(proposedLeader, proposedZxid,
                logicalclock.get(), proposedEpoch))) {

    // Verify if there is any change in the proposed leader
    // Wait a little longer to see whether a new vote arrives
    while((n = recvqueue.poll(finalizeWait,
            TimeUnit.MILLISECONDS)) != null){
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)){
            recvqueue.put(n);
            break;
        }
    }

    /*
     * This predicate is true once we don't read any new
     * relevant message from the reception queue
     */
    // If no better vote arrived, end the election
    // and set our own state
    if (n == null) {
        self.setPeerState((proposedLeader == self.getId()) ?
                ServerState.LEADING: learningState());

        Vote endVote = new Vote(proposedLeader,
                proposedZxid, proposedEpoch);
        leaveInstance(endVote);
        return endVote;
    }
}
case FOLLOWING:
case LEADING:
    /*
     * Consider all notifications from the same epoch
     * together.
     */
    // The sender's logicalclock equals ours
    if(n.electionEpoch == logicalclock.get()){
        // Add the vote to recvset
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
        if(termPredicate(recvset, new Vote(n.leader,
                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
            self.setPeerState((n.leader == self.getId()) ?
                    ServerState.LEADING: learningState());

            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    outofelection.put(n.sid, new Vote(n.leader,
            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
    if (termPredicate(outofelection, new Vote(n.leader,
                    IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
        synchronized(this){
            logicalclock.set(n.electionEpoch);
            self.setPeerState((n.leader == self.getId()) ?
                    ServerState.LEADING: learningState());
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }
Reference blog posts: