A Brief Source-Code Analysis of the Spring Cloud Alibaba Nacos Registry

1. Preliminaries

1.1 Introduction

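Nacos is a microservice discovery and configuration management framework from Alibaba. In this article we give a brief source-code analysis of its service registration and discovery features.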

1.2 Overview

The analysis has two parts: the client, which registers itself with Nacos, and the Nacos Server, which handles registration and related requests.

1.3 The demo being analyzed

The details are not shown at length; roughly:

1.3.1 Client side

Add the pom dependency:

<dependency>
   <groupId>com.alibaba.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

Then configure the address of the (local) Nacos server in application.yml. Once this application starts, it registers itself with the Nacos server.

1.3.2 Nacos server side

Pull the source code at a release tag from the official Nacos GitHub repository, https://github.com/alibaba/nacos.

It contains many modules: address, api, client, cmdb, core, console, and so on.

Just run Nacos.java in the console module. It is a Spring Boot application, and once started it can handle registration and other requests.

2. Client-side source flow

2.1 Entry point: what triggers auto-configuration

Opening the pom of the dependency the client imports shows that it only pulls in spring-cloud-alibaba-nacos-discovery.

Spring Cloud components are auto-configured through spring.factories files, so let's open the spring.factories file of spring-cloud-alibaba-nacos-discovery.
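For readers unfamiliar with the format: spring.factories is a plain properties file in which a key such as org.springframework.boot.autoconfigure.EnableAutoConfiguration maps to a comma-separated list of configuration classes. The sketch below only illustrates that format with java.util.Properties; it is not Spring's own SpringFactoriesLoader, and the class name FactoriesFileSketch and any sample entries (such as a com.example package) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustration of the spring.factories format only; Spring itself uses SpringFactoriesLoader.
class FactoriesFileSketch {
    // The key under which auto-configuration classes are listed.
    static final String AUTO_CONFIG_KEY =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Returns the comma-separated class names registered under the given key.
    static List<String> classNamesFor(String fileContent, String key) {
        Properties props = new Properties();
        try {
            // Properties understands the trailing-backslash line continuations used in these files.
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new IllegalStateException("cannot read factories content", e);
        }
        List<String> classNames = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return classNames;
        }
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                classNames.add(trimmed);
            }
        }
        return classNames;
    }
}
```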

Look at the class named NacosDiscoveryAutoConfiguration; as the name suggests, it is the configuration class for automatic registration and discovery:

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}
}

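It registers three beans whose names describe what they do. The first two implement the registration logic between the service and Nacos; the last one, with "Auto" in its name, handles automatic registration and is the likely entry point.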

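Opening the NacosAutoServiceRegistration source, we find that its parent class AbstractAutoServiceRegistration implements the ApplicationListener interface. Many frameworks start their own logic by listening for Spring events, so let's look at its implementation of the ApplicationListener method: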

public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware,
		ApplicationListener<WebServerInitializedEvent> {

	// ... (omitted)

	@Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

This is the registration entry point: the bind method starts Nacos's own logic. Here is bind:

public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		// ... (omitted)
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}

And start:

public void start() {
   // ... (omitted)
   if (!this.running.get()) {
      this.context.publishEvent(
            new InstancePreRegisteredEvent(this, getRegistration()));
      register();
      if (shouldRegisterManagement()) {
         registerManagement();
      }
      this.context.publishEvent(
            new InstanceRegisteredEvent<>(this, getConfiguration()));
      this.running.compareAndSet(false, true);
   }

}

Here we find the registration method triggered by auto-configuration, register(). What follows is how the registration itself is done.
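To make the bind, start, register sequence concrete, here is a plain-Java sketch with no Spring dependency. The class AutoRegistrationSketch and its method names are hypothetical; it only mirrors two details visible in the excerpts above: the port is set once via compareAndSet(0, ...), and the running flag keeps repeated events from registering twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of bind -> start -> register; not the Spring Cloud class itself.
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Records lifecycle steps so their order can be inspected.
    final List<String> log = new ArrayList<>();

    // Stand-in for onApplicationEvent(WebServerInitializedEvent) -> bind(event).
    void onWebServerInitialized(int serverPort) {
        // Only the first port is kept, as with port.compareAndSet(0, ...).
        port.compareAndSet(0, serverPort);
        start();
    }

    void start() {
        if (!running.get()) {
            log.add("pre-registered");   // InstancePreRegisteredEvent in the original
            register();
            log.add("registered");       // InstanceRegisteredEvent in the original
            running.compareAndSet(false, true);
        }
    }

    private void register() {
        log.add("register port " + port.get());
    }
}
```

Like the original, the get-then-compareAndSet check is not a strict once-only guard if start() is called from two threads at the same moment; it relies on the event arriving on one thread.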

2.2 Client registration logic: register()

Following the chain of overloaded register() methods leads to the one that actually performs the registration:

public void register(Registration registration) {
		// ... (omitted)
		String serviceId = registration.getServiceId();
		Instance instance = getNacosInstanceFromRegistration(registration);

		try {
			namingService.registerInstance(serviceId, instance);
			// ... (omitted)
		}
		catch (Exception e) {
			// ... (omitted)
		}
	}

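The logic is straightforward: it obtains the service ID (such as the service name) and the details of this instance (wrapped in an Instance), then registers through namingService. Following the registration call: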

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
		// Check whether this is an ephemeral instance
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            // ... (omitted)
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

Heartbeat mechanism

This shows that non-ephemeral instances do not need to send heartbeats. The heartbeat is implemented by a scheduled task inside beatReactor.addBeatInfo, whose core is:

long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);

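A thread pool runs this task, which periodically sends an HTTP request to the Nacos server's /instance/beat endpoint to report that the instance is alive.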
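A minimal sketch of such a self-rescheduling heartbeat, assuming a LongSupplier as a stand-in for serverProxy.sendBeat: it returns the server-suggested interval in milliseconds, or a value of 0 or less to fall back to the default period. HeartbeatSketch is a hypothetical name, and the error handling is simplified compared with the real BeatReactor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Self-rescheduling heartbeat modelled on the BeatTask excerpt; all names are illustrative.
class HeartbeatSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier sendBeat;        // stand-in for serverProxy.sendBeat(beatInfo)
    private final long defaultPeriodMillis;     // stand-in for beatInfo.getPeriod()

    HeartbeatSketch(LongSupplier sendBeat, long defaultPeriodMillis) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    void start() {
        executor.schedule(this::beat, 0, TimeUnit.MILLISECONDS);
    }

    private void beat() {
        long nextTime = defaultPeriodMillis;
        try {
            long result = sendBeat.getAsLong();
            // Prefer the interval returned by the server, otherwise use the default period.
            nextTime = result > 0 ? result : defaultPeriodMillis;
        } catch (RuntimeException e) {
            // A failed beat keeps the default period; the next attempt is still scheduled.
        }
        try {
            executor.schedule(this::beat, nextTime, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // stop() was called: no further beats.
        }
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

Scheduling a one-shot task that re-schedules itself, rather than using scheduleAtFixedRate, is what allows the server's reply to change the delay before the next beat.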

Continuing with registration

Recall this call in registerInstance:

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);

Following it further:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }

It simply prepares the parameters and sends an HTTP request. The registration endpoint is NACOS_URL_INSTANCE, i.e. a POST request to /instance.
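The sketch below shows roughly what such a POST body looks like once a subset of those parameters is form-encoded. The literal parameter names are assumed to be the values of the CommonParams constants, and INSTANCE_PATH is an assumed value of NACOS_URL_INSTANCE under the default /nacos context path in Nacos 1.x; RegisterRequestSketch is hypothetical and sends nothing.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Builds an application/x-www-form-urlencoded body similar to the one registerService sends.
class RegisterRequestSketch {
    // Assumption: NACOS_URL_INSTANCE resolved with the default "/nacos" context path.
    static final String INSTANCE_PATH = "/nacos/v1/ns/instance";

    // Subset of the parameters from registerService; names assumed to match CommonParams.
    static Map<String, String> params(String namespaceId, String groupedServiceName, String groupName,
                                      String clusterName, String ip, int port, double weight,
                                      boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>();  // keeps insertion order
        params.put("namespaceId", namespaceId);
        params.put("serviceName", groupedServiceName);
        params.put("groupName", groupName);
        params.put("clusterName", clusterName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    // URL-encodes each key and value and joins them with '&'.
    static String formBody(Map<String, String> params) {
        StringJoiner body = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            body.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return body.toString();
    }
}
```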

Client registration summary:

1. Auto-configuration is done through the spring.factories file, as is usual in Spring Cloud.

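2. The auto-configuration class registers its beans in the IoC container; one of them, through its parent class, implements the ApplicationListener interface and runs its registration logic once the web server initialization event has been published.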

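3. The registration logic sends the instance's own information to the Nacos server via an HTTP request to the /instance endpoint, and a scheduled heartbeat task periodically reports that the instance is alive.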

3. Nacos server: handling registration

As described above, the Nacos client registers by sending an HTTP request to the /instance endpoint. Let's see what /instance does. The server-side controller source is:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    // ... (omitted)

    @CanDistro
    @PostMapping
    public String register(HttpServletRequest request) throws Exception {

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }
}

Following the serviceManager.registerInstance call inside it:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

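createEmptyService ensures that the service exists before the instance (the information of the registering node) is added, creating it if necessary, so that getService can retrieve it afterwards. Finally, addInstance continues the registration.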

How does createEmptyService create the service, and what does the structure look like?

3.1 createEmptyService: ensuring the Service exists

Stepping through createEmptyService in a debugger leads to the putService method in ServiceManager.java:

public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

In the end the service is put into a Map named serviceMap:

private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

It is a two-level Map whose meaning is:

Map<namespace, Map<group@@serviceName, Service>> // first-level key: namespace; second level: grouped service name -> Service
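A simplified version of this two-level registry, keeping the double-checked creation of the inner map from putService. ServiceRegistrySketch and its minimal Service class are hypothetical stand-ins for ServiceManager and Nacos's Service.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Two-level registry: namespaceId -> (grouped service name -> Service).
class ServiceRegistrySketch {
    // Minimal stand-in for Nacos's Service: only the fields used as map keys.
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    private final Object putServiceLock = new Object();

    void putService(Service service) {
        // Double-checked creation of the per-namespace map, as in ServiceManager.putService.
        if (!serviceMap.containsKey(service.namespaceId)) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.namespaceId)) {
                    serviceMap.put(service.namespaceId, new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.namespaceId).put(service.name, service);
    }

    // Returns null when either the namespace or the service is unknown.
    Service getService(String namespaceId, String name) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(name);
    }
}
```

On Java 8 and later, serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>()) gives the same guarantee without an explicit lock, since ConcurrentHashMap performs it atomically.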

After the service is put into the map, it is also initialized through its init method, which starts the health check:

1. If no heartbeat has been received from an instance for more than 15 seconds, its healthy attribute is set to false.

2. If no heartbeat has been received for more than 30 seconds, the instance is removed.
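A sketch of one pass of that check, assuming both thresholds are measured from the instance's last heartbeat. The real check runs periodically and also publishes events; BeatCheckSketch and its Instance class are hypothetical.

```java
import java.util.Iterator;
import java.util.List;

// One pass of a heartbeat check: mark stale instances unhealthy, remove very stale ones.
class BeatCheckSketch {
    static final long UNHEALTHY_AFTER_MS = 15_000;
    static final long DELETE_AFTER_MS = 30_000;

    // Minimal instance record; not Nacos's Instance class.
    static final class Instance {
        final String ip;
        long lastBeatMillis;
        boolean healthy = true;

        Instance(String ip, long lastBeatMillis) {
            this.ip = ip;
            this.lastBeatMillis = lastBeatMillis;
        }
    }

    static void check(List<Instance> instances, long nowMillis) {
        Iterator<Instance> it = instances.iterator();
        while (it.hasNext()) {
            Instance instance = it.next();
            long silence = nowMillis - instance.lastBeatMillis;
            if (silence > DELETE_AFTER_MS) {
                it.remove();                 // past the delete timeout: drop it
            } else if (silence > UNHEALTHY_AFTER_MS) {
                instance.healthy = false;    // past the heartbeat timeout: mark unhealthy
            }
        }
    }
}
```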

3.2 addInstance: registering the instance

Back at the registration code: once the Service is guaranteed to exist, the main logic continues with addInstance:

addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

Following it:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            consistencyService.put(key, instances);
        }
    }

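Registration ends with consistencyService.put(key, instances). There are two implementations, DistroConsistencyServiceImpl and RaftConsistencyServiceImpl, corresponding to the registry's AP and CP modes: one is memory-based and favors availability (A), the other is disk-based and favors consistency (C), a trade-off from the CAP theorem. For more on CAP, see: https://baike.baidu.com/item/CAP原則/5712863?fr=aladdin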
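Which implementation handles a key depends on whether the instance is ephemeral, and that flag is part of the key built by KeyBuilder.buildInstanceListKey. The sketch below is a hypothetical router illustrating the idea; the key prefixes and the "##" separator are assumptions modelled on Nacos 1.x and should be checked against the version you are reading.

```java
// Hypothetical router: sends ephemeral keys to an AP store and the rest to a CP store.
class ConsistencyRouterSketch {
    // Minimal store interface standing in for the two ConsistencyService implementations.
    interface Store {
        void put(String key, String value);
    }

    // Assumed key layout, modelled on Nacos 1.x KeyBuilder; the exact strings are illustrative.
    static final String INSTANCE_LIST_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_PREFIX = "ephemeral.";

    static String instanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_PREFIX + (ephemeral ? EPHEMERAL_PREFIX : "")
                + namespaceId + "##" + serviceName;
    }

    private final Store apStore;  // Distro-like: memory first, availability first
    private final Store cpStore;  // Raft-like: disk first, consistency first

    ConsistencyRouterSketch(Store apStore, Store cpStore) {
        this.apStore = apStore;
        this.cpStore = cpStore;
    }

    void put(String key, String value) {
        boolean ephemeral = key.startsWith(INSTANCE_LIST_PREFIX + EPHEMERAL_PREFIX);
        (ephemeral ? apStore : cpStore).put(key, value);
    }
}
```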

4. Nacos server AP mode: DistroConsistencyServiceImpl

Nacos's AP mode uses the Distro protocol, which Alibaba designed itself and positions as a consistency protocol for ephemeral data.

Continuing with the earlier source, registration finally reaches:

consistencyService.put(key, instances);

Following it:

@Override
    public void put(String key, Record value) throws NacosException {
        // 1. Update the in-memory registry with the registered instance
        onPut(key, value);
        // 2. Sync the instance information to the other nodes of the Nacos Server cluster
        taskDispatcher.addTask(key);
    }

As the comments indicate, this happens in two steps.

4.1 onPut: updating the in-memory registry

The onPut source:

public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // Wrap the data in a Datum and store it
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }
		// Only the key is queued; the Datum stored above is looked up by that key when the task runs
        notifier.addTask(key, ApplyAction.CHANGE);
    }

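The final notifier.addTask call applies the producer-consumer pattern: it adds a task to a blocking queue to be processed later. Because these operations do not need to finish before the registration request returns, this helps performance considerably.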

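The action passed is ApplyAction.CHANGE. Following notifier.addTask leads to the inner class Notifier, a Runnable implementation whose logic lives in its run method and is executed by a dedicated thread.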

public class Notifier implements Runnable {
		// part of the code omitted
        @Override
        public void run() {

            while (true) {
                try {
                    // part of the code omitted
                    for (RecordListener listener : listeners.get(datumKey)) {
                        count++;
                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }
                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            // ... (omitted)
                        }
                    }

                    // ... (omitted)
                } catch (Throwable e) {
                    // ... (omitted)
                }
            }
        }
    }
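Stripped of the details, the Notifier is a classic producer-consumer pair. The sketch below is a hypothetical, self-contained version: addTask only enqueues and returns, while run(), on its own thread, takes tasks from a BlockingQueue and calls onChange or onDelete. The String-valued data store and the queue capacity of 1024 are simplifying assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Producer-consumer sketch of the Notifier; names mirror the excerpt but the class is illustrative.
class NotifierSketch implements Runnable {
    enum ApplyAction { CHANGE, DELETE }

    interface RecordListener {
        void onChange(String key, String value);
        void onDelete(String key);
    }

    // One queued notification: which key, and what happened to it.
    private static final class Task {
        final String key;
        final ApplyAction action;

        Task(String key, ApplyAction action) {
            this.key = key;
            this.action = action;
        }
    }

    private final BlockingQueue<Task> tasks = new ArrayBlockingQueue<>(1024);
    private final Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();
    private final Map<String, String> dataStore;

    NotifierSketch(Map<String, String> dataStore) {
        this.dataStore = dataStore;
    }

    void listen(String key, RecordListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer: returns at once (false if the queue is full); callers never wait for listeners.
    boolean addTask(String key, ApplyAction action) {
        return tasks.offer(new Task(key, action));
    }

    // Consumer: intended to run on its own thread until interrupted.
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Task task;
            try {
                task = tasks.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (RecordListener listener : listeners.getOrDefault(task.key, List.of())) {
                try {
                    if (task.action == ApplyAction.CHANGE) {
                        listener.onChange(task.key, dataStore.get(task.key));
                    } else {
                        listener.onDelete(task.key);
                    }
                } catch (RuntimeException e) {
                    // One failing listener must not stop the loop or the other listeners.
                }
            }
        }
    }
}
```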

Because we passed ApplyAction.CHANGE, listener.onChange is called. There are several implementations; stepping in with a debugger shows that this one lands in the class com.alibaba.nacos.naming.core.Service:

public void onChange(String key, Instances value) throws Exception {
        // ... (omitted)
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }

The core is updateIPs:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }

        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }

                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }

                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJSON());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }

                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }

                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }

        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
        }

        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();

        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

    }

To avoid read/write conflicts, the method first creates a new HashMap and works on it; only afterwards is the old data replaced with the new. This is the copy-on-write idea.
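A minimal sketch of the copy-on-write idea, independent of the Nacos classes: a writer copies the current map, modifies the copy, and publishes it with a single assignment to a volatile field, so readers always see either the complete old snapshot or the complete new one. CopyOnWriteClusterSketch and its String-based instance representation are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Copy-on-write: readers take lock-free snapshots, writers publish a new map by reference swap.
class CopyOnWriteClusterSketch {
    // volatile so readers see the fully built map once the reference is swapped.
    private volatile Map<String, List<String>> ipsByCluster = Collections.emptyMap();

    // Readers never observe a half-applied update.
    Map<String, List<String>> snapshot() {
        return ipsByCluster;
    }

    // Writers are serialized so two concurrent updates cannot lose each other's change.
    synchronized void updateCluster(String clusterName, List<String> ips) {
        Map<String, List<String>> copy = new HashMap<>(ipsByCluster);
        copy.put(clusterName, List.copyOf(ips));
        ipsByCluster = Collections.unmodifiableMap(copy);
    }
}
```

The synchronized keyword matters here because each update starts from a copy of the current map: without it, two writers could copy the same old map and the later swap would discard the earlier change.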

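Eureka avoids read/write conflicts with a multi-level cache whose levels are synchronized periodically, so its clients notice changes less promptly than Nacos clients do.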

Finally, a service-changed event is published via getPushService().serviceChanged(this).

4.2 Syncing instance information to the other Nacos Server cluster nodes

Back in the put method, taskDispatcher.addTask(key) syncs the data to the other nodes of the cluster. Following the code:

public void addTask(String key) {
            queue.offer(key);
        }

It simply adds the key to a blocking queue to be processed later. This method belongs to the inner class TaskScheduler; here is the whole class:

public class TaskScheduler implements Runnable {

        // ... (omitted)

        public void addTask(String key) {
            queue.offer(key);
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }

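The key condition is:

if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
    (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod())

Once a certain number of keys has accumulated, or enough time has passed since the last dispatch, a batch of sync tasks is submitted. The sending logic is in the run method of the DataSyncer class, which sends the data to the /distro/datum endpoint of the other nodes.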
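The count-or-time rule can be isolated in a few lines. In this hypothetical sketch the current time is passed in explicitly so the behavior is easy to check; as in the original loop, the time condition is only evaluated when a new key arrives, because an empty poll result skips straight to the next iteration.

```java
import java.util.ArrayList;
import java.util.List;

// Count-or-time batching, modelled on the dispatch condition in TaskScheduler.run().
class SyncBatcherSketch {
    private final int batchSyncKeyCount;
    private final long taskDispatchPeriodMillis;
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime;

    SyncBatcherSketch(int batchSyncKeyCount, long taskDispatchPeriodMillis, long startMillis) {
        this.batchSyncKeyCount = batchSyncKeyCount;
        this.taskDispatchPeriodMillis = taskDispatchPeriodMillis;
        this.lastDispatchTime = startMillis;
    }

    // Adds a key; returns the batch to send if this key triggered a flush, otherwise null.
    List<String> add(String key, long nowMillis) {
        keys.add(key);
        if (keys.size() == batchSyncKeyCount
                || nowMillis - lastDispatchTime > taskDispatchPeriodMillis) {
            List<String> batch = keys;
            keys = new ArrayList<>();   // start a fresh batch
            lastDispatchTime = nowMillis;
            return batch;
        }
        return null;
    }
}
```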

5. Nacos server CP mode: RaftConsistencyServiceImpl

Nacos mainly uses AP mode, so we won't go into RaftConsistencyServiceImpl in detail; here is only a brief outline of how it works:

1. It is a simplified Raft protocol implemented by Alibaba for CP mode.

2. A node executes the logic only if it is the Leader; otherwise it forwards the request to the Leader.

3. Instance data is written to disk synchronously, and the in-memory registry is updated asynchronously.

4. Using a CountDownLatch, success is returned to the client only after more than half of the cluster's nodes have written the data.

5. On success, the /raft/datum/commit endpoint is called to commit.
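A sketch of the majority-acknowledgement step from point 4, assuming a Predicate as a stand-in for the HTTP call to each peer and assuming the peer list contains every node. MajorityWriteSketch is hypothetical and covers only the waiting logic, not Raft terms, leader election, or the commit call.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Leader-side sketch: send to every peer and succeed once a majority has acknowledged.
class MajorityWriteSketch {
    // Majority of n nodes, e.g. 3 of 5.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // send stands in for the replication request; it returns true when the peer accepted the write.
    static boolean replicate(List<String> peers, Predicate<String> send, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(majority(peers.size()));
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, peers.size()));
        try {
            for (String peer : peers) {
                pool.execute(() -> {
                    if (send.test(peer)) {
                        latch.countDown();   // one more successful write
                    }
                });
            }
            // true only if a majority acknowledged before the timeout
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```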

6. Service discovery

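The client calls the /instance/list endpoint to fetch the relevant data from the server-side map, and a delayed scheduled task keeps refreshing it with the latest service data.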
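A hypothetical sketch of the client side of this: a local cache filled on the first lookup and then refreshed by a self-rescheduling delayed task. The Function stands in for the HTTP call to /instance/list; DiscoveryCacheSketch and the refresh interval are assumptions, not the real Nacos client code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Local cache of instance lists kept fresh by a delayed, self-rescheduling task.
class DiscoveryCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, Boolean> scheduled = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "discovery-refresh");
        t.setDaemon(true);
        return t;
    });
    private final Function<String, List<String>> fetchInstanceList; // stand-in for GET /instance/list
    private final long refreshMillis;

    DiscoveryCacheSketch(Function<String, List<String>> fetchInstanceList, long refreshMillis) {
        this.fetchInstanceList = fetchInstanceList;
        this.refreshMillis = refreshMillis;
    }

    // Serves from the cache; the first lookup fetches synchronously and starts background refresh.
    List<String> getInstances(String serviceName) {
        List<String> cached = cache.get(serviceName);
        if (cached == null) {
            cached = refresh(serviceName);
        }
        if (scheduled.putIfAbsent(serviceName, Boolean.TRUE) == null) {
            scheduleUpdate(serviceName);
        }
        return cached;
    }

    private List<String> refresh(String serviceName) {
        List<String> latest = List.copyOf(fetchInstanceList.apply(serviceName));
        cache.put(serviceName, latest);
        return latest;
    }

    private void scheduleUpdate(String serviceName) {
        try {
            executor.schedule(() -> {
                try {
                    refresh(serviceName);
                } finally {
                    scheduleUpdate(serviceName); // re-arm even if this fetch failed
                }
            }, refreshMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // shutdown() was called: stop polling.
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```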
