Spring Cloud Eureka 源碼分析(二) 客戶端啓動過程

一.前言

        咱們在以前的一篇文章分析了eureka服務端的啓動過程[Spring Cloud Eureka 源碼分析(一) 服務端啓動過程],如今分析一下eureka客戶端的啓動,eureka客戶端主要經過向註冊中心的註冊及續約,維持服務實例在註冊中心服務列表的存在,供服務調用方發現對外提供服務;html

         那麼eureka客戶端向註冊中心的註冊和續約過程是怎樣的呢?註冊中心又是怎麼保存服務實例信息的呢?接下來咱們分析一下源碼;java

二. Eureka客戶端的啓動過程

2.1 @EnableEurekaClient

        該註解註釋在應用程序的啓動入口類,來啓動eureka客戶端;spring

/**
 * Convenience annotation for clients to enable Eureka discovery configuration
 * (specifically). Use this (optionally) in case you want discovery and know for sure that
 * it is Eureka you want. All it does is turn on discovery and let the autoconfiguration
 * find the eureka classes if they are available (i.e. you need Eureka on the classpath as
 * well).
 *
 * @author Dave Syer
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableDiscoveryClient
public @interface EnableEurekaClient {

}

          經過這個註解的註釋說明咱們能夠看到,這個註解主要用於啓動eureka服務發現客戶端的配置,在這個註解上有@EnableDiscoveryClient的修飾,說明這個註解擁有@EnableDiscoveryClient的功能;bootstrap

         值得注意的是,對於Spring Cloud的服務發現功能,Eureka只是實現此功能的一個插件式的存在,Spring Cloud 服務發現還支持如alibaba的Dubbo融合,二者在使用過程當中效果徹底一致,不比糾結;數組

         @EnableDiscoveryClient: 開啓Spring Cloud 服務發現客戶端的註解;緩存

         @EnableEurekaClient : 開啓以Eureka爲Spring Cloud服務發現組件的客戶端的註解;併發

 

2.2 EnableDiscoveryClientImportSelector

        @EnableDiscoveryClient又是如何啓動服務的呢?app

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

	/**
	 * If true, the ServiceRegistry will automatically register the local server.
	 */
	boolean autoRegister() default true;
}

           主要經過@Import註解導入了一個配置交由spring管理:dom

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
		extends SpringFactoryImportSelector<EnableDiscoveryClient> {

	@Override
	public String[] selectImports(AnnotationMetadata metadata) {

        //調用父類的方法,拿到經過父類方法要注入的全路徑類名數組;
		String[] imports = super.selectImports(metadata);

        //得到該註解(@EnableDiscoveryClient)的全部屬性參數
		AnnotationAttributes attributes = AnnotationAttributes.fromMap(
				metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        //得到屬性autoRegister的值,該值默認是true的;
		boolean autoRegister = attributes.getBoolean("autoRegister");

        //根據註解配置來判斷是否要實例化下面的那個自動配置類
		if (autoRegister) {
			List<String> importsList = new ArrayList<>(Arrays.asList(imports));
			importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
			imports = importsList.toArray(new String[0]);
		}

		return imports;
	}

	@Override
	protected boolean isEnabled() {
		return new RelaxedPropertyResolver(getEnvironment()).getProperty(
				"spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
	}

	@Override
	protected boolean hasDefaultFactory() {
		return true;
	}

}

說明:ide

         1) 這個類經過繼承 抽象類 SpringFactoryImportSelector<T> 來實現AutoServiceRegistrationConfiguration配置類的條件實例化;

             抽象類 SpringFactoryImportSelector<T>直接實現了DeferredImportSelector接口,間接實現 ImportSelector 接口,

             ImportSelector的用法: 經過實現 ImportSelector 接口的  public String[] selectImports(AnnotationMetadata metadata);方法,返回要實例化的全路徑類名數組,來實現普通java類的注入;

2.3  AutoServiceRegistrationConfiguration     

@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
public class AutoServiceRegistrationConfiguration {
}

        開啓了配置類:

/**
 * @author Spencer Gibb
 */
@ConfigurationProperties("spring.cloud.service-registry.auto-registration")
public class AutoServiceRegistrationProperties {

	/** If Auto-Service Registration is enabled, default to true. */
	private boolean enabled = true;

	/** Should startup fail if there is no AutoServiceRegistration, default to false. */
	private boolean failFast = false;

	public boolean isFailFast() {
		return failFast;
	}

	public void setFailFast(boolean failFast) {
		this.failFast = failFast;
	}
}

         這個類有兩個配置屬性,enable:自動服務註冊是否開啓,默認爲true,沒看懂是什麼意思,關鍵尚未getter/setter方法;

                                            isFailFast:是否啓動快速失敗,默認爲false,咱們查看下getter方法的調用,看一下幹什麼的;

@Configuration
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public class AutoServiceRegistrationAutoConfiguration {

	@Autowired(required = false)
	private AutoServiceRegistration autoServiceRegistration;

	@Autowired
	private AutoServiceRegistrationProperties properties;

	@PostConstruct
	protected void init() {
		if (autoServiceRegistration == null && this.properties.isFailFast()) {
			throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
		}
	}
}

       僅在這裏的init()方法有調用,

       能夠看到這個類實例化的條件@ConditionalOnBean(AutoServiceRegistrationProperties.class)是是否有以前的配置類AutoServiceRegistrationProperties存在;

       init()方法經過servlet註解 @PostConstruct使得該方法在該類構造函數執行後執行,從而達到自動執行的目的,在這裏檢驗了AutoServiceRegistration bean是否存在,不存在則拋出異常程序當即啓動失敗,那麼這個bean又是幹什麼,在哪裏實例化的呢?

       咱們查看引用:

              AutoServiceRegistration是一個接口,咱們查看其具體實現類的調用:

      在org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaAutoServiceRegistration咱們發現了其實例化的位置;

2.4 EurekaClientAutoConfiguration的實例化條件

         EurekaClientAutoConfiguration 經過名字咱們可知這是Eureka客戶端自動配置類;

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
public class EurekaClientAutoConfiguration {

        ....  省略代碼先  ....

}
  1.   首先 @Configuration註解是的該配置類被spring所感知到
  2.  @EnableConfigurationProperties註解開啓對於經過@ConfigurationProperties引入外部配置文件屬性註冊成bean的支持;
  3.  @ConditionalOnClass(EurekaClientConfig.class) 該配置類實例化的第一個條件,容器上下文中存在EurekaClientConfig類;EurekaClientConfig的默認實現類提供了一些環境配置的屬性值;
  4.  @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class) 配置類實例化的另外一個條件,上下文中存在EurekaDiscoveryClientConfiguration.Marker的bean;

       這個bean是何時實例化的呢?經過調用咱們看到:

