ZooKeeper in Depth (7): Leader Election

This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
Version  Date       Notes
1.0      2020.6.14  First published

1. Preface

For a distributed cluster, the simplest way to guarantee consistent data writes is to rely on a single node to schedule and manage the others. In distributed systems we generally call this node the Leader.

Why is this the simplest way? Imagine writing data to the Leader: after the Leader writes its own copy, it may replicate the data to the Followers, so the number of replicas and where they live are all controlled by that one Leader. With multiple Leaders doing the scheduling, you would additionally have to deal with data partitioning, request load balancing, and similar problems.

Today, let's take a look at ZK's election process together.

2. Election Algorithm Analysis

2.1 ZAB Overview

ZAB (Zookeeper Atomic Broadcast) is a typical majority-quorum algorithm; the name alone tells you it was born for ZK. Its leader election mainly cares about two node attributes, the server ID and the data ID (zxid): the larger they are, the newer the node's data, and the higher its priority to become the master.

2.2 When Elections Are Triggered

Elections are commonly triggered in two scenarios; in any case, at least two ZK machines are required.

2.2.1 Triggered at Startup

As we know, each zk server must be configured with a distinct myid, and at first startup every zxid is necessarily 0. This means the zk node with the largest myid will be picked as leader.
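For concreteness, here is a minimal, hypothetical three-node ensemble configuration using exactly the keys described in the QuorumPeerMain javadoc quoted below; hosts and paths are placeholders:

# zoo.cfg -- hypothetical 3-node ensemble; hosts and paths are examples only
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# server.<id>=host:quorumPort:electionPort (the second port carries election traffic)
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

Each server's dataDir additionally holds a myid file containing nothing but its own id (the file on zk2 contains just "2").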

2.2.2 Triggered by Losing the Leader

Every time a zk node processes a transaction, it updates its zxid, which means the newer a node's data, the larger its zxid. In this election, the node with the largest zxid is picked as leader.
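A zxid is a single long: the high 32 bits carry the leader epoch and the low 32 bits a per-epoch transaction counter (this is what ZxidUtils.getEpochFromZxid, which appears later in followLeader, extracts). A small illustration:

public class ZxidLayoutDemo {
    public static void main(String[] args) {
        long zxid = 0x200000005L;            // example value: epoch 2, 5th transaction
        long epoch = zxid >> 32;             // high 32 bits, cf. ZxidUtils.getEpochFromZxid
        long counter = zxid & 0xffffffffL;   // low 32 bits
        System.out.printf("epoch=%d, counter=%d%n", epoch, counter);
    }
}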

2.3 A Walkthrough of the ZK Election Process (with Source Code)

The core methods are org.apache.zookeeper.server.quorum.QuorumPeer.startLeaderElection and org.apache.zookeeper.server.quorum.QuorumPeer.run; our source-code analysis is organized around them.

2.3.1 Startup

We have to start from QuorumPeerMain, since it is the startup entry point:

/**
 *
 * <h2>Configuration file</h2>
 *
 * When the main() method of this class is used to start the program, the first
 * argument is used as a path to the config file, which will be used to obtain
 * configuration information. This file is a Properties file, so keys and
 * values are separated by equals (=) and the key/value pairs are separated
 * by new lines. The following is a general summary of keys used in the
 * configuration file. For full details on this see the documentation in
 * docs/index.html
 * <ol>
 * <li>dataDir - The directory where the ZooKeeper data is stored.</li>
 * <li>dataLogDir - The directory where the ZooKeeper transaction log is stored.</li>
 * <li>clientPort - The port used to communicate with clients.</li>
 * <li>tickTime - The duration of a tick in milliseconds. This is the basic
 * unit of time in ZooKeeper.</li>
 * <li>initLimit - The maximum number of ticks that a follower will wait to
 * initially synchronize with a leader.</li>
 * <li>syncLimit - The maximum number of ticks that a follower will wait for a
 * message (including heartbeats) from the leader.</li>
 * <li>server.<i>id</i> - This is the host:port[:port] that the server with the
 * given id will use for the quorum protocol.</li>
 * </ol>
 * In addition to the config file. There is a file in the data directory called
 * "myid" that contains the server id as an ASCII decimal value.
 *
 */
@InterfaceAudience.Public
public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            System.exit(3);
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            System.exit(4);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only one server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException
    {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;

          if (config.getClientPortAddress() != null) {
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }

          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setSslQuorum(config.isSslQuorum());
          quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          if (config.sslQuorumReloadCertFiles) {
              quorumPeer.getX509Util().enableCertFileReloading();
          }

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }
          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();
          
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

    // @VisibleForTesting
    protected QuorumPeer getQuorumPeer() throws SaslException {
        return new QuorumPeer();
    }
}

Following QuorumPeerMain.main() -> main.initializeAndRun(args) -> runFromConfig -> quorumPeer.start(), we continue into QuorumPeer.java (the class that manages the election-related logic):

@Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }

Now we arrive at the core code, startLeaderElection:

synchronized public void startLeaderElection() {
       try {
           if (getPeerState() == ServerState.LOOKING) {
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }

        // if (!getView().containsKey(myid)) {
        //     throw new RuntimeException("My id " + myid + " not in the peer list");
        // }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(getQuorumAddress().getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }

The logic is very simple: if the server is in the LOOKING state (a freshly started server defaults to LOOKING), it casts the initial vote for the election and settles on the election algorithm (from 3.4.0 onward, FastLeaderElection is the only one left), which sends the vote out. The code is too long to paste here; interested readers can follow FastLeaderElection.Messenger.WorkerReceiver.run. In essence it is a thread that takes votes off the vote queue and sends them.
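As a rough illustration of that queue-draining pattern, here is a minimal sketch with simplified, made-up names (not the actual ZooKeeper code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class SenderLoopSketch {
    static class ToSend {
        final long sid; final byte[] payload;
        ToSend(long sid, byte[] payload) { this.sid = sid; this.payload = payload; }
    }

    // Drain queued votes and hand each one to the connection layer.
    static void sendLoop(BlockingQueue<ToSend> sendqueue,
                         BiConsumer<Long, byte[]> wire) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;          // poll timed out; loop and retry
            wire.accept(m.sid, m.payload);    // send the vote to peer `sid`
        }
    }
}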

A quick primer on the server states (they map onto an enum, sketched after the list):

  1. LOOKING: searching for a Leader. A server in this state believes the cluster currently has no Leader.
  2. FOLLOWING: the follower state; the server's current role is Follower.
  3. LEADING: the leader state; the server's current role is Leader.
  4. OBSERVING: the observer state; the server's current role is Observer.
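In code these four states correspond to (a sketch mirroring QuorumPeer.ServerState):

public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING
}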

Next, the relevant core code of QuorumPeer:

@Override
    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
    
                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                           reconfigFlagClear();
                            if (shuttingDownLE) {
                               shuttingDownLE = false;
                               startLeaderElection();
                               }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }                        
                    }
                    break;

Only the LOOKING-related logic is excerpted here. The if branch in the first half handles the read-only server, which serves read-only clients. The else branch is the usual case, but judging from the code block:

reconfigFlagClear();
if (shuttingDownLE) {
    shuttingDownLE = false;
    startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());

the two branches differ very little. Now on to lookForLeader; to save space, only the LOOKING-related code is shown:

/**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

The comment makes it clear: this method starts a new round of leader election. Whenever our QuorumPeer changes its state to LOOKING, the method is invoked and it notifies all the other servers taking part in the election. Within this logic, recvqueue holds the incoming election notifications, and one is taken off the queue. Two branches follow:

  1. It is null: find a way to notify the other servers.
  2. It is a valid vote (i.e., everyone is in the same election round): perform the vote comparison ("PK").

Let's look at the totalOrderPredicate method:

/**
     * Check if a pair (server id, zxid) succeeds our
     * current vote.
     *
     * @param id    Server identifier
     * @param zxid  Last zxid observed by the issuer of this vote
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

To spell out the logic (a worked example follows the list):

  1. If the external vote's election epoch is higher than the internal vote's, the internal vote must be replaced.
  2. If the election epochs are equal and the external vote's ZXID is higher than the internal vote's, replace it.
  3. If the election epochs and ZXIDs are both equal and the external vote's SID is higher than the internal vote's, replace it.
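As a worked example with hypothetical values, here is the same ordering restated without the quorum-weight check:

public class VotePkDemo {
    // Same ordering as totalOrderPredicate, minus the weight check (sketch).
    static boolean supersedes(long newId, long newZxid, long newEpoch,
                              long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid
                        || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // Equal epochs, but the external vote carries a newer zxid, so it wins
        // even though its server id is smaller.
        System.out.println(supersedes(1, 0x200000005L, 2,
                                      3, 0x200000003L, 2));   // true
    }
}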

This logic establishes whether the external vote beats the internal one, i.e., is better suited to become Leader. If so, the external vote's information overwrites the internal vote and is sent back out:

case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();

Next, it checks whether more than half of the servers in the cluster acknowledge this vote.

/**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     * 
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last
     */
    protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums(); // has more than half acked?
    }

Otherwise, it keeps collecting votes.
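The quorum rule behind hasAllQuorums is, by default, a simple majority (cf. QuorumMaj); in essence:

// Sketch of the default majority test: a vote set suffices once it holds
// strictly more than half of the voting members (e.g. 2 of 3, 3 of 5).
static boolean hasMajority(int acks, int votingMembers) {
    return acks > votingMembers / 2;
}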

Next comes updating the server state.

/*
 * This predicate is true once we don't read any new
 * relevant message from the reception queue
 */
if (n == null) {
    self.setPeerState((proposedLeader == self.getId()) ?
            ServerState.LEADING : learningState());
    Vote endVote = new Vote(proposedLeader,
            proposedZxid, logicalclock.get(),
            proposedEpoch);
    leaveInstance(endVote);
    return endVote;
}

2.3.2 Losing the Leader

As mentioned above, QuorumPeer.java has a main loop in which each role does its own work until the loop exits. Here we take the Follower as an example:

case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;

follower.followLeader()

/**
     * the main method called by the follower to follow the leader
     *
     * @throws InterruptedException
     */
    void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            QuorumServer leaderServer = findLeader();            
            try {
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange())
                   throw new Exception("learned about role change");
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

Jump to the core method, processPacket:

/**
     * Examine the packet received in qp and dispatch based on its contents.
     * @param qp
     * @throws IOException
     */
    protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
        case Leader.PING:            
            ping(qp);            
            break;
        case Leader.PROPOSAL:           
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            
            if (hdr.getType() == OpCode.reconfig){
               SetDataTxn setDataTxn = (SetDataTxn) txn;       
               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
               self.setLastSeenQuorumVerifier(qv, true);                               
            }
            
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
            
        case Leader.COMMITANDACTIVATE:
           // get the new configuration from the request
           Request request = fzk.pendingTxns.element();
           SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();                                                                                                      
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));                                
 
           // get new designated leader from (current) leader's message
           ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
           long suggestedLeaderId = buffer.getLong();
            boolean majorChange = 
                   self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
           // commit (writes the new config to ZK tree (/zookeeper/config)                     
           fzk.commit(qp.getZxid());
            if (majorChange) {
               throw new Exception("changes proposed in reconfig");
           }
           break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
        }
    }

In case COMMITANDACTIVATE, we can see that when the follower receives a message about a leader change, it throws an exception. It will then turn itself into the LOOKING state and start an election.

So how is the leader determined to be unavailable? Through heartbeats. If no heartbeat arrives from the leader within a certain window, it is considered unavailable.
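Purely as an illustration of that detection pattern, here is a sketch with made-up names; the real follower relies on a socket read timeout (roughly tickTime * syncLimit) and the exception path in followLeader shown above:

public class HeartbeatWatchdogSketch {
    interface Peer { boolean pingReceived(); void startElection(); }

    // If no ping arrives within the timeout window, presume the leader dead
    // and fall back to election (illustrative only).
    static void watch(Peer peer, long tickTime, int syncLimit) throws InterruptedException {
        long timeoutMs = tickTime * syncLimit;
        long lastHeartbeat = System.currentTimeMillis();
        while (true) {
            if (peer.pingReceived()) {
                lastHeartbeat = System.currentTimeMillis();
            } else if (System.currentTimeMillis() - lastHeartbeat > timeoutMs) {
                peer.startElection();   // leader presumed dead -> LOOKING
                return;
            }
            Thread.sleep(tickTime);
        }
    }
}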

The case Leader.PING inside LearnerHandler.run:

case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
                            .getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        leader.zk.touch(sess, to);
                    }
                    break;

3. Other Common Election Algorithms

First, understand this: election algorithms are in essence consensus algorithms, and the vast majority of consensus algorithms exist to solve data consistency in distributed environments. The so-called leader, follower, and the rest inside zk are just states; the consensus only holds because, within zk's semantics (context), everyone agrees that one particular leader is the leader.

So what are the common consensus algorithms? Today's consensus algorithms fall into three broad families, named after the systems they serve: public chains, consortium chains, and private chains. Their characteristics:

  • Private chain: private-chain consensus algorithms are the traditional distributed-system consensus algorithms from before the blockchain concept became popular, such as zookeeper's zab protocol, a paxos-like algorithm. A private chain generally assumes no malicious nodes in the cluster, only nodes that fail because of system or network problems.
  • Consortium chain: the classic representative is the Fabric project under the Hyperledger umbrella; Fabric 0.6 used the pbft algorithm. A consortium chain must tolerate not only faulty nodes but also malicious nodes, and every node that joins must be verified and audited.
  • Public chain: a public chain must tolerate both faulty and malicious nodes, just like a consortium chain. The biggest difference is that nodes can join and leave a public chain freely, without strict verification and auditing.
copied from https://zhuanlan.zhihu.com/p/...; author: the Meitu tech team

For reasons of space, here is a brief look at two fairly typical consensus algorithms.

3.1 Raft

Raft is a typical majority-vote election algorithm. Its election mechanism resembles everyday democratic voting; the core idea is that the minority defers to the majority. In Raft, the node that collects the most votes becomes the master.

Under Raft, cluster nodes take one of three roles:

  • Leader, the master node; there is only one Leader at a time, responsible for coordinating and managing the other nodes;
  • Candidate, a candidate; any node may become a Candidate, and only in this role can it be elected the new Leader;
  • Follower, a follower of the Leader; it cannot initiate an election.

A Raft election breaks down into the following steps (a minimal sketch of one voting round follows the list):

  1. Initially, all nodes are in the Follower state.
  2. When an election begins, every node moves from Follower to Candidate and sends vote requests to the other nodes.
  3. The other nodes reply to the requests in arrival order, saying whether they agree. Note that within one election round, a node may cast only a single vote.
  4. If a requesting node wins more than half of the votes, it becomes the master: its state changes to Leader, and the other nodes fall from Candidate back to Follower. The Leader and its Followers then exchange periodic heartbeats to check that the master is alive.
  5. When the Leader's term ends, i.e., it finds the other servers starting the next election cycle (or the master has crashed), the Leader steps down to Follower and a new round of electing begins.
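A minimal model of one Raft voting round, with assumed simplifications (no log up-to-date check, no real RPC, single-threaded):

public class RaftElectionSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    int currentTerm = 0;
    Role role = Role.FOLLOWER;

    // A candidate bumps its term, votes for itself, then asks every peer;
    // each peer grants at most one vote per term.
    void startElection(int clusterSize, java.util.function.IntPredicate voteGranted) {
        role = Role.CANDIDATE;
        currentTerm++;
        int votes = 1;                                        // its own vote
        for (int peer = 0; peer < clusterSize - 1; peer++) {
            if (voteGranted.test(peer)) votes++;
        }
        role = (votes > clusterSize / 2) ? Role.LEADER        // majority wins
                                         : Role.FOLLOWER;     // retry on next timeout
    }
}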

Compared with ZAB, this algorithm is easier to implement, but because of its heavier message traffic it is better suited to small and medium-sized deployments.

3.2 PoW

The PoW algorithm has each node or server compete for the bookkeeping right with its computing power ("hashpower"); it is a consensus algorithm built on a proof-of-work mechanism. In other words, the stronger your compute (the faster you solve the puzzle), the better your odds of winning the bookkeeping right.

For example, a transaction occurs and three nodes (A, B, C) all receive the bookkeeping request. Node A solves the puzzle first, so it notifies B and C to verify. The puzzle is deliberately one-way: solving it is far slower than verifying a proposed solution. Once all nodes have verified it, the entry is recorded.
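A minimal, Bitcoin-style hash-puzzle sketch of that asymmetry (illustrative values; not tied to any real chain):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class PowSketch {
    static boolean solved(byte[] h, int zeroBytes) {
        for (int i = 0; i < zeroBytes; i++) if (h[i] != 0) return false;
        return true;
    }

    public static void main(String[] args) throws Exception {
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        String block = "tx-data";       // stand-in for the block contents
        int difficulty = 2;             // required number of leading zero bytes
        long nonce = 0;
        // "Solving" is brute force over nonces...
        while (!solved(sha.digest((block + nonce).getBytes(StandardCharsets.UTF_8)), difficulty)) {
            nonce++;
        }
        // ...while "verifying" is a single hash with the claimed nonce.
        System.out.println("found nonce " + nonce);
    }
}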

It sounds fair. But PoW requires the whole network to participate in the computation every time consensus is reached, increasing each node's load. If the puzzle is too hard, solving takes a long time and burns resources; if it is too easy, many nodes win the bookkeeping right simultaneously and conflicts multiply. All of this lengthens the time to reach consensus.

4. Summary

In this article we first walked through zookeeper's leader election, which roughly goes as follows:

4.1 Election at Server Startup

  1. Every server sends out a vote.
  2. Each server receives the votes from the others.
  3. The votes are processed (ZXID and myid are compared).
  4. The votes are tallied until more than half of the machines have received the same vote.
  5. The server roles are updated.

4.2 Election While the Cluster Is Running

This is very much like the election at server startup, with just one extra state change: when the Leader dies, the remaining Followers all change their server state to LOOKING and then enter the election flow.

4.3 Consistency Algorithms and Consensus Algorithms

We also touched on consistency algorithms and consensus algorithms, so what is the difference between consistency and consensus? The two are often conflated in everyday use, so let's make them precise:

  • Consistency: across the nodes of a distributed system, given a series of operations, the data or state presented to the outside world is consistent, under the guarantees of the agreed protocol.
  • Consensus: the process by which the nodes of a distributed system reach agreement with one another on some state.

In other words, consistency emphasizes the result while consensus emphasizes the process of reaching agreement; consensus algorithms are the core technique for guaranteeing that a system meets some degree of consistency.
