Spring Cloud Alibaba Nacos(源碼篇)

在看這篇文章以前,最好對NACOS相關功能有所瞭解,推薦看完Spring Cloud Alibaba Nacos(功能篇)前端

針對功能,有目的的去找相對應的源代碼,進一步瞭解功能是如何被實現出來的。java

本文針對有必定源代碼閱讀經驗的人羣,不會深刻太多的細節,還須要讀者打開源碼跟蹤,自行領會。程序員

1、引子

進入GitHub對應的頁面,將NACOS工程clone下來。目錄和文件看起來很冗長,可是對於看源代碼真正有幫助的部分並很少。
圖片描述spring

圖片描述

圖片描述

有了這三張圖,就能順利找到突破口了,核心內容就集中在nacos-console,nacos-naming,nacos-config,順藤摸瓜,就能看到很多內容了。數據庫

若是仍是感受無從下手的話,那就移步nacos-example,裏面有主要業務的調用入口,一看便知。編程

2、配置服務

首先從一個工廠類提及:com.alibaba.nacos.api.NacosFactory。bootstrap

裏面的靜態方法用於建立ConfigService和NamingService,代碼相似,以建立ConfigService爲例:segmentfault

public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(-400, e.getMessage());
        }
}

沒有什麼複雜的邏輯,使用的是基本的反射原理。構造參數傳入了properties,這些屬性能夠經過bootstrap.yml中指定,對應的是NacosConfigProperties。api

須要細看的是構造函數中對於namespace初始化的那部份內容。緩存

private void initNamespace(Properties properties) {
        String namespaceTmp = null;

        String isUseCloudNamespaceParsing =
            properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                    String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));

        if (Boolean.valueOf(isUseCloudNamespaceParsing)) {
            namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() {
                @Override
                public String call() {
                    return TenantUtil.getUserTenantForAcm();
                }
            });

            namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    return StringUtils.isNotBlank(namespace) ? namespace : EMPTY;
                }
            });
        }

        if (StringUtils.isBlank(namespaceTmp)) {
            namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }
        namespace = StringUtils.isNotBlank(namespaceTmp) ? namespaceTmp.trim() : EMPTY;
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
}

傳入的properties會指定是否解析雲環境中的namespace參數,若是是的,就是去讀取阿里雲環境的系統變量;若是不是,那麼就讀取properties中指定的namespace,沒有指定的話,最終解析出來的是空字符串。從代碼上看出來,獲取雲環境的namespace作成了異步化的形式,可是目前版本仍是使用的同步調用。

繼續跟蹤ConfigService,裏面定義了一系列接口方法,正是咱們所要看的。

每一個業務實現最終都歸結爲Http請求,就是配置的serverAddr,多個地址會依次輪轉使用,固然是在必定超時時間內依次請求,都請求不成功了,那就會拋出異常。

請求方是nacos-client,接收方最終都是落到nacos-config服務上,最後使用JdbcTemplate進行數據持久化。

這一部分的代碼一看就明白,發佈配置,獲取配置和刪除配置都有所體現,就不展開闡述了。

重點解析一下配置監聽部分的源代碼。

先將注意力放在com.alibaba.nacos.client.config.impl.CacheData這個數據結構上,是個典型的充血模型,主要是充當listener管理者的角色,這樣看來,類名取得並非那麼友好了。

實際上,能夠看出CacheData將配置信息(namespace, content)和listener聚合在一塊兒了,能夠認爲一項配置能夠附加多種listener實施監聽(由於listener接口可能有多種實現),每種listener只會有一個實例附加在配置上。

public void addListener(Listener listener) {
        if (null == listener) {
            throw new IllegalArgumentException("listener is null");
        }
        ManagerListenerWrap wrap = new ManagerListenerWrap(listener);
        if (listeners.addIfAbsent(wrap)) {
            LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                listeners.size());
        }
}

使用了CopyOnWriteArrayList.addIfAbsent方法,這個方法最重要就是equals方法,ManagerListenerWrap是對listener的另一種形式的包裹,其實現了equals方法:

