【SpringCloud Eureka源碼】從Eureka Client發起註冊請求到Eureka Server處理的整個服務註冊過程(上)

本文使用Spring Cloud Eureka分析spring

Spring Cloud版本: Dalston.SR5json

spring-cloud-starter-eureka版本: 1.3.6.RELEASEbootstrap

netflix eureka版本: 1.6.2緩存


Eureka Client啓動並調用Eureka Server的註冊接口

Spring Cloud Eureka的自動配置

@EnableDiscoveryClient

首先從使用Eureka Client必須引入的@EnableDiscoveryClient註解提及服務器

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}

@EnableDiscoveryClient註解的做用:session

  • autoRegister默認值爲true,即服務發現客戶端默認會自動註冊到服務端app

  • Import導入EnableDiscoveryClientImportSelector.class,其做用是框架

    • 導入了 spring-cloud-eureka-client.jar!\META-INF\spring.factories 中的 EurekaDiscoveryClientConfigurationdom

      org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
      org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
    • 因爲autoRegister默認爲true,故還會導入AutoServiceRegistrationConfiguration,即啓用自動服務註冊的配置,等同於在配置文件中spring.cloud.service-registry.auto-registration.enabled = true


EurekaDiscoveryClientConfiguration

  • 向Spring容器註冊EurekaDiscoveryClientConfiguration.Marker.class,使得真正導入DiscoveryClient的EurekaClientAutoConfiguration配置類知足啓用條件
  • 建立監聽RefreshScopeRefreshedEvent事件的監聽器,知足在使用RefreshScope刷新時能夠重建EurekaClient(不是本文重點)
  • 在配置eureka.client.healthcheck.enabled=true的前提下,向Spring容器註冊EurekaHealthCheckHandler用於健康檢查(不是本文重點)

因此,EurekaDiscoveryClientConfiguration的主要做用是向Spring容器註冊EurekaDiscoveryClientConfiguration.Marker.class,使得EurekaClientAutoConfiguration配置類知足啓用條件


EurekaClientAutoConfiguration

EurekaClientAutoConfiguration配置類中涉及的內容比較多,主要內容:

  • 一、註冊了spring cloud包下的EurekaClientConfigBean,這是個對netflix的EurekaClientConfig客戶端配置接口的實現
  • 二、註冊了spring cloud包下的EurekaInstanceConfigBean,這是個對netflix的EurekaInstanceConfig實例信息配置接口的實現
  • 三、註冊了一些AutoServiceRegistration,即客戶端自動註冊的組件,如
    • EurekaRegistration: Eureka實例的服務註冊信息(在開啓客戶端自動註冊時纔會註冊)
    • EurekaServiceRegistry: Eureka服務註冊器
    • EurekaAutoServiceRegistration: Eureka服務自動註冊器,實現了SmartLifecycle,會在Spring容器的refresh的最後階段被調用,經過EurekaServiceRegistry註冊器註冊EurekaRegistration信息
  • 四、註冊netflix的EurekaClientApplicationInfoManager,註冊時分爲兩種狀況,便是否知足RefreshScope,若是知足,注入的Bean是帶有 @Lazy + @RefreshScope 註解
    • ApplicationInfoManager: 管理並初始化當前Instance實例的註冊信息,並提供了實例狀態監聽機制
    • EurekaClient : netflix的接口類,用於和Eureka Server交互的客戶端,而netflix的默認實現是DiscoveryClient,也是本文分析的重點
  • 五、註冊EurekaHealthIndicator,爲/health端點提供Eureka相關信息,主要有Status當前實例狀態和applications服務列表,在從Eureka Server獲取服務列表正常的狀況下,Status使用Eureka Server上的InstanceRemoteStatus,不正常狀況下,代碼中有一些判斷邏輯
public class EurekaClientAutoConfiguration {
    ...省略

