學習不用那麼功利,二師兄帶你從更高維度輕鬆閱讀源碼~
提及Nacos的服務訂閱機制,對此不瞭解的朋友,可能感受很是神祕,這篇文章就你們深刻淺出的瞭解一下Nacos 2.0客戶端的訂閱實現。因爲涉及到的內容比較多,就分幾篇來說,本篇爲第一篇。segmentfault
Nacos的訂閱機制,若是用一句話來描述就是:Nacos客戶端經過一個定時任務,每6秒從註冊中心獲取實例列表,當發現實例發生變化時,發佈變動事件,訂閱者進行業務處理。該更新實例的更新實例,該更新本地緩存的更新本地緩存。緩存
上圖畫出了訂閱方法的主線流程,涉及的內容較多,處理細節複雜。這裏只用把握住核心部分便可。下面就經過代碼和流程圖來逐步分析上述過程。服務器
咱們這裏聊的訂閱機制,其實本質上就是服務發現的準實時感知。上面已經看到了當執行訂閱方法時,會觸發定時任務,定時去拉服務器端的數據。因此,本質上,訂閱機制就是實現服務發現的一種方式,對照的方式就是直接查詢接口了。微信
NacosNamingService中暴露的許多重載的subscribe,重載的目的就是讓你們少寫一些參數,這些參數呢,Nacos給默認處理了。最終這些重載方法都會調用到下面這個方法:ide
// NacosNamingService public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { if (null == listener) { return; } String clusterString = StringUtils.join(clusters, ","); changeNotifier.registerListener(groupName, serviceName, clusterString, listener); clientProxy.subscribe(serviceName, groupName, clusterString); }
方法中的事件監聽咱們暫時不聊,直接看subscribe方法,這裏clientProxy類型爲NamingClientProxyDelegate。實例化NacosNamingService時該類被實例化,前面章節中已經講到,再也不贅述。學習
而clientProxy.subscribe方法在NamingClientProxyDelegate中實現:this
// NamingClientProxyDelegate @Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); // 獲取緩存中的ServiceInfo ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (null == result) { // 若是爲null,則進行訂閱邏輯處理,基於gRPC協議 result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } // 定時調度UpdateTask serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); // ServiceInfo本地緩存處理 serviceInfoHolder.processServiceInfo(result); return result; }
這段方法是否是眼熟啊?對的,在前面分析《Nacos Client服務發現》時咱們已經講過了。看來異曲同工,查詢服務列表和訂閱最終都調用了同一個方法。url
上篇講了其餘流程,咱們這裏重點看任務調度:spa
// ServiceInfoUpdateService public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) != null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) != null) { return; } // 構建UpdateTask ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); } }
該方法包含了構建serviceKey、經過serviceKey判重,最後添加UpdateTask。代理
而其中的addTask的實現就是發起了一個定時任務:
// ServiceInfoUpdateService private synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }
定時任務延時1秒執行。
跟蹤到這裏就告一階段了。核心功能只有兩個:調用訂閱方法和發起定時任務。
UpdateTask封裝了訂閱機制的核心業務邏輯,先來經過一張流程圖看一下都作了啥。
有了上述流程圖,基本就很清晰的瞭解UpdateTask所作的事情了。直接貼出run方法的全部代碼:
public void run() { long delayTime = DEFAULT_DELAY; try { // 判斷該註冊的Service是否被訂閱,若是沒有訂閱則再也不執行 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) { NAMING_LOGGER .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters); return; } // 獲取緩存的service信息 ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (serviceObj == null) { // 根據serviceName從註冊中心服務端獲取Service信息 serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); serviceInfoHolder.processServiceInfo(serviceObj); lastRefTime = serviceObj.getLastRefTime(); return; } // 過時服務(服務的最新更新時間小於等於緩存刷新時間),從註冊中心從新查詢 if (serviceObj.getLastRefTime() <= lastRefTime) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); // 處理Service消息 serviceInfoHolder.processServiceInfo(serviceObj); } // 刷新更新時間 lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // 下次更新緩存時間設置,默認爲6秒 // TODO multiple time can be configured. delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; // 重置失敗數量爲0 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e); } finally { // 下次調度刷新時間,下次執行的時間與failCount有關 // failCount=0,則下次調度時間爲6秒,最長爲1分鐘 // 即當無異常狀況下緩存實例的刷新時間是6秒 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
首先在判斷服務是不是被訂閱過,實現方法是ChangeNotifier#isSubscribed:
public boolean isSubscribed(String groupName, String serviceName, String clusters) { String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); return CollectionUtils.isNotEmpty(eventListeners); }
查看該方法的源碼會發現,這裏的listenerMap正是最開始的subscribe方法中registerListener註冊的EventListener。
run方法後面的業務處理基本上都雷同了,先判斷緩存是否有ServiceInfo信息,若是沒有則查詢註冊中心、處理ServiceInfo、更新上次處理時間。
而下面判斷ServiceInfo是否失效,正是經過「上次更新時間」與當前ServiceInfo中的「上次更新時間」作比較來判斷。若是失效,也會查詢註冊中心、處理ServiceInfo、更新上次處理時間等一系列操做。
業務邏輯最後會計算下一次定時任務的執行時間,經過delayTime來延遲執行。delayTime默認爲 1000L * 6,也就是6秒。而在finally裏面真的發起下一次定時任務。當出現異常時,下次執行的時間與失敗次數有關,但最長不超過1分鐘。
這一篇咱們講了Nacos客戶端服務訂閱機制的源碼,主要有如下步驟:
第一步:訂閱方法的調用,並進行EventListener的註冊,後面UpdateTask要用來進行判斷;
第二步:經過委託代理類來處理訂閱邏輯,此處與獲取實例列表方法使用了同一個方法;
第三步:經過定時任務執行UpdateTask方法,默認執行間隔爲6秒,當發生異常時會延長,但不超過1分鐘;
第四步:UpdateTask方法中會比較本地是否存在緩存,緩存是否過時。當不存在或過時時,查詢註冊中心,獲取最新實例,更新最後獲取時間,處理ServiceInfo。
第五步:從新計算定時任務時間,循環執行上述流程。
下一篇,咱們會在此基礎上繼續講解ServiceInfoHolder#processServiceInfo方法中是如何對獲取到的實例信息進行處理的。
博主簡介:《SpringBoot技術內幕》技術圖書做者,酷愛鑽研技術,寫技術乾貨文章。
公衆號:「程序新視界」,博主的公衆號,歡迎關注~
技術交流:請聯繫博主微信號:zhuan2quan