zkServer.sh 中的 main 函數:org.apache.zookeeper.server.quorum.QuorumPeerMain;
zkCli.sh 中的 main 函數:org.apache.zookeeper.ZooKeeperMain;
Key:推送選票過來的服務實例的 myid,表示這是哪一個服務實例投出的選票;
Value:由 Notification 對應生成的 Vote 對象,標識該服務器推舉的選票信息:
版本默認爲 version = 0x0
leader的myid: leader,
leader的zxid: zxid,
leader的epoch: peerEpoch,
發送者的logicalclock: electionEpoch
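選票來自 OBSERVING:直接拋棄;
選票來自 FOLLOWING/LEADING:
判斷外部選票的選舉輪次 electionEpoch 是否與自身的 logicalclock 相同:若相同,則將該選票存入 recvset,並用 ooePredicate 判斷是否已滿足選舉結束條件;之後(若尚未返回)還會把該選票記錄到 outofelection 中,再用 ooePredicate 判斷一次。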
當一個新啓動的節點加入集羣時,它會向集羣內其他節點發出投票,而其他節點已不處於 LOOKING 狀態,此時它們會回應已有的選舉結果;該節點把這些結果收集到 outofelection 中,最終在收到合法的 LEADER 消息、且這些選票也構成選舉結束條件時,結束自己的選舉行爲。注意代碼中會執行 logicalclock = n.electionEpoch; 以更新選舉輪數。
選票來自LOOKING:
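比較選票發送方的選舉輪次 electionEpoch 與自身的邏輯時鐘 logicalclock:若外部輪次更大,則更新 logicalclock、清空 recvset,在外部選票與自己的初始選票之間重新比較並更新推薦後廣播;若外部輪次更小,則忽略該選票;若輪次相同,則用 totalOrderPredicate 比較外部選票與當前推薦選票,外部選票更優時更新推薦並廣播。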
將外部選票保存到選票集合 recvset 中。如果更新之後的本地推薦選票滿足集羣配置的有效策略(過半有效),則在 finalizeWait 時間內循環從接收隊列 recvqueue 中 poll 選票並比較:一旦出現比當前推薦更優的選票,就將其放回隊列並跳出循環,繼續選舉。
若處理完之後 recvqueue 中已沒有新的選票(poll 超時返回 null),說明選票接收已經結束,當前更新之後的本地推薦選票就是 leader。此時確定自己的角色,清空接收隊列,並返回該選票。
// Dispatch on the state reported by the sender of notification `n`.
// NOTE(review): this is an excerpt of the election loop; n, logicalclock, recvset,
// outofelection, recvqueue, proposedLeader/proposedZxid/proposedEpoch, finalizeWait,
// self and LOG are declared outside this excerpt.
switch (n.state) {
    case LOOKING:
        // If notification > current, replace and send messages out
        if (n.electionEpoch > logicalclock) {
            // Sender is in a newer election round: adopt that round, drop the votes
            // collected for the old round, then choose between the sender's vote and
            // this server's initial vote before re-broadcasting.
            logicalclock = n.electionEpoch;
            recvset.clear();
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                updateProposal(n.leader, n.zxid, n.peerEpoch);
            } else {
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            sendNotifications();
        } else if (n.electionEpoch < logicalclock) {
            // Vote from an older round: log it and ignore it (leaves the switch).
            if (LOG.isDebugEnabled()) {
                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                        + Long.toHexString(n.electionEpoch)
                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
            }
            break;
        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)) {
            // Same round and the sender's candidate wins the comparison against the
            // current proposal: switch to it and broadcast the new proposal.
            updateProposal(n.leader, n.zxid, n.peerEpoch);
            sendNotifications();
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding vote: from=" + n.sid
                    + ", proposed leader=" + n.leader
                    + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                    + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
        }

        // Record the sender's vote (keyed by sender id), then ask termPredicate
        // whether the collected votes support the current proposal.
        // NOTE(review): the exact quorum rule lives in termPredicate, outside this excerpt.
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

        if (termPredicate(recvset,
                new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {

            // Verify if there is any change in the proposed leader:
            // keep draining the queue (waiting up to finalizeWait ms per poll); if a
            // better vote shows up, put it back and stop finalizing so it gets processed.
            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        proposedLeader, proposedZxid, proposedEpoch)) {
                    recvqueue.put(n);
                    break;
                }
            }

            /*
             * This predicate is true once we don't read any new
             * relevant message from the reception queue
             */
            if (n == null) {
                self.setPeerState((proposedLeader == self.getId())
                        ? ServerState.LEADING : learningState());

                Vote endVote = new Vote(proposedLeader, proposedZxid,
                        logicalclock, proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }
        break;

    case OBSERVING:
        // Observers do not vote; their notifications are only logged.
        LOG.debug("Notification from observer: {}", n.sid);
        break;

    case FOLLOWING:
    case LEADING:
        /*
         * Consider all notifications from the same epoch
         * together.
         */
        if (n.electionEpoch == logicalclock) {
            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
            if (ooePredicate(recvset, outofelection, n)) {
                self.setPeerState((n.leader == self.getId())
                        ? ServerState.LEADING : learningState());

                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }

        /*
         * Before joining an established ensemble, verify
         * a majority is following the same leader.
         */
        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid,
                n.electionEpoch, n.peerEpoch, n.state));

        if (ooePredicate(outofelection, outofelection, n)) {
            // Adopt the established ensemble's round and settle this server's role
            // together inside one synchronized(this) section.
            synchronized (this) {
                logicalclock = n.electionEpoch;
                self.setPeerState((n.leader == self.getId())
                        ? ServerState.LEADING : learningState());
            }
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
        break;

    default:
        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                n.state, n.sid);
        break;
}