Nacos Source Code Analysis (Registration and Discovery, Cluster Synchronization, Heartbeat, Comparison with Eureka)

spring.factories in nacos-discovery:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

The core class here is NacosDiscoveryAutoConfiguration, which mainly registers NacosAutoServiceRegistration:

@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

 

NacosAutoServiceRegistration

It extends AbstractAutoServiceRegistration, which implements ApplicationListener<WebServerInitializedEvent>, so in the final stage of container startup the onApplicationEvent method implemented here is executed.

@Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

bind:

@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort()); // set the port via CAS, only if it is still 0
		this.start(); // call start() to carry out the registration
	}

	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
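The two compareAndSet calls above work as one-shot guards: the port is recorded only while it is still 0, and the running flag is flipped only once. A minimal sketch of that pattern in plain Java follows. The class OneShotStarter and its methods are made up for illustration and are not Spring Cloud code; the counter simply stands in for register().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of Spring Cloud.
class OneShotStarter {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger(0);

    // Mirrors bind(): record the port only if none has been set yet, then start.
    void bind(int webServerPort) {
        port.compareAndSet(0, webServerPort);
        start();
    }

    // Mirrors start(): register only while not running, then mark as running.
    void start() {
        if (!running.get()) {
            registrations.incrementAndGet(); // stands in for register()
            running.compareAndSet(false, true);
        }
    }

    int port() {
        return port.get();
    }

    int registrations() {
        return registrations.get();
    }
}
```

Calling bind(8080) and then bind(9090) on this sketch leaves the port at 8080 and registers once. As in the excerpt, the running check and the later flag update are separate steps, so the sketch assumes the events arrive on a single thread.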

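As you can see, start() checks whether it is already running and then calls register(), which delegates to the service registry's register(Registration):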

@Override
	public void register(Registration registration) {

		String serviceId = registration.getServiceId(); // get the service id

		Instance instance = getNacosInstanceFromRegistration(registration); // build the Instance object

		try {
			namingService.registerInstance(serviceId, instance); // register the service
			log.info("nacos registry, {} {}:{} register finished", serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			log.error("nacos registry, {} register failed...{},", serviceId,
					registration.toString(), e);
		}
	}

registerInstance:

@Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) { // is this an ephemeral instance? defaults to true
            // for an ephemeral instance, build the scheduled heartbeat task
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            long instanceInterval = instance.getInstanceHeartBeatInterval();
            // DEFAULT_HEART_BEAT_INTERVAL defaults to 5s
            beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
            // BeatReactor maintains a ScheduledExecutorService and adds the scheduled task through it
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        // the actual service registration
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

First, look at addBeatInfo, which sets up the heartbeat:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
        // schedule a BeatTask to run immediately
        executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            // send a heartbeat to the remote server; this is a PUT request
            long result = serverProxy.sendBeat(beatInfo);
            // work out the next execution time and reschedule on the executor
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
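The BeatTask above does not use a fixed-rate schedule. Each run sends one heartbeat and then schedules a fresh one-shot task, using the interval returned by the server when it is positive and the configured period otherwise. A minimal sketch of that self-rescheduling loop, under stated assumptions: BeatLoop, its LongSupplier callback and the latch used for waiting are hypothetical names for illustration and are not the Nacos BeatReactor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical illustration class, not the Nacos BeatReactor.
class BeatLoop {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier sendBeat;  // returns a server-suggested interval in ms, or 0 for none
    private final long defaultPeriodMs;
    private final CountDownLatch latch;   // lets a caller wait for a number of beats
    private volatile boolean stopped;

    BeatLoop(LongSupplier sendBeat, long defaultPeriodMs, int beatsToAwait) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
        this.latch = new CountDownLatch(beatsToAwait);
    }

    // First beat runs immediately, like schedule(new BeatTask(beatInfo), 0, ...).
    void start() {
        executor.schedule(this::beat, 0, TimeUnit.MILLISECONDS);
    }

    private void beat() {
        if (stopped) {
            return;
        }
        long result = sendBeat.getAsLong();
        latch.countDown();
        // Prefer the interval returned by the server; fall back to the configured period.
        long next = result > 0 ? result : defaultPeriodMs;
        try {
            executor.schedule(this::beat, next, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check and the reschedule; stop quietly.
        }
    }

    // Waits until the requested number of beats has been sent, or the timeout elapses.
    boolean awaitBeats(long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }
}
```

Rescheduling after each run means a slow or failed heartbeat delays the next one instead of letting runs pile up, and it lets the server adjust the interval per response.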

serverProxy.registerService:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }

As you can see, this also sends a request to the remote server, only this time it is a POST.

 

That covers the general client-side flow. Next, the server-side logic:

 

InstanceController

It is RESTful: one controller per resource, with the HTTP method distinguishing create, delete, update and query.

Service registration: com.alibaba.nacos.naming.controllers.InstanceController#register

@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // convert the request into an Instance object
        final Instance instance = parseInstance(request);
        // register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

registerInstance:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // create the service for this namespace and service name if it does not exist
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // fetch the service
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // main logic for adding the instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

createEmptyService:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // look up the serviceMap for namespaceId (a ConcurrentHashMap)
        // and fetch the service from it by serviceName
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            // no such service yet, so build one
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // store the service, initialize its scheduled task, and hook it into the consistency service
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

putServiceAndInit:

private void putServiceAndInit(Service service) throws NacosException {
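        // put the service into serviceMap
        putService(service);
        // set up the heartbeat check task
        service.init();
        // register the service as a listener on its ephemeral and persistent instance-list keys;
        // changes are delivered to it by an asynchronous thread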
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

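The heartbeat check task is a ClientBeatCheckTask. Its main logic:

            // get all instances
            List<Instance> instances = service.allIPs(true);
            // iterate over the instances
            // if now - last heartbeat time > timeout (15s by default),
            // set the health status to false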
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
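            // instances silent for longer than the delete timeout are removed:
            // deleteIp calls this node's own instance-removal endpoint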

            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
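The check above applies two thresholds to the time since the last heartbeat: a shorter one that marks the instance unhealthy and a longer one that deletes it. A minimal sketch of that decision as a pure function, under stated assumptions: BeatCheck and its Action enum are hypothetical, the 15s value comes from the comment above, and the 30s delete timeout is my assumption about the default rather than something shown in the excerpt.

```java
// Hypothetical illustration class, not the Nacos ClientBeatCheckTask.
class BeatCheck {
    enum Action { KEEP, MARK_UNHEALTHY, DELETE }

    // Assumed defaults: 15s is quoted in the article, 30s is an assumption.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    // Returns the strongest action that applies for the given silence period.
    static Action check(long nowMs, long lastBeatMs) {
        long silentFor = nowMs - lastBeatMs;
        if (silentFor > IP_DELETE_TIMEOUT_MS) {
            return Action.DELETE;
        }
        if (silentFor > HEARTBEAT_TIMEOUT_MS) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.KEEP;
    }
}
```

The excerpt runs the two checks as separate passes over the instance list, so an instance past the delete timeout is first marked unhealthy and then deleted; the sketch collapses that into one result for brevity.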

That completes the createEmptyService part of registerInstance. Next, the addInstance logic:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        // build the key; if ephemeral is true the key contains "ephemeral."
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            // register the instance, using a copy-on-write approach
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // the returned instanceList is the updated list
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // if the key contains "ephemeral.", consistencyService ends up in DistroConsistencyServiceImpl
            consistencyService.put(key, instances);
        }
    }
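The copy-on-write idea mentioned above can be shown in isolation: writers take a lock, build a modified copy of the list, and publish it in one step, while readers use whatever snapshot is current without locking. This is a minimal sketch under stated assumptions; CowInstanceList is a hypothetical class that stores plain strings, and it does not reproduce addIpAddresses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class, not Nacos code.
class CowInstanceList {
    // Readers see this reference; it always points at an immutable snapshot.
    private volatile List<String> instances = Collections.emptyList();

    // Writers copy the current list, change the copy, then publish it with one volatile write.
    synchronized void add(String instance) {
        List<String> copy = new ArrayList<>(instances);
        if (!copy.contains(instance)) {
            copy.add(instance);
        }
        instances = Collections.unmodifiableList(copy);
    }

    // Readers never lock; they get either the old or the new snapshot, never a half-updated list.
    List<String> snapshot() {
        return instances;
    }
}
```

The cost is one list copy per write, which is the space-for-time trade the article refers to later when comparing with Eureka.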

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:

public void put(String key, Record value) throws NacosException {
        // enqueue the change for this key
        onPut(key, value);
        // in a cluster, put a sync task for each peer node into a ConcurrentHashMap
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

onPut:

notifier.addTask(key, DataOperation.CHANGE);


        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }

Here tasks is:

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

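One thing I have not figured out here: why use ArrayBlockingQueue? I would have expected LinkedBlockingQueue to perform better.

The run method loops, taking items from the queue and handling each one as it arrives. Because this is a blocking queue, the thread blocks while the queue is empty, so no CPU is wasted.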

public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
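The notifier combines two pieces: a map that suppresses a second CHANGE for a key that is already waiting, and a bounded blocking queue that hands keys to the consumer thread. A minimal sketch of that combination, under stated assumptions: ChangeNotifier is a hypothetical class, it uses putIfAbsent where the excerpt uses containsKey followed by put, and clearing the pending mark when the key is consumed is my assumption about where that happens.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration class, not the Nacos Notifier.
class ChangeNotifier {
    private final ConcurrentMap<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    // Returns true if the key was enqueued, false if it is already pending or the queue is full.
    boolean addTask(String key) {
        if (pending.putIfAbsent(key, "") != null) {
            return false; // a change for this key is already waiting
        }
        boolean offered = tasks.offer(key);
        if (!offered) {
            pending.remove(key); // queue full: undo the mark so a later attempt can succeed
        }
        return offered;
    }

    // Non-blocking version of the consumer step; a real consumer thread would call take() in a loop.
    String pollAndHandle() {
        String key = tasks.poll();
        if (key != null) {
            pending.remove(key); // the key may be enqueued again from now on
        }
        return key;
    }
}
```

Adding the same key twice before it is consumed enqueues it once; after pollAndHandle() returns it, the key is accepted again.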

 

distroProtocol.sync: used in a cluster. It puts the change into a Map, and an asynchronous thread picks it up and calls a remote endpoint to synchronize the data.

public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // iterate over all other member nodes
        for (Member each : memberManager.allMembersWithoutSelf()) {
            // wrap the key together with the target node's address in a DistroKey
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // put the task into a ConcurrentHashMap
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

 

com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder

@Component
public class DistroTaskEngineHolder {
    
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
}

Its member variable is a DistroDelayTaskExecuteEngine, whose parent constructor is:

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // schedule ProcessRunnable on the scheduled executor
        processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }

ProcessRunnable logic:

private class ProcessRunnable implements Runnable {

        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }

Eventually it executes com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor#process, which takes the entries out of the Map above and puts them into a queue.
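The delay engine described here is a two-stage pipeline: tasks are first parked in a map keyed by target, so repeated changes for the same key collapse into one entry, and a periodic job later moves the due entries into a work queue. A minimal sketch of that shape, under stated assumptions: DelayTaskSketch is a hypothetical class, the rule of keeping the earliest due time when the same key is added twice is my own choice, time is passed in explicitly to keep the example deterministic, and processTasks is meant to be called from a single thread.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class, not the Nacos delay task engine.
class DelayTaskSketch {
    private final Map<String, Long> dueAt = new ConcurrentHashMap<>();
    private final Queue<String> workQueue = new ArrayDeque<>();

    // Adding the same key again keeps a single entry (here: the earliest due time wins).
    void addTask(String key, long nowMs, long delayMs) {
        dueAt.merge(key, nowMs + delayMs, Long::min);
    }

    // What a periodic ProcessRunnable-style job would do: move every due task to the work queue.
    int processTasks(long nowMs) {
        int moved = 0;
        for (Map.Entry<String, Long> entry : dueAt.entrySet()) {
            Long due = entry.getValue();
            // remove(key, value) only succeeds if the entry was not changed in the meantime
            if (due <= nowMs && dueAt.remove(entry.getKey(), due)) {
                workQueue.add(entry.getKey());
                moved++;
            }
        }
        return moved;
    }

    int queued() {
        return workQueue.size();
    }
}
```

Parking tasks in a map first is what lets a burst of changes to one service turn into a single sync per peer instead of one per change.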

com.alibaba.nacos.common.task.engine.TaskExecuteWorker.InnerWorker

private class InnerWorker extends Thread {
        
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("distro task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[DISTRO-FAILED] " + e.toString(), e);
                }
            }
        }
    }

The worker takes a task from the queue and calls its run method directly. Here that runs the logic in DistroSyncChangeTask:

public void run() {
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        try {
            String type = getDistroKey().getResourceType();
            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
            distroData.setType(DataOperation.CHANGE);
            boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            handleFailedTask();
        }
    }

Here syncData calls the remote endpoint /distro/datum to synchronize the data.

 

DistroProtocol

Constructor:

public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroConfig = distroConfig;
        startDistroTask();
    }

It calls startDistroTask(), which in turn runs startLoadTask():

private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    }

This submits a DistroLoadDataTask to the thread pool:

public void run() {
        try {
            load();
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

    private void load() throws Exception {
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                // pull the data from a remote node via loadAllDataSnapshotFromRemote
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }

    private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);

        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                // process the pulled snapshot
                boolean result = dataProcessor.processSnapshot(distroData);
                // return as soon as one pull succeeds, so data is loaded from only one server
                if (result) {
                    return true;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
            }
        }
        return false;
    }
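The loop in loadAllDataSnapshotFromRemote is a first-success strategy: try each peer in turn, skip the ones that fail, and stop at the first one that yields a usable snapshot. A minimal sketch of that strategy, assuming a hypothetical SnapshotLoader helper whose Predicate stands in for fetching and processing the snapshot:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration class, not Nacos code.
class SnapshotLoader {
    // Tries each peer in order and returns the first one whose snapshot loads, or null if none does.
    static String loadFromFirstHealthy(List<String> peers, Predicate<String> tryLoad) {
        for (String peer : peers) {
            try {
                if (tryLoad.test(peer)) {
                    return peer; // one successful load is enough
                }
            } catch (RuntimeException e) {
                // A failing peer is skipped and the next one is tried.
            }
        }
        return null;
    }
}
```

Returning null here plays the role of the false return value in the excerpt, which causes the load task to be resubmitted after a retry delay.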

 

Pulling the data invokes the remote com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum,

which ultimately calls dataProcessor.processData:

public boolean processData(DistroData distroData) {
        DistroHttpData distroHttpData = (DistroHttpData) distroData;
        Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
        onPut(datum.key, datum.value);
        return true;
    }

This calls onPut to register the instances.

 

Service Discovery

Client side:

com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String):

The core logic is in the getServiceInfo method, which calls getServiceInfo0:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // look the service up in the local Map
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            // if it is missing, call updateServiceNow to fetch the remote instance list and put it into the Map
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // schedule a periodic task that pulls the list and refreshes the Map
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
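Stripped of failover handling and the wait on in-flight updates, getServiceInfo is a cache-aside lookup: serve from the local map, fetch from the server on a miss, and let a scheduled job refresh the entry afterwards. A minimal sketch of that core, under stated assumptions: ServiceCache is a hypothetical class, the Function stands in for the remote query and is assumed never to return null, and the example is single-threaded.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical illustration class, not the Nacos HostReactor.
class ServiceCache {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remote; // stands in for the query to the server
    private int remoteCalls;

    ServiceCache(Function<String, List<String>> remote) {
        this.remote = remote;
    }

    // Serve from the local map; only go to the server when the service is not cached yet.
    List<String> getInstances(String serviceName) {
        List<String> cached = serviceInfoMap.get(serviceName);
        if (cached == null) {
            cached = fetch(serviceName);
            serviceInfoMap.put(serviceName, cached);
        }
        return cached;
    }

    // What the scheduled update would do: overwrite the cached entry with fresh data.
    void refresh(String serviceName) {
        serviceInfoMap.put(serviceName, fetch(serviceName));
    }

    private List<String> fetch(String serviceName) {
        remoteCalls++;
        return remote.apply(serviceName);
    }

    int remoteCalls() {
        return remoteCalls;
    }
}
```

Two consecutive getInstances calls for the same service hit the server once; only refresh (the scheduled update in the real client) goes back to it.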

Server side:

The server receives the request in com.alibaba.nacos.naming.controllers.InstanceController#list:

It calls doSrvIpxt, whose core logic is a call to srvIPs:

public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        // collect the instances and return them
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        for (String cluster : clusters) {
            // this data was put here at registration time
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }

 

 

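Comparison with Eureka

First, credit to Alibaba for the coding style, which is a pleasure to read. In Eureka large chunks of logic are written together in one place; by comparison Nacos looks much cleaner.

Part of Eureka's registration code: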

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
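            // acquire the read lock
            read.lock();
            // get the group of instances for this application
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // look up the existing lease by instance id
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // if a lease exists and its copy is newer, use it as registrant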
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // no existing lease: update the expected renewal counts
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
            }
            // create a Lease from registrant
            // it records registrationTimestamp (registration time),
            // lastUpdateTimestamp (last operation time) and duration (lease duration)
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // carry over the timestamp at which the service came up
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // put the lease into the application's group
            gMap.put(registrant.getId(), lease);
        } finally {
            read.unlock();
        }

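Eureka registers synchronously and holds the read lock while doing so. Nacos puts the registration straight into a queue and an asynchronous thread performs it; the update itself uses copy-on-write, trading space for time, which improves registration concurrency.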

 

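Service discovery comparison

Nacos reads service instances directly from ephemeralInstances. Eureka takes locks during registration and discovery, so to reduce lock contention it has three levels of cache: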

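    // no expiry; the outward-facing structure that serves service info, refreshed periodically from the second-level cache
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

    // reduces read/write lock contention on the registry by lowering how often it is read; essentially a Guava cache with timed expiry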
    private final LoadingCache<Key, Value> readWriteCacheMap;

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

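Read order: read-only cache -> read-write cache -> actual data (registry).

Data in the read-only cache comes only from the read-write cache, and there is no API for updating it proactively.

The read-write cache is implemented with Guava and relies on Guava's expiry mechanism, so entries expire automatically after a period of time.

When the first-level cache is refreshed periodically, it reads the second-level cache; if the second-level cache has no data, that also triggers a load, which pulls the registration data from the registry.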
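The read path through the three levels can be sketched with plain maps. This is an illustration under stated assumptions, not Eureka's ResponseCacheImpl: TieredRegistryCache is a hypothetical class, a ConcurrentHashMap with computeIfAbsent stands in for the Guava LoadingCache (so there is no timed expiry here), and the invalidate method is my assumption about how a registry change reaches the second level.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class, not Eureka code.
class TieredRegistryCache {
    final Map<String, String> registry = new ConcurrentHashMap<>();          // third level: the real data
    private final Map<String, String> readWrite = new ConcurrentHashMap<>(); // second level: loads on miss
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();  // first level: served to clients

    // Read order: read-only cache, then read-write cache, then the registry.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, registry::get); // a miss triggers a load from the registry
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // Assumed hook: a registry change drops the second-level entry so the next read reloads it.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Periodic job: refresh every first-level entry from the second level.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, registry::get);
            if (fresh != null) {
                readOnly.put(key, fresh);
            } else {
                readOnly.remove(key);
            }
        }
    }
}
```

After the registry changes and the second level is invalidated, get() still returns the old value until syncReadOnly() runs. That window is the price of keeping client reads away from the registry lock.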
