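Nacos is a service discovery and configuration management framework for microservices, released by Alibaba. This article walks through the source code of its service registration and discovery feature.
The analysis has two parts: the client side (an application registering itself with Nacos), and the Nacos Server side, which handles the registration requests.
The setup details are not shown in full; roughly, they are as follows.
Add the pom dependency: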
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
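Then configure the Nacos address (local) in application.yml. Once started, the application registers itself with the Nacos server.
Pull the source code for the desired tag from the official Nacos GitHub repository, https://github.com/alibaba/nacos.
It contains many modules: address, api, client, cmdb, core, console, and so on.
Start it from Nacos.java in the console module. It is a Spring Boot application, and once it is running it can handle registration and other requests.
Open the pom of the starter that the client depends on; it pulls in only spring-cloud-alibaba-nacos-discovery.
Spring Cloud components are auto-configured through the spring.factories file, so open the spring.factories file of spring-cloud-alibaba-nacos-discovery.
Look at the entry named NacosDiscoveryAutoConfiguration. The name suggests it is the configuration class for automatic registration and discovery: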
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}
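It registers three beans, each with a self-explanatory name. The first two carry the service-to-Nacos registration logic itself; the last one, with Auto in its name, handles automatic registration and is most likely the entry point.
Open the source of NacosAutoServiceRegistration: its parent class AbstractAutoServiceRegistration implements ApplicationListener: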
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {

    // omitted

    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    // omitted
}
This should be the registration entry point. The bind method is where the registration logic starts:
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    // omitted
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}
The start method:
public void start() {
    // omitted
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}
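Here we find the call that auto-configuration triggers: register(). What remains is how the registration is actually performed.
Follow the chain of same-named register() methods and you arrive at the real register method: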
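To make the flow above easier to follow, here is a minimal sketch of the same "listen for the web server event, then start once" pattern without Spring. The class AutoRegistrationSketch, its method names, and the log list are all hypothetical stand-ins; only the AtomicBoolean and AtomicInteger usage mirrors what the listings show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; this is not Spring code.
class AutoRegistrationSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger port = new AtomicInteger(0);
    final List<String> log = new ArrayList<>();

    // Plays the role of onApplicationEvent(WebServerInitializedEvent) plus bind().
    void onWebServerReady(int actualPort) {
        // Record the port only if none was set before (0 means "unset").
        port.compareAndSet(0, actualPort);
        start();
    }

    void start() {
        // The running flag makes a second event a no-op.
        if (!running.get()) {
            log.add("pre-registered");
            log.add("register port=" + port.get());
            log.add("registered");
            running.compareAndSet(false, true);
        }
    }
}
```

Calling onWebServerReady twice registers only once, and the port from the first event is the one that sticks.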
public void register(Registration registration) {
    // omitted
    String serviceId = registration.getServiceId();
    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        namingService.registerInstance(serviceId, instance);
        // omitted
    } catch (Exception e) {
        // omitted
    }
}
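The logic is straightforward: get the service id (for example, the service name), build the details of this instance (wrapped in an Instance), and register through namingService. Following the registration call: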
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // check whether this is an ephemeral instance
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        // omitted
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
This shows that a non-ephemeral instance does not need to send heartbeats. The heartbeat is implemented by a scheduled task inside beatReactor.addBeatInfo, whose core is:
long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
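A thread pool runs the task, which periodically sends an HTTP request to the Nacos server's /instance/beat endpoint to signal that the instance is alive.
Now for the call made at the end of registerInstance: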
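The self-rescheduling idea behind the heartbeat can be sketched on its own. This is a simplified illustration, not the BeatReactor implementation: HeartbeatSketch, nextDelay, and the LongSupplier that pretends to be the HTTP call are assumptions made for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a heartbeat task that schedules its own next run.
class HeartbeatSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sender");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });
    private final LongSupplier sendBeat;   // assumed: returns a server-suggested interval in ms, or <= 0
    private final long defaultPeriodMillis;

    HeartbeatSketch(LongSupplier sendBeat, long defaultPeriodMillis) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    // Same rule as the listing: prefer the server's answer, else the default period.
    static long nextDelay(long result, long defaultPeriodMillis) {
        return result > 0 ? result : defaultPeriodMillis;
    }

    void start() {
        executor.schedule(this::beatOnce, 0, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        long result = sendBeat.getAsLong();
        long nextTime = nextDelay(result, defaultPeriodMillis);
        if (!executor.isShutdown()) {
            // Each run schedules exactly one follow-up run.
            executor.schedule(this::beatOnce, nextTime, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

Scheduling one run at a time, instead of using a fixed-rate schedule, lets every response adjust the delay before the next beat.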
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
Following it:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
This simply prepares the parameters and sends an HTTP request. The registration URL is NACOS_URL_INSTANCE, that is, a POST request to /instance.
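As a rough illustration of what "preparing the parameters" amounts to, the sketch below turns a parameter map into a URL-encoded form body. RegisterRequestSketch is hypothetical, the key strings "namespaceId" and "serviceName" are assumed values for the CommonParams constants, and no request is actually sent.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: build and encode registration parameters, without sending them.
class RegisterRequestSketch {

    static Map<String, String> buildParams(String namespaceId, String groupedServiceName,
                                           String ip, int port, boolean ephemeral) {
        // LinkedHashMap keeps insertion order, so the encoded body is predictable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);        // assumed key name
        params.put("serviceName", groupedServiceName); // assumed key name
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }
}
```

The encoded string is what would travel as the body (or query string) of the POST to /instance.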
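Client registration summary:
1. Auto-configuration is driven by the spring.factories file, as is usual in Spring Cloud.
2. The auto-configuration class registers its beans in the IoC container. NacosAutoServiceRegistration, through its parent class, implements ApplicationListener and runs its logic once the web server initialization event has been published.
3. The registration logic sends the instance's information to the Nacos server through an HTTP request to /instance, and a heartbeat task periodically reports that the instance is alive.
As noted above, the Nacos client registers by sending an HTTP request to /instance. Let's see what that endpoint does. The controller on the Nacos server looks like this: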
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    // omitted

    @CanDistro
    @PostMapping
    public String register(HttpServletRequest request) throws Exception {
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }
}
Following the serviceManager.registerInstance call:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
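createEmptyService ensures that the service exists before the instance (the node being registered) is added, creating it if necessary, so that getService can then retrieve it. Registration then continues in addInstance.
How does createEmptyService create the service, and in what structure is it stored?
Stepping through createEmptyService with a debugger leads to the putService method in ServiceManager.java: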
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
The service ends up in a Map named serviceMap:
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
It is a two-level Map whose meaning is:
Map<namespace, Map<group:serviceName, Service>> // the first-level key is the namespace; the second level maps the name to the Service
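The two-level structure is easy to try out in isolation. The sketch below is a hypothetical reduction (ServiceRegistrySketch, with a plain String standing in for Service) and uses computeIfAbsent where putService uses double-checked locking; both aim to create the inner map exactly once per namespace.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a namespace -> (grouped service name -> service) registry.
class ServiceRegistrySketch {
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    void putService(String namespaceId, String groupedName, String service) {
        // computeIfAbsent atomically creates the inner map on first use of a namespace.
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>(16))
                  .put(groupedName, service);
    }

    String getService(String namespaceId, String groupedName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(groupedName);
    }
}
```

A lookup in a namespace that was never written to returns null instead of failing, which matches how the registration code checks for a missing service.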
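After being put into the map, the service is also initialized by calling its init method, which starts a health check:
1. If no heartbeat has been received from an instance for more than 15 seconds, its healthy property is set to false.
2. If no heartbeat has been received for more than 30 seconds, the instance is removed.
Back in the registration flow: once the Service is guaranteed to exist, the main logic continues with addInstance: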
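The two thresholds can be expressed as a small pure function. This is only a sketch of the rule as described above; BeatCheckSketch, the Verdict enum, and the constant names are made up for the example, and the real check runs as a periodic task on the server.

```java
// Hypothetical sketch of the heartbeat timeout rule described above.
class BeatCheckSketch {
    enum Verdict { HEALTHY, UNHEALTHY, REMOVE }

    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // mark unhealthy after this much silence
    static final long DELETE_TIMEOUT_MS = 30_000;    // remove after this much silence

    static Verdict check(long nowMs, long lastBeatMs) {
        long silence = nowMs - lastBeatMs;
        if (silence > DELETE_TIMEOUT_MS) {
            return Verdict.REMOVE;
        }
        if (silence > HEARTBEAT_TIMEOUT_MS) {
            return Verdict.UNHEALTHY;
        }
        return Verdict.HEALTHY;
    }
}
```

Passing the current time in as a parameter keeps the rule easy to test without waiting for real seconds to pass.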
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
Following it:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }
}
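Registration finally runs consistencyService.put(key, instances). There are two implementations, DistroConsistencyServiceImpl and RaftConsistencyServiceImpl, corresponding to the registry's AP and CP modes: one is memory-based and favors availability (A), the other is disk-based and favors consistency (C). This is the trade-off described by the CAP theorem. For more on CAP, see https://baike.baidu.com/item/CAP原則/5712863?fr=aladdin
Nacos's AP mode uses the Distro protocol, a protocol created by Alibaba and positioned as a consistency protocol for ephemeral data.
Continuing with the source, registration ends up at: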
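The listings do not show how one of the two implementations gets picked, but since the key is built with the ephemeral flag (buildInstanceListKey) and later tested with matchEphemeralInstanceListKey, a key-based dispatch seems plausible. The sketch below illustrates that idea only; the interface, the class, and the "ephemeral." marker are all assumptions, not the Nacos code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical common interface for both storage strategies.
interface ConsistencyStoreSketch {
    void put(String key, String value);
}

// Hypothetical delegate that routes by a marker embedded in the key.
class DelegatingStoreSketch implements ConsistencyStoreSketch {
    // Assumed marker; the real key layout is produced by KeyBuilder.
    static final String EPHEMERAL_MARKER = "ephemeral.";

    final Map<String, String> memoryStore = new HashMap<>();  // stands in for the AP side
    final Map<String, String> durableStore = new HashMap<>(); // stands in for the CP side

    @Override
    public void put(String key, String value) {
        if (key.contains(EPHEMERAL_MARKER)) {
            memoryStore.put(key, value);
        } else {
            durableStore.put(key, value);
        }
    }
}
```

With this shape the caller only ever sees one put method, and whether the data is ephemeral or persistent is decided by the key.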
consistencyService.put(key, instances);
Following it:
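@Override
public void put(String key, Record value) throws NacosException {
    // 1. update the in-memory registry with the registered instance
    onPut(key, value);
    // 2. sync the instance information to the other nodes of the Nacos Server cluster
    taskDispatcher.addTask(key);
}
As the added comments indicate, it works in two steps.
The onPut source: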
public void onPut(String key, Record value) {

    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // wrap the data in a Datum and store it
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }
    // passing the key is enough; the consumer looks up the Datum stored above
    notifier.addTask(key, ApplyAction.CHANGE);
}
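The final notifier.addTask call applies the producer-consumer pattern: it adds a task to a blocking queue to be processed later. Because these operations do not need to report success immediately, this helps performance considerably.
The call passes ApplyAction.CHANGE. Following notifier.addTask leads to the inner class Notifier, a Runnable whose logic lives in its run method, executed by its own thread.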
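The producer-consumer hand-off can be reduced to a few lines. This is a minimal sketch rather than the Notifier itself: NotifierSketch, the handled list, and the STOP marker used to end the loop are assumptions made so the example terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: producers enqueue keys, one consumer thread drains them.
class NotifierSketch implements Runnable {
    static final String STOP = "__stop__"; // assumed marker that ends the consumer loop

    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    // Producer side: returns immediately, the caller does not wait for processing.
    void addTask(String key) {
        tasks.offer(key);
    }

    // Consumer side: blocks until a key is available, then handles it.
    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take();
                if (STOP.equals(key)) {
                    return;
                }
                handled.add(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The request thread only pays for the offer call; the heavier work of applying the change happens on the consumer thread.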
public class Notifier implements Runnable {
    // some code omitted

    @Override
    public void run() {
        while (true) {
            try {
                // some code omitted
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == ApplyAction.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == ApplyAction.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        // omitted
                    }
                }
                // omitted
            } catch (Throwable e) {
                // omitted
            }
        }
    }
}
For the ApplyAction.CHANGE we passed, listener.onChange is executed. It has several implementations; a breakpoint shows that the one invoked here is in the com.alibaba.nacos.naming.core.Service class:
public void onChange(String key, Instances value) throws Exception {
    // omitted
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}
The core is updateIPs:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJSON());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        // make every ip mine
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
    }
}
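To avoid conflicts between concurrent reads and writes, the first statement of the method creates a new HashMap. The method works on that new map and only afterwards replaces the old data, which is the copy-on-write idea.
Eureka avoids read-write conflicts with a multi-level cache that is synchronized periodically, so its clients see changes less promptly than with Nacos.
Finally, a service-changed event is published.
Back in the put method, taskDispatcher.addTask(key) synchronizes the information to the other cluster nodes. Following the code: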
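The copy-on-write idea mentioned above can be shown with a much smaller structure. The sketch below is an assumption-laden reduction (ClusterSketch, with instances represented as plain strings): the writer builds a fresh collection and swaps the reference in one step, so readers never observe a half-updated set.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of copy-on-write: build a new set, then swap the reference.
class ClusterSketch {
    // volatile makes the swapped reference visible to reader threads.
    private volatile Set<String> instances = Collections.emptySet();

    void updateIPs(Collection<String> latest) {
        Set<String> fresh = new HashSet<>(latest);         // work on a private copy
        instances = Collections.unmodifiableSet(fresh);    // single reference swap
    }

    Set<String> allIPs() {
        return instances; // readers get an immutable snapshot, no locking needed
    }
}
```

A reader that grabbed the set before an update keeps its old snapshot unchanged, while later readers see the new one.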
public void addTask(String key) { queue.offer(key); }
It simply adds the key to a blocking queue for later processing. This method belongs to the inner class TaskScheduler; here is the whole class:
public class TaskScheduler implements Runnable {
    // omitted

    public void addTask(String key) {
        queue.offer(key);
    }

    @Override
    public void run() {
        List<String> keys = new ArrayList<>();
        while (true) {
            try {
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                    Loggers.DISTRO.debug("got key: {}", key);
                }

                if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                    continue;
                }

                if (StringUtils.isBlank(key)) {
                    continue;
                }

                if (dataSize == 0) {
                    keys = new ArrayList<>();
                }

                keys.add(key);
                dataSize++;

                if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                    for (Server member : dataSyncer.getServers()) {
                        if (NetUtils.localServer().equals(member.getKey())) {
                            continue;
                        }
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getKey());

                        if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                        }

                        dataSyncer.submit(syncTask, 0);
                    }
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }

            } catch (Exception e) {
                Loggers.DISTRO.error("dispatch sync task failed.", e);
            }
        }
    }
}
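As the condition if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) shows, once a certain number of keys has accumulated or a certain amount of time has passed since the last dispatch, a batch of sync tasks is submitted. The sending logic is in the run method of the sync class DataSyncer, which posts the data to the /distro/datum endpoint.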
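The "flush on count or on elapsed time" rule is worth isolating. The following sketch is hypothetical (BatchDispatcherSketch and its fields are invented for the example) and takes the current time as a parameter so the behaviour is deterministic; like the loop above, it only evaluates the time condition when a key arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collect keys and dispatch a batch by size or by elapsed time.
class BatchDispatcherSketch {
    private final int batchSize;
    private final long periodMs;
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime;
    final List<List<String>> dispatched = new ArrayList<>(); // stands in for submitted sync tasks

    BatchDispatcherSketch(int batchSize, long periodMs, long startMs) {
        this.batchSize = batchSize;
        this.periodMs = periodMs;
        this.lastDispatchTime = startMs;
    }

    void onKey(String key, long nowMs) {
        keys.add(key);
        boolean full = keys.size() >= batchSize;
        boolean stale = nowMs - lastDispatchTime > periodMs;
        if (full || stale) {
            dispatched.add(keys);     // hand the whole batch over
            keys = new ArrayList<>(); // start a new batch
            lastDispatchTime = nowMs;
        }
    }
}
```

Batching trades a little latency for far fewer requests between cluster nodes, and the time condition bounds how long a lone key can wait once more traffic arrives.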
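Nacos mainly runs in AP mode, so RaftConsistencyServiceImpl for CP mode is not covered in detail here. Briefly, it works as follows:
1. It is a simplified Raft protocol for CP mode, implemented by Alibaba.
2. The logic runs only if the current node is the Leader; otherwise the request is forwarded to the Leader.
3. Instance data is written to disk synchronously, and the in-memory registry is updated asynchronously.
4. It uses a CountDownLatch: success is returned to the client only after more than half of the cluster nodes have written successfully.
5. After success, the /raft/datum/commit endpoint is called to commit.
The client obtains the server's map data by calling the /instance/list endpoint, and a delayed scheduled task keeps refreshing it with the latest service data.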
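Step 4 above, waiting for a majority with a CountDownLatch, can be sketched as follows. This is an illustration of the technique under simplifying assumptions, not the Raft implementation in Nacos: MajorityWriteSketch, the Predicate that simulates sending to a peer, and counting the leader's own write as the first acknowledgement are all choices made for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch: a write succeeds once a majority of nodes acknowledge it.
class MajorityWriteSketch {

    static boolean replicate(List<String> peers, Predicate<String> sendToPeer, long timeoutMs) {
        int clusterSize = peers.size() + 1;   // peers plus the leader itself
        int majority = clusterSize / 2 + 1;
        CountDownLatch latch = new CountDownLatch(majority);
        latch.countDown();                    // the leader's own write counts as one ack

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, peers.size()));
        try {
            for (String peer : peers) {
                pool.execute(() -> {
                    if (sendToPeer.test(peer)) {
                        latch.countDown();    // one more node has written successfully
                    }
                });
            }
            // Returns true only if the count reached zero before the timeout.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In a three-node cluster, one acknowledging peer plus the leader is enough; if no peer answers, the call times out and reports failure.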