    /**
     * 一、註冊EurekaClientConfigBean
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean() {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(propertyResolver.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }

    /**
     * 二、註冊EurekaInstanceConfigBean
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) throws MalformedURLException {
        PropertyResolver eurekaPropertyResolver = new RelaxedPropertyResolver(this.env, "eureka.instance.");
        String hostname = eurekaPropertyResolver.getProperty("hostname");

        boolean preferIpAddress = Boolean.parseBoolean(eurekaPropertyResolver.getProperty("preferIpAddress"));
        int nonSecurePort = Integer.valueOf(propertyResolver.getProperty("server.port", propertyResolver.getProperty("port", "8080")));
        int managementPort = Integer.valueOf(propertyResolver.getProperty("management.port", String.valueOf(nonSecurePort)));
        String managementContextPath = propertyResolver.getProperty("management.contextPath", propertyResolver.getProperty("server.contextPath", "/"));
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
        instance.setNonSecurePort(nonSecurePort);
        instance.setInstanceId(getDefaultInstanceId(propertyResolver));
        instance.setPreferIpAddress(preferIpAddress);
        if (managementPort != nonSecurePort && managementPort != 0) {
            if (StringUtils.hasText(hostname)) {
                instance.setHostname(hostname);
            }
            String statusPageUrlPath = eurekaPropertyResolver.getProperty("statusPageUrlPath");
            String healthCheckUrlPath = eurekaPropertyResolver.getProperty("healthCheckUrlPath");
            if (!managementContextPath.endsWith("/")) {
                managementContextPath = managementContextPath + "/";
            }
            if (StringUtils.hasText(statusPageUrlPath)) {
                instance.setStatusPageUrlPath(statusPageUrlPath);
            }
            if (StringUtils.hasText(healthCheckUrlPath)) {
                instance.setHealthCheckUrlPath(healthCheckUrlPath);
            }
            String scheme = instance.getSecurePortEnabled() ? "https" : "http";
            URL base = new URL(scheme, instance.getHostname(), managementPort, managementContextPath);
            instance.setStatusPageUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getStatusPageUrlPath(), '/')).toString());
            instance.setHealthCheckUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getHealthCheckUrlPath(), '/')).toString());
        }
        return instance;
    }

    /**
     * 三、註冊客戶端自動註冊相關組件
     *     EurekaRegistration: Eureka實例的服務註冊信息(在開啓客戶端自動註冊時纔會註冊)
     *     EurekaServiceRegistry: Eureka服務註冊器
     *     EurekaAutoServiceRegistration: Eureka服務自動註冊器,
     *                                     經過EurekaServiceRegistry註冊器註冊EurekaRegistration信息
     */
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager) {
        return EurekaRegistration.builder(instanceConfig)
                .with(applicationInfoManager)
                .with(eurekaClient)
                .with(healthCheckHandler)
                .build();
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }


    /**
     * 四、註冊netflix的 EurekaClient 和 ApplicationInfoManager
     */
    // 若是禁用客戶端自動註冊,在此方法debug打斷點會觸發服務註冊,狀態爲STARTING
    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
        return new EurekaDiscoveryClient(config, client);
    }
    
    // 普通的EurekaClient配置(不可刷新)
    @Configuration
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired(required = false)
        private DiscoveryClientOptionalArgs optionalArgs;

        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    }

     // 可刷新的EurekaClient配置類
    @Configuration
    @ConditionalOnRefreshScope  //知足@ConditionalOnClass(RefreshScope.class)         
                                //    @ConditionalOnBean(RefreshAutoConfiguration.class)
    protected static class RefreshableEurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired(required = false)
        private DiscoveryClientOptionalArgs optionalArgs;

        // 註冊CloudEurekaClient,是com.netflix.discovery.EurekaClient接口的實現類
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
            manager.getInfo(); // force initialization
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

        // 註冊ApplicationInfoManager
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    }

    
    /**
     * 五、註冊 EurekaHealthIndicator
     */
    @Configuration
    @ConditionalOnClass(Endpoint.class)
    protected static class EurekaHealthIndicatorConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
                                                           EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
            return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
        }
    }
}