@Override
public boolean equals(Object obj) {
        if (null == obj || obj.getClass() != getClass()) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        ManagerListenerWrap other = (ManagerListenerWrap) obj;
        return listener.equals(other.listener);
}

再往上層翻,能夠找到對於listener更高層的管理API:com.alibaba.nacos.client.config.impl.ClientWorker。

一樣是對listener的管理,可是增長了重複校驗,其中cacheMap是關鍵,以下定義:

private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>()

使用了具備原子性操做特性的AtomicReference,能夠避免併發帶來的數據不一致的問題,裏面包裹的是一個HashMap,value是CacheData對象,而key是有必定生成規則的,在GroupKey這個類中能夠找到:

static public String getKeyTenant(String dataId, String group, String tenant) {
        StringBuilder sb = new StringBuilder();
        urlEncode(dataId, sb);
        sb.append('+');
        urlEncode(group, sb);
        if (StringUtils.isNotEmpty(tenant)) {
            sb.append('+');
            urlEncode(tenant, sb);
        }
        return sb.toString();
}

其實是將配置信息用「+」號進行拼接,若是配置信息中自己存在了「+」和「%」,會使用urlEncode方法進行編碼轉義。固然,也有配套的解析方法,這裏就再也不展開講解了。

接下來的無非就是就cacheMap的一系列get和set操做,用以維護listener。特別注意的是,每次更新操做都是先生成一個copy對象,操做此對象以後,再整個set(覆蓋)到cacheMap中。

最後說一下listener是如何運行起來的。

仍然是在ClientWorker當中能夠找到,將注意力轉移到構造函數中。其中,能夠注意到,初始化了兩個線程池:

executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
       }, 1L, 10L, TimeUnit.MILLISECONDS);

兩個用於執行定時任務的scheduledThreadPool,而這兩個線程池的分工也是嵌套的:executor用於發佈配置檢查的任務,而executorService則是任務的接收者,是真正執行任務的角色。

因此發佈任務的線程池只分配了1個核心線程數,而執行任務的線程池的核心線程是CPU核數。

由於配置檢查是一個長輪詢的過程,一個任務執行者能監測的配置數量須要獲得控制,因此NACOS目前使用了一個比較簡單的分任務規則:

public void checkConfigInfo() {
        // 分任務
        int listenerSize = cacheMap.get().size();
        // 向上取整爲批數
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // 要判斷任務是否在執行 這塊須要好好想一想。 任務列表如今是無序的。變化過程可能有問題
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
}

在ParamUtil.getPerTaskConfigSize()中返回的是每一個任務能監測的配置數量上限,默認是3000條,能夠經過系統變量PER_TASK_CONFIG_SIZE更改這個上限。

從代碼上能夠看出,若是當前listener的數量沒有超過3000個,配置監測的線程池還運轉不起來。若是細看這個部分的代碼,仍是會發現一些問題的,主要是圍繞着任務管理衍生出來的一系列問題。

長輪詢裏面主要有兩部分邏輯:

  • 檢查本地配置,與CacheData存儲的信息保持一致;
  • 檢查server端配置,更新CacheData存儲的信息。

3、服務註冊與發現

有了上述的基礎,這部分代碼看起來會比較輕鬆了,結構上基本類似。

直接進入com.alibaba.nacos.api.naming.NamingService,裏面有多個registerInstance重構方法,用於服務註冊。

先看看Instance實體類包含的內容:id,ip,port,serviceName,clusterName(所在集羣),weight(權重),healthy(是否正常),enabled(是否啓用),ephemeral(是不是臨時的),這9個屬性所有均可以在Console中有所體現。

而後,直接看註冊服務的方法:

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

前面一大段代碼是對臨時服務實例的處理,就是在構造一個心跳包發送給NACOS服務。

registerService方法就是封裝了HTTP請求,最終在InstanceController中處理請求。

若是項目集成了spring-cloud-starter-alibaba-nacos-discovery,服務啓動後默認是自動註冊的。若是想看自動註冊的過程,能夠從AbstractAutoServiceRegistration開始着手,當中有一段代碼:

@EventListener(WebServerInitializedEvent.class)
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(
                    ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }

監聽了Web服務初始化完成的事件,最終會執行start方法:

public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }

其中,register方法就是最核心的部分了,來源於NacosServiceRegistry的實現:

@Override
    public void register(NacosRegistration registration) {

        if (!registration.isRegisterEnabled()) {
            logger.info("Nacos Registration is disabled...");
            return;
        }
        if (StringUtils.isEmpty(registration.getServiceId())) {
            logger.info("No service to register for nacos client...");
            return;
        }
        NamingService namingService = registration.getNacosNamingService();
        String serviceId = registration.getServiceId();

        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight(registration.getRegisterWeight());
        instance.setClusterName(registration.getCluster());
        instance.setMetadata(registration.getMetadata());
        try {
            namingService.registerInstance(serviceId, instance);
            logger.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort());
        }catch (Exception e) {
            logger.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
        }
    }

這段代碼就很是熟悉了,最終就回到了上述的namingService.registerInstance方法。

/**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

以上出現了另一個實體類:com.alibaba.nacos.naming.core.Service,Service是包含了Instance,一個Service下有多個Instance,便可組成一個Cluster。

圖片描述

在調用registerInstance註冊實例的時候,若是發現對應的Service沒有被註冊,那麼會registerService,而且會初始化對應的Cluster,啓動健康檢查的定時器。

和registerInstance相反的是deregisterInstance,即爲取消註冊,也能夠認爲是服務實例下線。

最後來看看NACOS如何實現服務發現功能。

從消費者(調用方)的角度來看,集成的starter項目中有個類:NacosServerList,最重要的是繼承了AbstractServerList,實現了兩個關鍵的接口方法,至關因而NACOS與Ribbon的對接點。

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 
     */
    public List<T> getUpdatedListOfServers();   
}

NACOS對於這個兩個接口的實現,都使用了getServers方法,而進入到getServers方法體裏面,其實就是利用了上述所說的NacosNamingService.selectInstances方法,經過serviceId獲取到ServiceInfo對象,而後獲取到Service下面的全部有效的Instance。

從提供者(被調用方)的角度看,NACOS是經過定時器來實時更新ServiceInfo,主要業務邏輯是在HostReactor中實現的。與前述的serviceMap不同,HostReactor中維護的是serviceInfoMap。

private Map<String, ServiceInfo> serviceInfoMap;
HostReactor藉助了FailoverReactor對ServiceInfo作了磁盤緩存,仍然是啓動了定時任務,在指定的目錄下序列化ServiceInfo,以此實現了Failover機制。而啓動failover-mode也是有開關的,其實就是一個特定文件的一部份內容,這些配置的監測也是經過定時任務來實現的。

File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);

整個過程以下圖所示:

圖片描述

4、管理控制檯(Console)

這一部分是管理控制檯的實現,實際上是一個很是典型的WEB項目。

使用了Spring Security + JWT進行安全控制,前端技術是ReactJs,利用JdbcTemplate進行數據庫持久化。

須要注意的是,控制檯提供的功能並不都是從nacos-console這個服務中獲取的數據,而是分散在了各個服務中。

nacos-console提供了控制檯登陸,namespace管理,控制檯服務狀態這三部分能力,而配置管理和服務管理分別請求的是nacos-config和nacos-naming所提供的API,而這些API就是官網所提到的Open-API。

5、總結

NACOS相關源碼通俗易懂,沒有什麼高深的理念,也沒有進行層層封裝和包裹,有必定編程經驗的程序員能在半小時以內把握整個項目的脈絡。

固然,也會存在一些不可忽視的缺點,好比,註釋過少,代碼還有很大的重構空間,tenant和namespace兩個概念混淆使用。

關於Spring Cloud Alibaba Nacos的介紹到此就結束了,但願對你有所幫助。

掃描下方二維碼,進入原創乾貨,搞「技」聖地。

圖片描述

相關文章
相關標籤/搜索