04篇 Nacos Client服務訂閱機制之【核心流程】

學習不用那麼功利,二師兄帶你從更高維度輕鬆閱讀源碼~

提及Nacos的服務訂閱機制,對此不瞭解的朋友,可能感受很是神祕,這篇文章就你們深刻淺出的瞭解一下Nacos 2.0客戶端的訂閱實現。因爲涉及到的內容比較多,就分幾篇來說,本篇爲第一篇。segmentfault

Nacos訂閱概述

Nacos的訂閱機制,若是用一句話來描述就是:Nacos客戶端經過一個定時任務,每6秒從註冊中心獲取實例列表,當發現實例發生變化時,發佈變動事件,訂閱者進行業務處理。該更新實例的更新實例,該更新本地緩存的更新本地緩存。緩存

nacos

上圖畫出了訂閱方法的主線流程,涉及的內容較多,處理細節複雜。這裏只用把握住核心部分便可。下面就經過代碼和流程圖來逐步分析上述過程。服務器

從訂閱到定時任務開啓

咱們這裏聊的訂閱機制,其實本質上就是服務發現的準實時感知。上面已經看到了當執行訂閱方法時,會觸發定時任務,定時去拉服務器端的數據。因此,本質上,訂閱機制就是實現服務發現的一種方式,對照的方式就是直接查詢接口了。微信

NacosNamingService中暴露的許多重載的subscribe,重載的目的就是讓你們少寫一些參數,這些參數呢,Nacos給默認處理了。最終這些重載方法都會調用到下面這個方法:ide

// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

方法中的事件監聽咱們暫時不聊,直接看subscribe方法,這裏clientProxy類型爲NamingClientProxyDelegate。實例化NacosNamingService時該類被實例化,前面章節中已經講到,再也不贅述。學習

而clientProxy.subscribe方法在NamingClientProxyDelegate中實現:this

// NamingClientProxyDelegate
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 獲取緩存中的ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // 若是爲null,則進行訂閱邏輯處理,基於gRPC協議
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 定時調度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // ServiceInfo本地緩存處理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

這段方法是否是眼熟啊?對的,在前面分析《Nacos Client服務發現》時咱們已經講過了。看來異曲同工,查詢服務列表和訂閱最終都調用了同一個方法。url

上篇講了其餘流程,咱們這裏重點看任務調度:spa

// ServiceInfoUpdateService
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        // 構建UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

該方法包含了構建serviceKey、經過serviceKey判重,最後添加UpdateTask。代理

而其中的addTask的實現就是發起了一個定時任務:

// ServiceInfoUpdateService
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

定時任務延時1秒執行。

跟蹤到這裏就告一階段了。核心功能只有兩個:調用訂閱方法和發起定時任務。

定時任務都幹了啥

UpdateTask封裝了訂閱機制的核心業務邏輯,先來經過一張流程圖看一下都作了啥。

nacos

有了上述流程圖,基本就很清晰的瞭解UpdateTask所作的事情了。直接貼出run方法的全部代碼:

public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        // 判斷該註冊的Service是否被訂閱,若是沒有訂閱則再也不執行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }

        // 獲取緩存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // 根據serviceName從註冊中心服務端獲取Service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        // 過時服務(服務的最新更新時間小於等於緩存刷新時間),從註冊中心從新查詢
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 處理Service消息
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // 刷新更新時間
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // 下次更新緩存時間設置,默認爲6秒
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失敗數量爲0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // 下次調度刷新時間,下次執行的時間與failCount有關
        // failCount=0,則下次調度時間爲6秒,最長爲1分鐘
        // 即當無異常狀況下緩存實例的刷新時間是6秒
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

首先在判斷服務是不是被訂閱過,實現方法是ChangeNotifier#isSubscribed:

public boolean isSubscribed(String groupName, String serviceName, String clusters) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    return CollectionUtils.isNotEmpty(eventListeners);
}

查看該方法的源碼會發現,這裏的listenerMap正是最開始的subscribe方法中registerListener註冊的EventListener。

run方法後面的業務處理基本上都雷同了,先判斷緩存是否有ServiceInfo信息,若是沒有則查詢註冊中心、處理ServiceInfo、更新上次處理時間。

而下面判斷ServiceInfo是否失效,正是經過「上次更新時間」與當前ServiceInfo中的「上次更新時間」作比較來判斷。若是失效,也會查詢註冊中心、處理ServiceInfo、更新上次處理時間等一系列操做。

業務邏輯最後會計算下一次定時任務的執行時間,經過delayTime來延遲執行。delayTime默認爲 1000L * 6,也就是6秒。而在finally裏面真的發起下一次定時任務。當出現異常時,下次執行的時間與失敗次數有關,但最長不超過1分鐘。

小結

這一篇咱們講了Nacos客戶端服務訂閱機制的源碼,主要有如下步驟:

第一步:訂閱方法的調用,並進行EventListener的註冊,後面UpdateTask要用來進行判斷;

第二步:經過委託代理類來處理訂閱邏輯,此處與獲取實例列表方法使用了同一個方法;

第三步:經過定時任務執行UpdateTask方法,默認執行間隔爲6秒,當發生異常時會延長,但不超過1分鐘;

第四步:UpdateTask方法中會比較本地是否存在緩存,緩存是否過時。當不存在或過時時,查詢註冊中心,獲取最新實例,更新最後獲取時間,處理ServiceInfo。

第五步:從新計算定時任務時間,循環執行上述流程。

下一篇,咱們會在此基礎上繼續講解ServiceInfoHolder#processServiceInfo方法中是如何對獲取到的實例信息進行處理的。

博主簡介:《SpringBoot技術內幕》技術圖書做者,酷愛鑽研技術,寫技術乾貨文章。

公衆號:「程序新視界」,博主的公衆號,歡迎關注~

技術交流:請聯繫博主微信號:zhuan2quan

相關文章
相關標籤/搜索