如上,在知足一系列Conditional條件後,會向Spring容器中註冊CloudEurekaClient,它是com.netflix.discovery.EurekaClient接口的實現類,具體繼承實現關係以下


DiscoveryClient繼承實現關係

如上圖所示,剛剛建立的CloudEurekaClientcom.netflix.discovery.DiscoveryClient的子類,它們都實現了com.netflix.discovery.EurekaClient接口

EurekaClient是Netflix對服務發現客戶端抽象的接口,包含不少方法,而DiscoveryClient是其默認實現,也是本文分析的重點,CloudEurekaClient是spring cloud的實現,根據類上註釋,其主要重寫了onCacheRefreshed()方法,這個方法主要是從Eureka Server fetchRegistry()獲取服務列表以後用於以廣播方式通知緩存刷新事件的,其實DiscoveryClient也有onCacheRefreshed()方法的實現,但因爲DiscoveryClient是Netflix的類,只發送了com.netflix.discovery.EurekaEvent,而CloudEurekaClient使用Spring的ApplicationEventPublisher,發送了HeartbeatEvent

注意:

上面說的都是netflix的DiscoveryClient

還有另外一個DiscoveryClient,是 org.springframework.cloud.client.discovery.DiscoveryClient

是Spring對服務發現客戶端的抽象


建立DiscoveryClient的過程

