Spring Cloud Alibaba Nacos(心跳與選舉)

經過閱讀NACOS的源碼,瞭解其心跳與選舉機制。開始閱讀此篇文章以前,建議先閱讀以下兩篇文章:html

Spring Cloud Alibaba Nacos(功能篇)java

Spring Cloud Alibaba Nacos(源碼篇)算法

1、心跳機制

只有NACOS服務與所註冊的Instance之間纔會有直接的心跳維持機制,換言之,這是一種典型的集中式管理機制。緩存

心跳機制

在client這一側是心跳的發起源,進入NacosNamingService,能夠發現,只有註冊服務實例的時候纔會構造心跳包:網絡

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

沒有特殊狀況,目前ephemeral都是true。BeatReactor維護了一個Map對象,記錄了須要發送心跳的BeatInfo,構造了一個心跳包後,BeatReactor.addBeatInfo方法將BeatInfo放入Map中。而後,內部有一個定時器,每隔5秒發送一次心跳。dom

class BeatProcessor implements Runnable {

        @Override
        public void run() {
            try {
                for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
                    BeatInfo beatInfo = entry.getValue();
                    if (beatInfo.isScheduled()) {
                        continue;
                    }
                    beatInfo.setScheduled(true);
                    executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
            } finally {
                executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
            }
        }
    }

經過設置scheduled的值來控制是否已經下發了心跳任務,具體的心跳任務邏輯放在了BeatTask。異步

class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            long result = serverProxy.sendBeat(beatInfo);
            beatInfo.setScheduled(false);
            if (result > 0) {
                clientBeatInterval = result;
            }
        }
    }

sendBeat就是請求了/instance/beat接口,只返回了一個心跳間隔時長,將這個返回值用於client設置定時任務間隔,同時將scheduled置爲false,表示完成了這次心跳發送任務,能夠進行下次心跳。async

NACOS接到心跳後,會有一段instance判空的邏輯,若是找不到對應的instance,就會直接建立出來,也就是默認相信心跳的請求源是合理的。分佈式

Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(), clientBeat.getIp(), clientBeat.getPort());
        if (instance == null) {
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.generateInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
        }
        service.processClientBeat(clientBeat);

對於client的心跳處理,放在了對應的Service裏面,處理心跳的代碼邏輯放在了ClientBeatProcessor:ide

@Override
    public void run() {
        Service service = this.service;
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);
        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
                    }
                }
            }
        }
    }

邏輯很簡單,將集羣下全部ephemeral=true的實例找出來,而後根據ip和port匹配到對應的instance,而後記錄這次心態時間。

marked屬性暫時沒有發現有什麼用處,惟一調用過setMarked的地方是經過解析一段字符串來構建Instance,下劃線分割,能夠指定marked。

// 7 possible formats of config:
// ip:port
// ip:port_weight
// ip:port_weight_cluster
// ip:port_weight_valid
// ip:port_weight_valid_cluster
// ip:port_weight_valid_marked
// ip:port_weight_valid_marked_cluster

按照處理心跳的邏輯,若是marked=true的話,這個實例就不會處理ServiceChanged事件,狀態也就得不到改變了。

PushService在處理ServiceChanged事件的時候,主要作了兩件事情:

其一,根據上次記錄的心跳時間,判斷現有的實例在緩存的時效內(默認10s)是否有心跳發送過來,主要的調用方法:

public boolean zombie() {
            return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
        }

其2、是發送udp廣播通知全部的client,有instance發生了變動。

if (compressData != null) {
        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
    } else {
        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
        if (ackEntry != null) {
            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
        }
    }
    udpPush(ackEntry);

最後再關注一個問題:NACOS怎麼處理長時間沒有發送心跳的服務實例?

相關代碼邏輯放在了PushService:

static {
        try {
            udpSocket = new DatagramSocket();
            Receiver receiver = new Receiver();
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        removeClientIfZombie();
                    } catch (Throwable e) {
                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            }, 0, 20, TimeUnit.SECONDS);
        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

能夠看到,又是熟悉的定時器,移除client的判斷依據仍然是zombie方法。

2、選舉機制

NACOS選舉機制的底層原理是RAFT共識算法,NACOS沒有依賴諸如zookeeper之類的第三方庫,而是自實現了一套RAFT算法。

相較於大名鼎鼎的Paxos算法,RAFT算法最突出的優點就是易於理解,學習起來很輕鬆。

在RAFT算法領域中,有三種基本的狀態(角色):followercandidateleader

處於follower狀態的server不會發起任何的request,只是被動的響應leader和candidate。

處於leader狀態的server會主動的發送心跳包給各個follower,而且接收client全部的request。

而candidate是一種過渡狀態,只有整個cluster在進行新的選舉的時候,纔會出現此種狀態的server。

RAFT state machine

還有一個重要的概念是term,能夠理解爲一個任意(隨機)的時間片斷,在這個時間段內實施選舉。

更多的RAFT算法知識不在此展開講述了,接下來進入源碼閱讀,核心部分在RaftCore類中。此類被註解爲@component,咱們從init()方法開始閱讀:

@PostConstruct
    public void init() throws Exception {
        executor.submit(notifier);
        long start = System.currentTimeMillis();
        datums = raftStore.loadDatums(notifier);
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
            Thread.sleep(1000L);
        }
        initialized = true;
        GlobalExecutor.registerMasterElection(new MasterElection());
        GlobalExecutor.registerHeartbeat(new HeartBeat());
    }