@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class, // this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase
		EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {
}

     而在META-INF/spring.factories文件中:

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

    咱們在 BootstrapApplicationListener 這個類的 151行發現了調用; 經過 SpringFactoriesLoader 加載了這個配置,在onApplicationEvent(ApplicationEnvironmentPreparedEvent event)方法中被調用;

public class BootstrapApplicationListener
		implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {

        @Override
	    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
 
        }

        ... 代碼省略 ...
}

         BootstrapApplicationListener 實現了 ApplicationListener 接口 而且指定了一個 ApplicationEnvironmentPreparedEvent 事件;

          這裏使用的是Spring的監聽器,利用觀察者模式(Observer),使用方法咱們會在另外一篇文章中講解;通俗來說就是咱們定義一個監聽器,和事件,當咱們發佈這個事件時,監聽器會監聽到,而後執行對應的操做,也叫<發佈--訂閱>;

         回到這裏面講,Spring的監聽器,Spring根據應用啓動的過程,提供了四種事件供咱們使用:

  • ApplicationStartedEvent :spring boot啓動開始時執行的事件
  • ApplicationEnvironmentPreparedEvent:spring boot 對應Enviroment已經準備完畢,但此時上下文context尚未建立。
  • ApplicationPreparedEvent:spring boot上下文context建立完成,但此時spring中的bean是沒有徹底加載完成的。
  • ApplicationFailedEvent:spring boot啓動異常時執行事件

         這樣就清晰了,當應用環境初始化完畢,可是尚未建立context(容器/上下文)的節點時,我的理解其實就是應用啓動到初始化完各類配置屬性後,context容器還未建立,各類bean尚未實例化的時候,經過發佈此時的時間來觸發監聽器,執行加載該配置文件而且實例化EurekaDiscoveryClientConfiguration.Marker這個bean;

     5.  @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) 實例化條件,默認爲true,這個屬性參數很熟悉了;

     6.  @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
      CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })

           這個註解的用法經過註釋咱們能夠知道,示意一個自動配置類應該在另一個特殊的自動配置類以前被實例化;用於指定配置的實例化順序;

  • NoopDiscoveryClientAutoConfiguration  :
    1. 直譯等待發現客戶端自動配置,實例化條件 @ConditionalOnMissingBean(DiscoveryClient.class) 
    2.  查看 接口DiscoveryClient的實現類有EurekaDiscoveryClient和NoopDiscoveryClient,該註解含義是若上下文中存在DiscoveryClient的bean,則實例化被註解的類,不然實例化DiscoveryClient,若實例化成功,再實例化被註解的類;因爲EurekaClientAutoConfiguration 實例化是先於 NoopDiscoveryClientAutoConfiguration ,而 EurekaDiscoveryClient在EurekaClientAutoConfiguration 中 147行實例化,所以這裏條件知足;
    3. 在這裏若是SpringCloud的服務發現模塊不是由Eureka實現的,或者沒有指定服務發現模塊,則這裏將會實例化NoopDiscoveryClient,也印證了Eureka是SpringCloud服務發現功能實現的一個選擇而已.NoopDiscoveryClient是否是SpringCloud默認的服務發現客戶端實現呢?
    4. 這個類實現了Spring的監聽器ApplicationListener接口,並指定了ContextRefreshedEvent(Spring容器加載完畢後)事件,因爲該類實例化在EurekaClientAutoConfiguration以後,因此該類實例化時服務發現客戶端已經實例化完畢,所以在容器加載完畢後發佈了實例化註冊事件,在服務應用實例註冊服務註冊中心以後,代表客戶端註冊完畢;
  • CommonsClientAutoConfiguration :
    1. 看類內容貌似是健康檢查的一些方法,這裏不重點講解了,以後有須要在寫一篇文章連接過去;
  • ServiceRegistryAutoConfiguration :
    /**
      *  看類內容,EndPoint主要是用來監控應用服務運行情況的;
      */
    
    @Configuration
    public class ServiceRegistryAutoConfiguration {
         
    
    
        /**
          * 實例化條件,該bean以前在EurekaClientAutoConfiguration中已經實例化完畢
          */
    	@ConditionalOnBean(ServiceRegistry.class)
    	@ConditionalOnClass(Endpoint.class)
    	protected class ServiceRegistryEndpointConfiguration {
    
            //該bean以前在EurekaClientAutoConfiguration中已經實例化完畢
    		@Autowired(required = false)
    		private Registration registration;
    
    		@Bean
    		public ServiceRegistryEndpoint serviceRegistryEndpoint(ServiceRegistry serviceRegistry) {
    			ServiceRegistryEndpoint endpoint = new ServiceRegistryEndpoint(serviceRegistry);
    			endpoint.setRegistration(registration);
    			return endpoint;
    		}
    	}
    }

         再來看上面主要想實例化的bean,ServiceRegistryEndpoint ,能夠看到提供了2個接口供調用,獲取狀態(getStatus)和修改狀態(setStatus);

    @ManagedResource(description = "Can be used to display and set the service instance status using the service registry")
    @SuppressWarnings("unchecked")
    public class ServiceRegistryEndpoint implements MvcEndpoint {
    
    	private final ServiceRegistry serviceRegistry;
    
    	private Registration registration;
    
    	public ServiceRegistryEndpoint(ServiceRegistry<?> serviceRegistry) {
    		this.serviceRegistry = serviceRegistry;
    	}
    
    	public void setRegistration(Registration registration) {
    		this.registration = registration;
    	}
    
    	@RequestMapping(path = "instance-status", method = RequestMethod.POST)
    	@ResponseBody
    	@ManagedOperation
    	public ResponseEntity<?> setStatus(@RequestBody String status) {
    		Assert.notNull(status, "status may not by null");
    
    		if (this.registration == null) {
    			return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");
    		}
    
    		this.serviceRegistry.setStatus(this.registration, status);
    		return ResponseEntity.ok().build();
    	}
    
    	@RequestMapping(path = "instance-status", method = RequestMethod.GET)
    	@ResponseBody
    	@ManagedAttribute
    	public ResponseEntity getStatus() {
    		if (this.registration == null) {
    			return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");
    		}
    
    		return ResponseEntity.ok().body(this.serviceRegistry.getStatus(this.registration));
    	}
    
    	@Override
    	public String getPath() {
    		return "/service-registry";
    	}
    
    	@Override
    	public boolean isSensitive() {
    		return true;
    	}
    
    	@Override
    	public Class<? extends Endpoint<?>> getEndpointType() {
    		return null;
    	}
    }

    關於EndPoint的用法咱們以後會寫一篇文章連接過去,專門介紹; //TODO

       7.  @AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")

            這個厲害了,居然要在EurekaClientAutoConfiguration以前實例化;咱們看下作了什麼;

            看類註釋是說環境中關聯屬性改變刷新做用域的自動配置類,//TODO

 這樣,EurekaClientAutoConfiguration即將被實例化;