DiscoveryClient構造方法

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, 
                EurekaClientConfig config, 
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    /**
     * AbstractDiscoveryClientOptionalArgs 是DiscoveryClient的可選參數,可理解爲擴展點
     * 包含healthCheckHandlerProvider、healthCheckCallbackProvider、eventListeners等
     * spring cloud默認實現爲MutableDiscoveryClientOptionalArgs,但此處相關成員變量賦值後認爲空
     */
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // 若是 shouldFetchRegistry=true,註冊netflix servo監控
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // 若是 shouldRegisterWithEureka=true,註冊netflix servo監控
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 若是既不要向eureka server註冊,又不要獲取服務列表,就什麼都不用初始化
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();

        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }

    // 【重點】建立各類Executor 和 eurekaTransport、instanceRegionChecker
    try {
        // 執行定時任務的定時器,定時線程名爲 DiscoveryClient-%d
        // 在定時器中用於定時執行TimedSupervisorTask監督任務,監督任務會強制超時 和 記錄監控數據
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // 執行heartbeat心跳任務的執行器,默認最大線程數=2,線程名爲:DiscoveryClient-HeartbeatExecutor-%d
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // 執行服務列表緩存刷新的執行器,默認最大線程數=2,線程名爲:DiscoveryClient-CacheRefreshExecutor-%d
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        // 初始化eurekaTransport在服務註冊,獲取服務列表時的client
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // 若是須要從eureka server獲取服務列表,而且嘗試fetchRegistry(false)失敗,調用BackupRegistry
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // 【重點】初始化全部定時任務
    initScheduledTasks();
    
    // 添加servo監控
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

上面的DiscoveryClient構造方法代碼比較多,但多數都是一些賦值,本次分析的重點在註釋中已經標出,建立了各類Executor 和 eurekaTransport、instanceRegionChecker,以後又調用initScheduledTasks()方法初始化全部這些定時任務


【重點】initScheduledTasks() 初始化定時任務

/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    // 一、若是要從Eureka Server獲取服務列表
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        
        // 從eureka服務器獲取註冊表信息的頻率(默認30s)
        // 同時也是單次獲取服務列表的超時時間
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // 若是緩存刷新超時,下一次執行的delay最大是registryFetchIntervalSeconds的幾倍(默認10),默認每次執行是上一次的2倍
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        
        /**
         * 【#### 執行CacheRefreshThread,服務列表緩存刷新任務 ####】
         * 執行TimedSupervisorTask監督任務的定時器,具體執行器爲cacheRefreshExecutor,任務爲CacheRefreshThread
         */
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",               //監控名
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds, //指定具體任務的超時時間
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    
    // 二、若是要註冊到Eureka Server
    if (clientConfig.shouldRegisterWithEureka()) {
        // 續租的時間間隔(默認30s)
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 若是心跳任務超時,下一次執行的delay最大是renewalIntervalInSecs的幾倍(默認10),默認每次執行是上一次的2倍
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        /**
         * 【#### 執行HeartbeatThread,發送心跳數據 ####】
         * 執行TimedSupervisorTask監督任務的定時器,具體執行器爲heartbeatExecutor,任務爲HeartbeatThread
         */
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        /** 
         * 【#### InstanceInfo複製器 ####】
         * 啓動後臺定時任務scheduler,線程名爲 DiscoveryClient-InstanceInfoReplicator-%d
         * 默認每30s執行一次定時任務,查看Instance信息(DataCenterInfo、LeaseInfo、InstanceStatus)是否有變化
         * 若是有變化,執行 discoveryClient.register()
         */
        instanceInfoReplicator = new InstanceInfoReplicator(
               this,            //當前DiscoveryClient
               instanceInfo,    //當前實例信息
               clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的複製間隔(默認30s)
               2); // burstSize

        /**
         * 【StatusChangeListener 狀態改變監聽器】
         */
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                
                //使用InstanceInfo複製器 scheduler.submit()一個Runnable任務
                //後臺立刻執行 discoveryClient.register()
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        /**
         * 是否關注Instance狀態變化,使用後臺線程將狀態同步到eureka server(默認true)
         * 調用 ApplicationInfoManager#setInstanceStatus(status) 會觸發
         * 將 StatusChangeListener 註冊到 ApplicationInfoManager
         */
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 啓動InstanceInfo複製器
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } 
    // 當前服務實例不註冊到Eureka Server
    else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

總的來講initScheduledTasks()作了如下幾件事:

  • 若是shouldFetchRegistry=true,即要從Eureka Server獲取服務列表
    • 啓動刷新服務列表定時線程(DiscoveryClient-CacheRefreshExecutor-%d),默認registryFetchIntervalSeconds=30s執行一次,任務爲CacheRefreshThread,即從Eureka Server獲取服務列表,也刷新客戶端緩存
  • 若是shouldRegisterWithEureka=true,即要註冊到Eureka Server
    • 啓動heartbeat心跳定時線程(DiscoveryClient-HeartbeatExecutor-%d),默認renewalIntervalInSecs=30s續約一次,任務爲HeartbeatThread,即客戶端向Eureka Server發送心跳
    • 啓動InstanceInfo複製器定時線程(DiscoveryClient-InstanceInfoReplicator-%d),開啓定時線程檢查當前Instance的DataCenterInfo、LeaseInfo、InstanceStatus,若是發現變動就執行discoveryClient.register(),將實例信息同步到Server端


Eureka Client複製InstanceInfo,發起註冊

由建立DiscoveryClient的過程可知,建立了不少定時執行線程,如定時從Server端刷新服務列表的CacheRefreshThread,定時報心跳續約的HeartbeatThread,還有用於更新並複製本地實例狀態到Server端的InstanceInfo複製器定時線程,而正是InstanceInfoReplicator#run()中的discoveryClient.register()發起了註冊

那麼怎麼能夠觸發註冊行爲呢?

// InstanceInfoReplicator#run()
public void run() {
    try {
        /**
         * 刷新 InstanceInfo
         * 一、刷新 DataCenterInfo
         * 二、刷新 LeaseInfo 租約信息
         * 三、根據HealthCheckHandler獲取InstanceStatus,並更新,若是狀態發生變化會觸發全部StatusChangeListener
         */
        discoveryClient.refreshInstanceInfo();

        // 若是isInstanceInfoDirty=true,返回dirtyTimestamp,不然是null
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();  //發起註冊
            instanceInfo.unsetIsDirty(dirtyTimestamp);  //isInstanceInfoDirty置爲false
        }
    } 
    catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } 
    finally { // 繼續下次任務
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

如上,先刷新InstanceInfo,刷新後若是發現有髒數據,即實例發生了變動,還未同步給Server的數據,就發起註冊

那麼在Eureka Client啓動的這種場景下,怎樣會觸發有髒數據下的註冊?

  • 由InstanceInfoReplicator複製器的自動定時任務在刷新InstanceInfo時發現有髒數據,並更新
  • InstanceInfoReplicator複製器提供onDemandUpdate()按需更新方法,一旦調用,立刻會submit()任務,其中會cancel自動更新任務,立刻執行InstanceInfoReplicator#run()


InstanceInfoReplicator複製器自動定時更新

InstanceInfoReplicator複製器在啓動建立DiscoveryClient時被建立並start()啓動

// InstanceInfoReplicator#start()
public void start(int initialDelayMs) { // 默認40s
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register 初始化時會將instanceInfo設置爲dirty
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

因此當自動更新啓動時會設置InstanceInfo爲髒數據,由於要觸發第一次向Server同步,那麼在40s後會調用InstanceInfoReplicator#run(),假設InstanceInfo並無其它變動,那麼也會發起discoveryClient.register()

注意:

正常狀況下是不會由延遲40s的第一次執行定時任務發起註冊,而是下面的onDemandUpdate() 主動按需更新發起註冊

若是設置@EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false,即放棄自動註冊,並在EurekaClientAutoConfiguration的以下方法打斷點

@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
  return new EurekaDiscoveryClient(config, client);
}

會在斷點生效時觸發EurekaClient的實例化,而此EurekaClient就是一個DiscoveryClient,會啓動InstanceInfoReplicator自動定時更新線程,但因爲new InstanceInfoFactory().create(config)時本地實例狀態爲STARTING,因此註冊到Server端的狀態也是STARTING


onDemandUpdate() 主動按需更新

目前只有在ApplicationInfoManager#setInstanceStatus()更新實例狀態,且實例狀態真的發生變動,觸發StatusChangeListener狀態變動監聽器時,會調用onDemandUpdate立刻submit任務執行InstanceInfoReplicator#run(),再發起註冊

因爲Spring Cloud默認是啓用服務自動註冊AutoServiceRegistration的,因此在EurekaClientAutoConfiguration自動配置時會註冊服務自動註冊相關組件(EurekaRegistration、EurekaServiceRegistry、EurekaAutoServiceRegistration),其中EurekaAutoServiceRegistration實現了Spring的SmartLifecycle接口,會在Spring容器refresh要完畢時觸發生命週期方法start(),其中會使用EurekaServiceRegistry服務註冊器註冊EurekaRegistration這個本地實例信息

// EurekaServiceRegistry#register()
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application " + reg.getInstanceConfig().getAppname()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    // 設置初始化狀態
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    if (reg.getHealthCheckHandler() != null) {
        reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
    }
}

