經過閱讀NACOS的源碼,瞭解其心跳與選舉機制。開始閱讀此篇文章以前,建議先閱讀以下兩篇文章:html
Spring Cloud Alibaba Nacos(功能篇)java
Spring Cloud Alibaba Nacos(源碼篇)算法
只有NACOS服務與所註冊的Instance之間纔會有直接的心跳維持機制,換言之,這是一種典型的集中式管理機制。緩存
在client這一側是心跳的發起源,進入NacosNamingService,能夠發現,只有註冊服務實例的時候纔會構造心跳包:網絡
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
沒有特殊狀況,目前ephemeral都是true。BeatReactor維護了一個Map對象,記錄了須要發送心跳的BeatInfo,構造了一個心跳包後,BeatReactor.addBeatInfo方法將BeatInfo放入Map中。而後,內部有一個定時器,每隔5秒發送一次心跳。dom
class BeatProcessor implements Runnable { @Override public void run() { try { for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) { BeatInfo beatInfo = entry.getValue(); if (beatInfo.isScheduled()) { continue; } beatInfo.setScheduled(true); executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); } } catch (Exception e) { NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e); } finally { executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS); } } }
經過設置scheduled的值來控制是否已經下發了心跳任務,具體的心跳任務邏輯放在了BeatTask。異步
class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { long result = serverProxy.sendBeat(beatInfo); beatInfo.setScheduled(false); if (result > 0) { clientBeatInterval = result; } } }
sendBeat就是請求了/instance/beat接口,只返回了一個心跳間隔時長,將這個返回值用於client設置定時任務間隔,同時將scheduled置爲false,表示完成了這次心跳發送任務,能夠進行下次心跳。async
NACOS接到心跳後,會有一段instance判空的邏輯,若是找不到對應的instance,就會直接建立出來,也就是默認相信心跳的請求源是合理的。分佈式
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(), clientBeat.getIp(), clientBeat.getPort()); if (instance == null) { instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.generateInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } service.processClientBeat(clientBeat);
對於client的心跳處理,放在了對應的Service裏面,處理心跳的代碼邏輯放在了ClientBeatProcessor:ide
@Override public void run() { Service service = this.service; String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); getPushService().serviceChanged(service.getNamespaceId(), this.service.getName()); } } } } }
邏輯很簡單,將集羣下全部ephemeral=true的實例找出來,而後根據ip和port匹配到對應的instance,而後記錄這次心態時間。
marked屬性暫時沒有發現有什麼用處,惟一調用過setMarked的地方是經過解析一段字符串來構建Instance,下劃線分割,能夠指定marked。
// 7 possible formats of config: // ip:port // ip:port_weight // ip:port_weight_cluster // ip:port_weight_valid // ip:port_weight_valid_cluster // ip:port_weight_valid_marked // ip:port_weight_valid_marked_cluster
按照處理心跳的邏輯,若是marked=true的話,這個實例就不會處理ServiceChanged事件,狀態也就得不到改變了。
PushService在處理ServiceChanged事件的時候,主要作了兩件事情:
其一,根據上次記錄的心跳時間,判斷現有的實例在緩存的時效內(默認10s)是否有心跳發送過來,主要的調用方法:
public boolean zombie() { return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName); }
其2、是發送udp廣播通知全部的client,有instance發生了變動。
if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } udpPush(ackEntry);
最後再關注一個問題:NACOS怎麼處理長時間沒有發送心跳的服務實例?
相關代碼邏輯放在了PushService:
static { try { udpSocket = new DatagramSocket(); Receiver receiver = new Receiver(); Thread inThread = new Thread(receiver); inThread.setDaemon(true); inThread.setName("com.alibaba.nacos.naming.push.receiver"); inThread.start(); executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { removeClientIfZombie(); } catch (Throwable e) { Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie"); } } }, 0, 20, TimeUnit.SECONDS); } catch (SocketException e) { Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service"); } }
能夠看到,又是熟悉的定時器,移除client的判斷依據仍然是zombie方法。
NACOS選舉機制的底層原理是RAFT共識算法,NACOS沒有依賴諸如zookeeper之類的第三方庫,而是自實現了一套RAFT算法。
相較於大名鼎鼎的Paxos算法,RAFT算法最突出的優點就是易於理解,學習起來很輕鬆。
在RAFT算法領域中,有三種基本的狀態(角色):follower、candidate、leader。
處於follower狀態的server不會發起任何的request,只是被動的響應leader和candidate。
處於leader狀態的server會主動的發送心跳包給各個follower,而且接收client全部的request。
而candidate是一種過渡狀態,只有整個cluster在進行新的選舉的時候,纔會出現此種狀態的server。
還有一個重要的概念是term,能夠理解爲一個任意(隨機)的時間片斷,在這個時間段內實施選舉。
更多的RAFT算法知識不在此展開講述了,接下來進入源碼閱讀,核心部分在RaftCore類中。此類被註解爲@component,咱們從init()方法開始閱讀:
@PostConstruct public void init() throws Exception { executor.submit(notifier); long start = System.currentTimeMillis(); datums = raftStore.loadDatums(notifier); setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); while (true) { if (notifier.tasks.size() <= 0) { break; } Thread.sleep(1000L); } initialized = true; GlobalExecutor.registerMasterElection(new MasterElection()); GlobalExecutor.registerHeartbeat(new HeartBeat()); }
咋一看,有一個比較扎眼的while(true)死循環,固然也會有跳出循環的條件。再細看loadDatums()方法,是在讀取本地緩存目錄,若是不爲空,就會調用notifier.addTask,這種狀況下就會致使死循環跳出條件得不到知足,而notifier內部又是一個死循環,調用了tasks.take()取出任務,若是沒有沒有任務可取了,就會阻塞於此,上述初始化方法中的死循環也就順利跳出了。
其實這一步操做是在利用Failover機制來同步本地的service和instance信息,與選舉機制無關。
最後兩行代碼纔是關鍵,分別啓動了MasterElection和HeartBeat兩個任務。看這部分代碼的時候,須要你有分佈式的思惟方式,能夠在腦海中假定有三個獨立部署的NACOS服務組成的cluster。
先來看選舉任務的定義:
public class MasterElection implements Runnable { @Override public void run() { try { if (!peers.isReady()) { return; } RaftPeer local = peers.local(); local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; if (local.leaderDueMs > 0) { return; } // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } }
比較關鍵的是對時效性的處理。local.leaderDueMs的值一開始是隨機生成的,範圍是[0, 15000),單位是毫秒,此後按照500毫秒一個梯度進行遞減,減小到≤0後,就會觸發選舉操做。固然,選舉以前,把超時時間重置一下。resetLeaderDue()方法是把leaderDueMs變量從新賦值,可是並非像初始化隨機賦值同樣的邏輯,而是在15000毫秒的基礎上加上了一個隨機值,其隨機值的範圍是[0, 5000)毫秒。
接下來就是選舉方法:
public void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); peers.reset(); local.term.incrementAndGet(); local.voteFor = local.ip; local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<String, String>(1); params.put("vote", JSON.toJSONString(local)); for (final String server : peers.allServersWithoutMySelf()) { final String url = buildURL(server, API_VOTE); try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { return 1; } RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class); peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } }
進入這個方法以後,會將自身的term自增1,爲本身投一票,狀態變成了candidate,而後將本身投票的結果發送給其餘NACOS服務。發送投票是若干次HTTP Request,由各自的RaftController來接收處理,最終調用的仍是RaftCore.receivedVote方法:
public RaftPeer receivedVote(RaftPeer remote) { if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } RaftPeer local = peers.get(NetUtils.localServer()); if (remote.term.get() <= local.term.get()) { if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } local.resetLeaderDue(); local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); return local; }
這裏面主要是對term的數值大小比較,若是一旦發現request請求過來的term比本身本地的term要大,那就放棄競選,本身轉爲follower的狀態,將term設置爲request中帶過來的term參數值;反之,就不作任何處理,直接返回local。
發送投票後,能夠同時拿到對方的投票結果,而後根據各方投票結果來計算最終哪臺server出於leader狀態。
public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } if (maxApproveCount >= majorityCount()) { RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; } } return leader; }
由於發起投票的請求是異步進行的,進入decideLeader方法中並不意味着全部的candidate都完成了投票,因此會在for循環中忽略未投票的peer。接下來要通過兩個步驟,其一是計算出得票數最多的peer,其二是最多的得票數還必須超過整個集羣實例數的一半。這也就說明了服務實例爲何要是奇數個,而且是三個及以上。RAFT算法推薦的是5個實例。
而後就是由leader發送心跳包給各個follower。對於心跳時效性的處理邏輯和選舉的時候一模一樣,這裏就再也不贅述了。run方法裏面沒有調用resetLeaderDue()方法,而是推遲到了sendBeat()方法裏面進行調用了,達到的效果是同樣的,防止在沒必要要的時候發起了選舉。
public class HeartBeat implements Runnable { @Override public void run() { try { if (!peers.isReady()) { return; } RaftPeer local = peers.local(); local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS; if (local.heartbeatDueMs > 0) { return; } local.resetHeartbeatDue(); sendBeat(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while sending beat {}", e); } }
固然,follower在接收到心跳包以後,也會調用resetLeaderDue()方法,也就意味着,若是follower長時間收不到leader的心態,就認爲leader已經不可用了,隨機觸發選舉操做,選出新的leader。
最後來討論兩種可能發生的狀況,可能性微乎其微的奇葩場景就再也不討論了。
一、在一輪投票中,你們很湊巧的都投給了本身,那就等待下一輪投票開始吧!不可能每次都這麼湊巧。同時,NACOS選舉加入了時間的隨機性,若是發現不知足時間點的要求,就會放棄選舉,維持上一輪的term,最終確定是處於follower的狀態。
二、leader選舉成功後,由於網絡抖動,follower接收不到心跳包,將會從新發起選舉。從新選舉的過程當中,若是舊的leader恢復了,那就皆大歡喜,一切照常;若是沒有及時恢復過來,那就形成了雙leader的問題。不過NACOS在處理心跳包的時候會修正,當發現本身不是follower狀態,卻收到了心跳包的時候,會強制把本身的狀態變爲follower。若是湊巧兩個leader都將本身變爲了follower,也不要緊,心跳過時時間一到,立刻就能夠開始新的選舉流程了。
本文深刻探討了NACOS的心跳和選舉機制,而且對可能遇到的狀況進行了進一步分析。總的來講,心跳機制是比較好理解的,而選舉機制則須要一些RAFT算法的基礎知識,加之目前NACOS源碼的註解甚少,若是對NACOS沒有必定的瞭解,閱讀起來仍是有些困難的。總之,把握總體,不要在乎太多細節。
掃描下方二維碼,進入原創乾貨,搞「技」聖地。