2.5 EurekaClientAutoConfiguration實例化過程

2.5.1  Static Inner Class *2

            有兩個靜態內部類優先加載

  • EurekaClientConfiguration 
@Configuration
	@ConditionalOnMissingRefreshScope
	protected static class EurekaClientConfiguration {

		@Autowired
		private ApplicationContext context;

		@Autowired(required = false)
		private DiscoveryClientOptionalArgs optionalArgs;

		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
				EurekaClientConfig config) {
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}

		@Bean
		@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
		public ApplicationInfoManager eurekaApplicationInfoManager(
				EurekaInstanceConfig config) {
			InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
			return new ApplicationInfoManager(config, instanceInfo);
		}
	}
  • RefreshableEurekaClientConfiguration 

@Configuration
	@ConditionalOnRefreshScope
	protected static class RefreshableEurekaClientConfiguration {

		@Autowired
		private ApplicationContext context;

		@Autowired(required = false)
		private DiscoveryClientOptionalArgs optionalArgs;

		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
				EurekaClientConfig config, EurekaInstanceConfig instance) {
			manager.getInfo(); // force initialization
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}

		@Bean
		@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public ApplicationInfoManager eurekaApplicationInfoManager(
				EurekaInstanceConfig config) {
			InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
			return new ApplicationInfoManager(config, instanceInfo);
		}

	}