主要是設置初始化狀態步驟,而EurekaInstanceConfigBean本地實例信息的initialStatus初始化狀態爲 InstanceStatus.UP,因此狀態與new InstanceInfo()時的STARTING不一樣,發生了狀態變動,觸發在建立DiscoveryClient時設置的StatusChangeListener

...省略

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

...省略

其中會調用 InstanceInfoReplicator.onDemandUpdate() 實例信息複製器作按需更新,立刻將UP狀態更新/註冊到Server端

因此,以我判斷,Eureka Client啓動時的自動註冊大多數應該是Spring Cloud的服務自動註冊機制,在Spring容器基本啓動完畢時,觸發服務自動註冊操做,其中會使用ApplicationInfoManager更新實例狀態爲初始狀態UP,一旦實例狀態變動會被立刻監聽到,執行復制器的InstanceInfoReplicator.onDemandUpdate()按需更新,立刻執行一次discoveryClient.register()操做

因此,下面就是分析 discoveryClient.register() 是怎麼註冊服務的


DiscoveryClient#register() 註冊

// DiscoveryClient#register()
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

如上,註冊方法使用eurekaTransport的註冊客戶端registrationClient調用了register(instanceinfo)方法

EurekaTransportDiscoveryClient的內部類,其中包含

  • registrationClient 和 registrationClientFactory: 負責註冊、續約相關工做的EurekaHttpClientEurekaHttpClientFactory的實現類
  • queryClient 和 queryClientFactory: 負責獲取Server端服務列表的EurekaHttpClientEurekaHttpClientFactory的實現類
  • TransportClientFactory: 負責傳輸消息的客戶端工廠(底層用於和Server交互的http框架是 Jersey,此處的工廠就和Jersey相關)