咋一看,有一個比較扎眼的while(true)死循環,固然也會有跳出循環的條件。再細看loadDatums()方法,是在讀取本地緩存目錄,若是不爲空,就會調用notifier.addTask,這種狀況下就會致使死循環跳出條件得不到知足,而notifier內部又是一個死循環,調用了tasks.take()取出任務,若是沒有沒有任務可取了,就會阻塞於此,上述初始化方法中的死循環也就順利跳出了。

其實這一步操做是在利用Failover機制來同步本地的service和instance信息,與選舉機制無關。

最後兩行代碼纔是關鍵,分別啓動了MasterElection和HeartBeat兩個任務。看這部分代碼的時候,須要你有分佈式的思惟方式,能夠在腦海中假定有三個獨立部署的NACOS服務組成的cluster。

先來看選舉任務的定義:

public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0) {
                    return;
                }
                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
        }

比較關鍵的是對時效性的處理。local.leaderDueMs的值一開始是隨機生成的,範圍是[0, 15000),單位是毫秒,此後按照500毫秒一個梯度進行遞減,減小到≤0後,就會觸發選舉操做。固然,選舉以前,把超時時間重置一下。resetLeaderDue()方法是把leaderDueMs變量從新賦值,可是並非像初始化隨機賦值同樣的邏輯,而是在15000毫秒的基礎上加上了一個隨機值,其隨機值的範圍是[0, 5000)毫秒。

接下來就是選舉方法:

public void sendVote() {
        RaftPeer local = peers.get(NetUtils.localServer());
        peers.reset();
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("vote", JSON.toJSONString(local));
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildURL(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            return 1;
                        }
                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                        peers.decideLeader(peer);
                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }

進入這個方法以後,會將自身的term自增1,爲本身投一票,狀態變成了candidate,而後將本身投票的結果發送給其餘NACOS服務。發送投票是若干次HTTP Request,由各自的RaftController來接收處理,最終調用的仍是RaftCore.receivedVote方法:

public RaftPeer receivedVote(RaftPeer remote) {
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        RaftPeer local = peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) {
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip;
            }
            return local;
        }
        local.resetLeaderDue();
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        return local;
    }

這裏面主要是對term的數值大小比較,若是一旦發現request請求過來的term比本身本地的term要大,那就放棄競選,本身轉爲follower的狀態,將term設置爲request中帶過來的term參數值;反之,就不作任何處理,直接返回local。

發送投票後,能夠同時拿到對方的投票結果,而後根據各方投票結果來計算最終哪臺server出於leader狀態。

public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }
            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;
            if (!Objects.equals(leader, peer)) {
                leader = peer;
            }
        }
        return leader;
    }

由於發起投票的請求是異步進行的,進入decideLeader方法中並不意味着全部的candidate都完成了投票,因此會在for循環中忽略未投票的peer。接下來要通過兩個步驟,其一是計算出得票數最多的peer,其二是最多的得票數還必須超過整個集羣實例數的一半。這也就說明了服務實例爲何要是奇數個,而且是三個及以上。RAFT算法推薦的是5個實例。

而後就是由leader發送心跳包給各個follower。對於心跳時效性的處理邏輯和選舉的時候一模一樣,這裏就再也不贅述了。run方法裏面沒有調用resetLeaderDue()方法,而是推遲到了sendBeat()方法裏面進行調用了,達到的效果是同樣的,防止在沒必要要的時候發起了選舉。

public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
                local.resetHeartbeatDue();
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }
        }

固然,follower在接收到心跳包以後,也會調用resetLeaderDue()方法,也就意味着,若是follower長時間收不到leader的心態,就認爲leader已經不可用了,隨機觸發選舉操做,選出新的leader。

最後來討論兩種可能發生的狀況,可能性微乎其微的奇葩場景就再也不討論了。

一、在一輪投票中,你們很湊巧的都投給了本身,那就等待下一輪投票開始吧!不可能每次都這麼湊巧。同時,NACOS選舉加入了時間的隨機性,若是發現不知足時間點的要求,就會放棄選舉,維持上一輪的term,最終確定是處於follower的狀態。

二、leader選舉成功後,由於網絡抖動,follower接收不到心跳包,將會從新發起選舉。從新選舉的過程當中,若是舊的leader恢復了,那就皆大歡喜,一切照常;若是沒有及時恢復過來,那就形成了雙leader的問題。不過NACOS在處理心跳包的時候會修正,當發現本身不是follower狀態,卻收到了心跳包的時候,會強制把本身的狀態變爲follower。若是湊巧兩個leader都將本身變爲了follower,也不要緊,心跳過時時間一到,立刻就能夠開始新的選舉流程了。

3、總結

本文深刻探討了NACOS的心跳和選舉機制,而且對可能遇到的狀況進行了進一步分析。總的來講,心跳機制是比較好理解的,而選舉機制則須要一些RAFT算法的基礎知識,加之目前NACOS源碼的註解甚少,若是對NACOS沒有必定的瞭解,閱讀起來仍是有些困難的。總之,把握總體,不要在乎太多細節。

掃描下方二維碼,進入原創乾貨,搞「技」聖地。
圖片描述

相關文章
相關標籤/搜索