在上面首先實例化了ApplicationInfoManager 和EurekaClient 兩個bean,區別之處在因而否支持刷新,這裏先不介紹了,以後會有文章連接過去;//TODO

在實例化EurekaClient中,包含裏EurekaClient啓動的主要操做動做,咱們在第三節分析;

2.5.2 HasFeatures 

@Bean
	public HasFeatures eurekaFeature() {
		return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
	}

這個bean看調用是用於應用監控的;

2.5.3 EurekaClientConfigBean

 

        實例化了eureka客戶端一些配置信息的bean,該bean屬性值來自於eureka.client的配置字段;

2.5.4 EurekaInstanceConfigBean

        實例化了eureka實例配置信息的bean,該bean屬性值來自於eureka.instance的配置字段;

2.5.5 DiscoveryClient

2.5.6 EurekaServiceRegistry

          在以前講到ServiceRegistryAutoConfiguration的時候有提到過

 

三.  Eureka客戶端啓動主要流程

3.1 啓動入口

       經過步步深刻,咱們來到了 com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)這個方法;

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();

            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
            return;  // no need to setup up an network tasks and we are done
        }

        try {
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        initScheduledTasks();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

        這裏面就是eurekaClient啓動的全部操做了;咱們主要看下面的這段代碼,摘自上面一大段中的一小段;

3.2 建立線程池

         咱們知道eureka客戶端維持在註冊中心的可用主要經過註冊和續約,爲定時任務;

//任務管理線程池
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            //心跳線程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            //緩存刷新線程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()

3.3  初始化定時任務  initScheduledTasks()

            在這個方法中初始化了2個定時任務,分別是緩存刷新和心跳註冊/續約,二者初始化操做相同,咱們之後者爲例說明如何初始化定時任務的;

3.3.1 ScheduledExecutorService

              在這裏採用了java自帶的定時任務工具 ScheduledExecutorService,該工具是一個基於線程池的任務工具,用法請參考Executors的3種經常使用線程池及ScheduledExecutorService最後一段介紹關於ScheduledExecutorService的介紹;           

scheduler.schedule(                         
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

              看完上面連接的文章,咱們知道scheduler.schedule( ...params) 是延遲執行一次任務的方法,可是咱們知道,心跳續約是定時循環發生的,那麼這裏的操做就不符合咱們的指望啊!!!

              別急,咱們再仔細看一下這個方法裏面的參數,TimedSupervisorTask 值得深刻研究一下;

                                               

         TimedSupervisorTask 在頂層實現了Runnable 接口,  能夠做爲task傳入scheduler方法中; 接着查看  TimedSupervisorTask的run()方法;在這裏找到了咱們的答案;

         在這個方法中,咱們剝掉不重要的操做,留下主要的步驟:

public void run() {
        Future future = null;
        try {
            future = executor.submit(task);
            ... 省略部分代碼...
        } catch (TimeoutException e) {
            ... 省略部分代碼...
        } catch (RejectedExecutionException e) {
           ... 省略部分代碼...
        } catch (Throwable e) {
            ... 省略部分代碼...
        } finally {
             ... 省略部分代碼...
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

          首先調用線程池submit(task) 方法提交任務執行,在finally方法中再一次的將任務裝載進任務管理中scheduler.schedule()延遲執行;這樣每次執行完一次任務後從新裝載下一次任務,從而達到任務周而復始,不斷執行的目的;每30s執行一次;

3.3.2 續約renew()

          在com.netflix.discovery.DiscoveryClient.HeartbeatThread中定義了續約的操做,咱們查看renew()方法;

/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = 
                  eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), 
                  instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, 
                  httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, 
                  instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

              在renew()這個方法中,首先向註冊中心執行了心跳續約的請求,StatusCode爲200成功,若爲404則執行register()從新註冊操做;

3.3.3 註冊register()      

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

            首先執行註冊請求,204說明註冊成功;  

3.4 註冊續約流程

3.4.1 註冊

        在3.3呢,初始化了定時任務,那麼第一次註冊是在哪裏呢?哈哈,在初始化定時任務方法initScheduledTasks()的最後一行:

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

      InstanceInfoReplicator實現了Runnable接口,

public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

      在調用該方法時,將任務交由scheduler進行延遲執行一次,延遲時間爲默認40s;咱們找到run()方法;

public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

         能夠看到執行了第一次的註冊操做,時間節點大約在服務啓動後的第40s;而且在finally中再一次裝載了這個任務;可見註冊任務也是定時循環執行的;

        那麼正常狀況下,成功register一次,後續應該經過renew保持狀態,這又是怎麼處理的呢?

3.4.2  如何避免重複註冊

          eureka客戶端在第一次執行register註冊時,將標記com.netflix.appinfo.InstanceInfo#isInstanceInfoDirty置爲true,默認是false的;而且將在com.netflix.appinfo.InstanceInfo#lastDirtyTimestamp保存時間戳

public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

          在instanceInfo.setIsDirty()方法中:

public synchronized void setIsDirty() {
        isInstanceInfoDirty = true;
        lastDirtyTimestamp = System.currentTimeMillis();
    }

這樣第一次執行register操做時,知足if判斷條件;

public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            // //第一次註冊時,dirtyTimestamp 塞入了時間戳,這裏能夠拿的到
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();                  
            if (dirtyTimestamp != null) {
                discoveryClient.register();

                //第一次執行註冊register後,將isInstanceInfoDirty 置爲false
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }


    public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            isInstanceInfoDirty = false;
        } else {
        }
    }

    public synchronized Long isDirtyWithTime() {
        //被置爲false,拿不到時間戳,返回null,這樣最上面的run()方法在定時任務執行時不知足if條件,不會再次執行註冊方法
        if (isInstanceInfoDirty) {
            return lastDirtyTimestamp;
        } else {
            return null;
        }
    }

        經過上面的分析,咱們在這裏總結一下eureka客戶端正常註冊流程:  有一個boolean標記和時間戳記錄;經過維護Boolean標記來決定是否能夠得到時間戳記錄,從而決定是否在註冊定時任務中執行註冊register操做,所以正常狀況下經過定時任務觸發的註冊方法只會執行一次;那麼什麼狀況下register()會再次在定時任務中知足執行條件呢?注意這裏重點講的是經過定時任務觸發註冊方法執行,而不是register()方法的執行;

        咱們看run()方法中調用的第一個方法  discoveryClient.refreshInstanceInfo();  經過名字能夠看出這裏刷新了實例信息;咱們研究一下;

3.4.3 實例的刷新

          首先咱們打開這個方法:

void refreshInstanceInfo() {
        //1. 刷新數據中心信息若是須要
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        //2. 刷新續約信息若是須要
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {

            //經過健康檢查獲取實例狀態
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

       先看第一個刷新applicationInfoManager.refreshDataCenterInfoIfRequired();

public void refreshDataCenterInfoIfRequired() {
        String existingAddress = instanceInfo.getHostName();

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            
      
     builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
            instanceInfo.setIsDirty();
        }
    }

         咱們看newAddress = config.getHostName(true);這個方法;

@Override
	public String getHostName(boolean refresh) {
		if (refresh && !this.hostInfo.override) {
			this.ipAddress = this.hostInfo.getIpAddress();
			this.hostname = this.hostInfo.getHostname();
		}
		return this.preferIpAddress ? this.ipAddress : this.hostname;
	}

         這裏涉及到配置文件的一個屬性配置: eureka.client.preferIpAddress,在上面方法的return 中完美的解釋了這個屬性的用法;在訪問服務的主機的時候,IP地址是否優先於主機名使用;默認是false的;

         上面刷新了服務實例信息;再看另一個刷新方法: refreshLeaseInfoIfRequired()

public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || 
            leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }

        經過查看方法體咱們可知主要更新了續約信息,如續約的頻率和過時時長;

        在上述兩個刷新有內容更新時,都會將註冊定時任務的入口標記置爲true,使得能夠經過循環執行的註冊任務來觸發註冊;

3.4.4 週期上報狀態( InstanceInfoReplicator )

     com.netflix.discovery.DiscoveryClient#initScheduledTasks

// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
        this,
        instanceInfo,
        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
        2); // burstSize