那麼EurekaTransport的相關組件,尤爲是registrationClient註冊客戶端是如何初始化的呢?


registrationClient - 服務註冊相關的EurekaHttpClient

初始化是在DiscoveryClient的構造方法中

eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

主要是scheduleServerEndpointTask()方法

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {

    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;

    EurekaJerseyClient providedJerseyClient = args == null
            ? null
            : args.eurekaJerseyClient;
    
    TransportClientFactories argsTransportClientFactories = null;
    if (args != null && args.getTransportClientFactories() != null) {
        argsTransportClientFactories = args.getTransportClientFactories();
    }
    
    // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
    @SuppressWarnings("rawtypes")
    TransportClientFactories transportClientFactories = argsTransportClientFactories == null
            ? new Jersey1TransportClientFactories()
            : argsTransportClientFactories;

    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    // 一、參數中是否提供了transportClientFactory的實現,沒有就使用Jersey1TransportClientFactories
    eurekaTransport.transportClientFactory = providedJerseyClient == null
            ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
            : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };

    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource
    );

    /**
     * 是否要想Eureka Server註冊
     * 二、建立RegistrationClient用於註冊的客戶端及工廠,並設置到eurekaTransport
     */ 
    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }

    // 
    /**
     * 是否要從Server端獲取服務列表
     * 三、建立QueryClient用於查詢服務列表的客戶端及工廠,並設置到eurekaTransport
     */ 
    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}

因此,下面就是逐層深刻分析RegistrationClient用於註冊的客戶端及工廠是如何建立的?

因爲RegistrationClient實際上是一種EurekaHttpClient,而EurekaHttpClient是接口,其實現類不少

查看源碼發現,Netflix採用的是 Factory工廠 + 代理 的模式,從最外層建立的EurekaHttpClient工廠包含一個成員變量是另外一個代理的EurekaHttpClient工廠,每一個工廠生成的EurekaHttpClient功能不同,在從外層執行一個操做時,最外層的工廠執行其相關功能後,使用代理的工廠新建EurekaHttpClient實例,再調用其相同的方法,也實現這個EurekaHttpClient的相關功能,就這樣逐層深刻,各司其職後,最後使用Jersey發送POST請求到Eureka Server發起註冊,而這些EurekaHttpClient都是在com.netflix.discovery.shared.transport.decoratorEurekaHttpClient的包裝類的包下的,由外到內大體是:

  • SessionedEurekaHttpClient: 強制在必定時間間隔後重連EurekaHttpClient,防止永遠只鏈接特定Eureka Server,反過來保證了在Server端集羣拓撲發生變化時的負載重分配
    • RetryableEurekaHttpClient: 帶有重試功能,默認最多3次,在配置的全部候選Server地址中嘗試請求,成功重用,失敗會重試另外一Server,並維護隔離清單,下次跳過,當隔離數量達到閾值,清空隔離清單,從新開始
      • RedirectingEurekaHttpClient: Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient
        • MetricsCollectingEurekaHttpClient: 統計收集Metrics信息
          • JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子類
            • AbstractJerseyEurekaHttpClient: 底層實現經過Jersery註冊、發心跳等的核心類
              • jerseyClient: Jersery客戶端


SessionedEurekaHttpClient - 定時重連

// SessionedEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;
    // 若是上次重連時間到如今已經超過了currentSessionDurationMs,關閉當前EurekaHttpClient
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    // 若是EurekaHttpClient爲空,clientFactory.newClient()重建
    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    
    // 繼續執行後續
    return requestExecutor.execute(eurekaHttpClient);
}


RetryableEurekaHttpClient - 候選範圍內失敗重試

// RetryableEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    
    // 最多重試numberOfRetries(默認:3)
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();//從AtomicReference<EurekaHttpClient>獲取當前EurekaHttpClient
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates(); //返回候選集合 排除 已經失敗隔離的Host集合
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            // 根據當前的下標獲取Endpoint,並新建 JerseyClient
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            // 繼續後續執行
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // 若是根據當前操做類型 和 返回狀態碼,知足狀態計算器,記錄currentHttpClient可用,下次繼續使用
            // 返回狀態碼是:200、300、302,或者Register、SendHeartBeat狀況下是404
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } 
        catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        // 請求失敗 或 報5xx錯誤,將delegate清空,重試另外一個Server,並將當前Endpoint放到隔離集合
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    
    // 屢次重試後仍沒法成功返回結果,上拋異常
    throw new TransportException("Retry limit reached; giving up on completing the request");
}


//########## RetryableEurekaHttpClient#getHostCandidates() 
// 返回 全部候選的Host節點數據 與 隔離集合 的數據差集
private List<EurekaEndpoint> getHostCandidates() {
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); //全部候選節點數據
    quarantineSet.retainAll(candidateHosts); //確保quarantineSet隔離集合中的數據都在candidateHosts中
                                             //當candidateHosts發生變化時也能及時清理quarantineSet隔離集合

    // If enough hosts are bad, we have no choice but start over again
    // 默認:0.66百分比
    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    
    // 隔離集合爲空
    if (quarantineSet.isEmpty()) {
        // no-op
    } 
    // 隔離數據已經大於閥值,不得已要從新開始,清空隔離集合
    else if (quarantineSet.size() >= threshold) { 
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } 
    // 隔離集合不爲空,也不大於閥值,排除隔離集合中的Endpoint後返回
    else { 
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}


//########## ServerStatusEvaluators#LEGACY_EVALUATOR
// Eureka Server返回狀態的計算器,計算不一樣場景下的不一樣狀態碼是否表明成功
private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
    @Override
    public boolean accept(int statusCode, RequestType requestType) {
        if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
            return true;
        } else if (requestType == RequestType.Register && statusCode == 404) {
            return true;
        } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
            return true;
        } else if (requestType == RequestType.Cancel) {  // cancel is best effort
            return true;
        } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) {
            return true;
        }
        return false;
    }
};


RedirectingEurekaHttpClient - 按Server端要求重定向到新Server

//########## RedirectingEurekaHttpClient#executeOnNewServer
// Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
                                                AtomicReference<EurekaHttpClient> currentHttpClientRef) {
    URI targetUrl = null;
    // 最多重定向默認10次
    for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
        
        // 若是返回的不是302重定向,返回response
        if (httpResponse.getStatusCode() != 302) {
            if (followRedirectCount == 0) {
                logger.debug("Pinning to endpoint {}", targetUrl);
            } else {
                logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
            }
            return httpResponse;
        }

        // 從response中獲取Location,用於重建EurekaHttpClient
        targetUrl = getRedirectBaseUri(httpResponse.getLocation());
        if (targetUrl == null) {
            throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
        }

        currentHttpClientRef.getAndSet(null).shutdown();
        currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
    }
    String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
    logger.warn(message);
    throw new TransportException(message);
}


MetricsCollectingEurekaHttpClient - 統計收集執行狀況

// MetricsCollectingEurekaHttpClient#execute()
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
    Stopwatch stopwatch = requestMetrics.latencyTimer.start(); //統計執行延時
    try {
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
        requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); //按狀態統計
        return httpResponse;
    } catch (Exception e) {
        requestMetrics.connectionErrors.increment(); //統計錯誤
        exceptionsMetric.count(e); //按異常名統計
        throw e;
    } finally {
        stopwatch.stop();
    }
}


