在看這篇文章以前,最好對NACOS相關功能有所瞭解,推薦看完Spring Cloud Alibaba Nacos(功能篇)。前端
針對功能,有目的的去找相對應的源代碼,進一步瞭解功能是如何被實現出來的。java
本文針對有必定源代碼閱讀經驗的人羣,不會深刻太多的細節,還須要讀者打開源碼跟蹤,自行領會。程序員
進入GitHub對應的頁面,將NACOS工程clone下來。目錄和文件看起來很冗長,可是對於看源代碼真正有幫助的部分並很少。spring
有了這三張圖,就能順利找到突破口了,核心內容就集中在nacos-console,nacos-naming,nacos-config,順藤摸瓜,就能看到很多內容了。數據庫
若是仍是感受無從下手的話,那就移步nacos-example,裏面有主要業務的調用入口,一看便知。編程
首先從一個工廠類提及:com.alibaba.nacos.api.NacosFactory。bootstrap
裏面的靜態方法用於建立ConfigService和NamingService,代碼相似,以建立ConfigService爲例:segmentfault
public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(-400, e.getMessage()); } }
沒有什麼複雜的邏輯,使用的是基本的反射原理。構造參數傳入了properties,這些屬性能夠經過bootstrap.yml中指定,對應的是NacosConfigProperties。api
須要細看的是構造函數中對於namespace初始化的那部份內容。緩存
private void initNamespace(Properties properties) { String namespaceTmp = null; String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING, System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING, String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING))); if (Boolean.valueOf(isUseCloudNamespaceParsing)) { namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() { @Override public String call() { return TenantUtil.getUserTenantForAcm(); } }); namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() { @Override public String call() { String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE); return StringUtils.isNotBlank(namespace) ? namespace : EMPTY; } }); } if (StringUtils.isBlank(namespaceTmp)) { namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE); } namespace = StringUtils.isNotBlank(namespaceTmp) ? namespaceTmp.trim() : EMPTY; properties.put(PropertyKeyConst.NAMESPACE, namespace); }
傳入的properties會指定是否解析雲環境中的namespace參數,若是是的,就是去讀取阿里雲環境的系統變量;若是不是,那麼就讀取properties中指定的namespace,沒有指定的話,最終解析出來的是空字符串。從代碼上看出來,獲取雲環境的namespace作成了異步化的形式,可是目前版本仍是使用的同步調用。
繼續跟蹤ConfigService,裏面定義了一系列接口方法,正是咱們所要看的。
每一個業務實現最終都歸結爲Http請求,就是配置的serverAddr,多個地址會依次輪轉使用,固然是在必定超時時間內依次請求,都請求不成功了,那就會拋出異常。
請求方是nacos-client,接收方最終都是落到nacos-config服務上,最後使用JdbcTemplate進行數據持久化。
這一部分的代碼一看就明白,發佈配置,獲取配置和刪除配置都有所體現,就不展開闡述了。
重點解析一下配置監聽部分的源代碼。
先將注意力放在com.alibaba.nacos.client.config.impl.CacheData這個數據結構上,是個典型的充血模型,主要是充當listener管理者的角色,這樣看來,類名取得並非那麼友好了。
實際上,能夠看出CacheData將配置信息(namespace, content)和listener聚合在一塊兒了,能夠認爲一項配置能夠附加多種listener實施監聽(由於listener接口可能有多種實現),每種listener只會有一個實例附加在配置上。
public void addListener(Listener listener) { if (null == listener) { throw new IllegalArgumentException("listener is null"); } ManagerListenerWrap wrap = new ManagerListenerWrap(listener); if (listeners.addIfAbsent(wrap)) { LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group, listeners.size()); } }
使用了CopyOnWriteArrayList.addIfAbsent方法,這個方法最重要就是equals方法,ManagerListenerWrap是對listener的另一種形式的包裹,其實現了equals方法:
@Override public boolean equals(Object obj) { if (null == obj || obj.getClass() != getClass()) { return false; } if (obj == this) { return true; } ManagerListenerWrap other = (ManagerListenerWrap) obj; return listener.equals(other.listener); }
再往上層翻,能夠找到對於listener更高層的管理API:com.alibaba.nacos.client.config.impl.ClientWorker。
一樣是對listener的管理,可是增長了重複校驗,其中cacheMap是關鍵,以下定義:
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>()
使用了具備原子性操做特性的AtomicReference,能夠避免併發帶來的數據不一致的問題,裏面包裹的是一個HashMap,value是CacheData對象,而key是有必定生成規則的,在GroupKey這個類中能夠找到:
static public String getKeyTenant(String dataId, String group, String tenant) { StringBuilder sb = new StringBuilder(); urlEncode(dataId, sb); sb.append('+'); urlEncode(group, sb); if (StringUtils.isNotEmpty(tenant)) { sb.append('+'); urlEncode(tenant, sb); } return sb.toString(); }
其實是將配置信息用「+」號進行拼接,若是配置信息中自己存在了「+」和「%」,會使用urlEncode方法進行編碼轉義。固然,也有配套的解析方法,這裏就再也不展開講解了。
接下來的無非就是就cacheMap的一系列get和set操做,用以維護listener。特別注意的是,每次更新操做都是先生成一個copy對象,操做此對象以後,再整個set(覆蓋)到cacheMap中。
最後說一下listener是如何運行起來的。
仍然是在ClientWorker當中能夠找到,將注意力轉移到構造函數中。其中,能夠注意到,初始化了兩個線程池:
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS);
兩個用於執行定時任務的scheduledThreadPool,而這兩個線程池的分工也是嵌套的:executor用於發佈配置檢查的任務,而executorService則是任務的接收者,是真正執行任務的角色。
因此發佈任務的線程池只分配了1個核心線程數,而執行任務的線程池的核心線程是CPU核數。
由於配置檢查是一個長輪詢的過程,一個任務執行者能監測的配置數量須要獲得控制,因此NACOS目前使用了一個比較簡單的分任務規則:
public void checkConfigInfo() { // 分任務 int listenerSize = cacheMap.get().size(); // 向上取整爲批數 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務是否在執行 這塊須要好好想一想。 任務列表如今是無序的。變化過程可能有問題 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
在ParamUtil.getPerTaskConfigSize()中返回的是每一個任務能監測的配置數量上限,默認是3000條,能夠經過系統變量PER_TASK_CONFIG_SIZE更改這個上限。
從代碼上能夠看出,若是當前listener的數量沒有超過3000個,配置監測的線程池還運轉不起來。若是細看這個部分的代碼,仍是會發現一些問題的,主要是圍繞着任務管理衍生出來的一系列問題。
長輪詢裏面主要有兩部分邏輯:
有了上述的基礎,這部分代碼看起來會比較輕鬆了,結構上基本類似。
直接進入com.alibaba.nacos.api.naming.NamingService,裏面有多個registerInstance重構方法,用於服務註冊。
先看看Instance實體類包含的內容:id,ip,port,serviceName,clusterName(所在集羣),weight(權重),healthy(是否正常),enabled(是否啓用),ephemeral(是不是臨時的),這9個屬性所有均可以在Console中有所體現。
而後,直接看註冊服務的方法:
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
前面一大段代碼是對臨時服務實例的處理,就是在構造一個心跳包發送給NACOS服務。
registerService方法就是封裝了HTTP請求,最終在InstanceController中處理請求。
若是項目集成了spring-cloud-starter-alibaba-nacos-discovery,服務啓動後默認是自動註冊的。若是想看自動註冊的過程,能夠從AbstractAutoServiceRegistration開始着手,當中有一段代碼:
@EventListener(WebServerInitializedEvent.class) public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals( ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); }
監聽了Web服務初始化完成的事件,最終會執行start方法:
public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } }
其中,register方法就是最核心的部分了,來源於NacosServiceRegistry的實現:
@Override public void register(NacosRegistration registration) { if (!registration.isRegisterEnabled()) { logger.info("Nacos Registration is disabled..."); return; } if (StringUtils.isEmpty(registration.getServiceId())) { logger.info("No service to register for nacos client..."); return; } NamingService namingService = registration.getNacosNamingService(); String serviceId = registration.getServiceId(); Instance instance = new Instance(); instance.setIp(registration.getHost()); instance.setPort(registration.getPort()); instance.setWeight(registration.getRegisterWeight()); instance.setClusterName(registration.getCluster()); instance.setMetadata(registration.getMetadata()); try { namingService.registerInstance(serviceId, instance); logger.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort()); }catch (Exception e) { logger.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); } }
這段代碼就很是熟悉了,最終就回到了上述的namingService.registerInstance方法。
/** * Map<namespace, Map<group::serviceName, Service>> */ private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
以上出現了另一個實體類:com.alibaba.nacos.naming.core.Service,Service是包含了Instance,一個Service下有多個Instance,便可組成一個Cluster。
在調用registerInstance註冊實例的時候,若是發現對應的Service沒有被註冊,那麼會registerService,而且會初始化對應的Cluster,啓動健康檢查的定時器。
和registerInstance相反的是deregisterInstance,即爲取消註冊,也能夠認爲是服務實例下線。
最後來看看NACOS如何實現服務發現功能。
從消費者(調用方)的角度來看,集成的starter項目中有個類:NacosServerList,最重要的是繼承了AbstractServerList,實現了兩個關鍵的接口方法,至關因而NACOS與Ribbon的對接點。
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); /** * Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle * */ public List<T> getUpdatedListOfServers(); }
NACOS對於這個兩個接口的實現,都使用了getServers方法,而進入到getServers方法體裏面,其實就是利用了上述所說的NacosNamingService.selectInstances方法,經過serviceId獲取到ServiceInfo對象,而後獲取到Service下面的全部有效的Instance。
從提供者(被調用方)的角度看,NACOS是經過定時器來實時更新ServiceInfo,主要業務邏輯是在HostReactor中實現的。與前述的serviceMap不同,HostReactor中維護的是serviceInfoMap。
private Map<String, ServiceInfo> serviceInfoMap;
HostReactor藉助了FailoverReactor對ServiceInfo作了磁盤緩存,仍然是啓動了定時任務,在指定的目錄下序列化ServiceInfo,以此實現了Failover機制。而啓動failover-mode也是有開關的,其實就是一個特定文件的一部份內容,這些配置的監測也是經過定時任務來實現的。
File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
整個過程以下圖所示:
這一部分是管理控制檯的實現,實際上是一個很是典型的WEB項目。
使用了Spring Security + JWT進行安全控制,前端技術是ReactJs,利用JdbcTemplate進行數據庫持久化。
須要注意的是,控制檯提供的功能並不都是從nacos-console這個服務中獲取的數據,而是分散在了各個服務中。
nacos-console提供了控制檯登陸,namespace管理,控制檯服務狀態這三部分能力,而配置管理和服務管理分別請求的是nacos-config和nacos-naming所提供的API,而這些API就是官網所提到的Open-API。
NACOS相關源碼通俗易懂,沒有什麼高深的理念,也沒有進行層層封裝和包裹,有必定編程經驗的程序員能在半小時以內把握整個項目的脈絡。
固然,也會存在一些不可忽視的缺點,好比,註釋過少,代碼還有很大的重構空間,tenant和namespace兩個概念混淆使用。
關於Spring Cloud Alibaba Nacos的介紹到此就結束了,但願對你有所幫助。
掃描下方二維碼,進入原創乾貨,搞「技」聖地。