//註冊了一個實例狀態監聽的狀態監聽類,當實例狀態有變化時調用notify方法進行通知
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//主動啓動了一個週期上報狀態的任務
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

com.netflix.discovery.InstanceInfoReplicator實現了Runnable接口,說明這是一個任務類;

看一下run()方法:

public void run() {
    try {
        //刷新服務實例狀態,若是狀態有更改,則會更新實例狀態變動時間
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        若實例狀態變動時間不爲null,說明實例狀態有變動,則此時調用register向服務端從新註冊實例
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
         //同時裝載下一次任務,實現週期執行
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

該上報狀態週期的任務有2處觸發入口,第一個就是主動啓動,在上面提到的:

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

另外一處就是實例狀態監聽類的notify觸發的,最終執行的方法都是任務類的run方法;

這裏存在一個併發的問題,當週期執行且實例狀態發生變動就會有2個任務被裝載,所以在notify觸發入口能夠看見

public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("Executing on-demand update of local InstanceInfo");

                Future latestPeriodic = scheduledPeriodicRef.get();
                //當上一個任務沒有被執行的時候,任務將被取消
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                    latestPeriodic.cancel(false);
                }

                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

這裏主要作了實例狀態上報的工做,雖然服務實例註冊在註冊中心,但實例的狀態有

public enum InstanceStatus {
    UP, // Ready to receive traffic
    DOWN, // Do not send traffic- healthcheck callback failed
    STARTING, // Just about starting- initializations to be done - do not
    // send traffic
    OUT_OF_SERVICE, // Intentionally shutdown for traffic
    UNKNOWN;

    public static InstanceStatus toEnum(String s) {
        for (InstanceStatus e : InstanceStatus.values()) {
            if (e.name().equalsIgnoreCase(s)) {
                return e;
            }
        }
        return UNKNOWN;
    }
}

等五種,須要定時向註冊中心同步服務狀態用於註冊中心客戶端提供可用服務列表;

3.5 緩存刷新任務

       這裏由實例狀態引出來的,那麼咱們先看客戶端是如何獲取遠端(註冊中心)實例信息的;       

private synchronized void updateInstanceRemoteStatus() {
        // Determine this instance's status for this app and set to UNKNOWN if not found
        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
        if (instanceInfo.getAppName() != null) {
            Application app = getApplication(instanceInfo.getAppName());
            if (app != null) {
                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                if (remoteInstanceInfo != null) {
                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                }
            }
        }
        if (currentRemoteInstanceStatus == null) {
            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
        }

        // Notify if status changed
        if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
            onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
            lastRemoteInstanceStatus = currentRemoteInstanceStatus;
        }
    }

     咱們看currentRemoteInstanceStatus這個值得獲取,看到 getApplication(instanceInfo.getAppName()); 