AbstractJerseyEurekaHttpClient - 底層經過Jersery發送註冊、心跳請求

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
    protected final Client jerseyClient; //真正處理請求的Jersery客戶端
    protected final String serviceUrl; //鏈接的Server端地址
    
    protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
        this.jerseyClient = jerseyClient;
        this.serviceUrl = serviceUrl;
        logger.debug("Created client for url: {}", serviceUrl);
    }
    
    /**
     * 註冊方法
     */
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName(); //請求Eureka Server的【/apps/應用名】接口地址
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);  //實例InstanceInfo數據,經過Post請求body發過去
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } 
        finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    
    ...省略


Eureka Server接收到的註冊請求詳情

通過上面的步驟,客戶端已經能夠經過Jersery發送Http請求給Eureka Server端註冊,具體請求以下:

POST /eureka/apps/應用名 HTTP/1.1
Accept-Encoding: gzip
Content-Type: application/json
Accept: application/json
DiscoveryIdentity-Name: DefaultClient
DiscoveryIdentity-Version: 1.4
DiscoveryIdentity-Id: 192.168.70.132
Transfer-Encoding: chunked
Host: localhost:8001
Connection: Keep-Alive
User-Agent: Java-EurekaClient/v1.6.2

1a0
{"instance":{
​ "instanceId":"192.168.70.132:應用名:10001",
​ "hostName":"192.168.70.132",
​ "app":"應用名",
​ "ipAddr":"192.168.70.132",
​ "status":"UP",
​ "overriddenstatus":"UNKNOWN",
​ "port": { "\(":10001, "@enabled" : "true" }, ​ "securePort": { "\)":443, "@enabled" : "false"},
​ "countryId":1,
​ "dataCenterInfo":{"@class":"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
​ "name":"MyOwn"
}


Eureka Client註冊流程總結

大致來講,Eureka Client的註冊是由Spring Cloud的AutoServiceRegistration自動註冊發起,在設置應用實例Instance初始狀態爲UP時,觸發了InstanceInfoReplicator#onDemandUpdate()按需更新方法,將實例Instance信息經過DiscoveryClient註冊到Eureka Server,期間通過了一些EurekaHttpClient的裝飾類,實現了諸如按期重連失敗重試註冊重定向、統計收集Metrics信息等功能,最後由JerseryClient發送POST請求調用Eureka Server的【/eureka/apps/應用名】端點,請求體攜帶InstanceInfo實例信息,完成註冊


  • EurekaAutoServiceRegistration#start(): 實現Spring的SmartLifecycle,在Spring容器refresh()最後一步finishRefresh()會調用生命週期的start()方法

    • EurekaServiceRegistry#register(EurekaRegistration): 使用服務註冊器註冊服務信息

      • ApplicationInfoManager#setInstanceStatus(初始狀態): 應用實例信息管理器更新初始狀態爲 UP

        • StatusChangeListener: 觸發實例狀態監聽(此Listener是在DiscoveryClient#initScheduledTasks()方法中設置的)

          • InstanceInfoReplicator.onDemandUpdate(): 實例狀態複製器執行按需狀態更新

            • DiscoveryClient#register(): DiscoveryClient發起註冊實例信息

              • EurekaHttpClientDecorator#execute(): 執行EurekaHttpClient的裝飾類,實現其各自功能

                SessionedEurekaHttpClient: 定時重連

                RetryableEurekaHttpClient: 候選範圍內失敗重試

                RedirectingEurekaHttpClient: 按Eureka Server端要求重定向到新Server

                MetricsCollectingEurekaHttpClient: 統計收集執行狀況

                • JerseyApplicationClient#register(): 封裝註冊請求數據
                  • JerseyClient發送Post註冊請求
相關文章
相關標籤/搜索