@Override
    public Application getApplication(String appName) {
        return getApplications().getRegisteredApplications(appName);
    }

    getApplications()獲取了全部實例的集合,根據appName獲取目標應用實例信息;而getApplications()的值來源於com.netflix.discovery.DiscoveryClient#localRegionApps變量;那麼必定有一個地方爲這個變量塞入了值,並且是定時循環的任務;

咱們查看localRegionApps的set()方法,跟蹤到了這裏 com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry:

private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

        能夠看到變量localRegionApps來自於變量apps,而apps = httpResponse.getEntity();這裏經過請求註冊中心來獲取服務實例信息的;

        咱們在深刻跟蹤下這個方法的觸發; 有2個觸發的入口;

/**
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

     這是以前沒有仔細介紹的緩存刷新任務,定時循環執行,查看該類的註釋能夠知道,這個任務主要的功能就是用來獲取註冊信息的,從註冊中心獲取註冊列表;

    另一個地方在這裏,初始化任務線程池以後,初始化任務以前調用;

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        initScheduledTasks();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

     經過代碼可知,若是配置獲取註冊列表且獲取註冊中心服務列表失敗則會拿以前的備份;

     緩存默認第一次是全量拉出服列表;配置屬性爲

eureka:
  client:
    disable-delta: false

以後的是默認增量拉取,若是增量拉取失敗,則會全量拉取;具體邏輯在

com.netflix.discovery.DiscoveryClient#getAndUpdateDelta中;

四. 結語

    Eureka客戶端到這裏基本上介紹完畢了

    最後總結一下eureka客戶端作的事情;

    1.根據配置文件初始化bean,建立客戶端實例信息 InstanceInfo

    2.第一次全量拉取註冊中心服務列表(url=/apps),初始化週期任務:

        (2.1) CacheRefreshThread  定時刷新本地緩存服務列表,如果客戶端第一次拉取,則會全量拉取,後面則增量拉取.若增量拉取失敗則全量拉取,配置屬性爲eureka.client.registryFetchIntervalSeconds=30默認拉取一次;

         (2.2) HeartbeatThread 經過renew()續約任務,維持於註冊中心的心跳(url=/apps/${appName}/${id}),若返回狀態碼爲404則說明該服務實例沒有在註冊中心註冊,執行register()向註冊中心註冊實例信息;

        (2.3) ApplicationInfoManager.StatusChangeListener 註冊實例狀態監聽類,監聽服務實例狀態變化,向註冊中心同步實例狀態;

               InstanceInfoReplicator  定時刷新實例狀態,並向註冊中心同步,默認eureka.client.instanceInfoReplicationIntervalSeconds=30執行一次.若實例狀態有變動,則從新執行註冊;

 

 

...待續

相關文章
相關標籤/搜索