目錄java
本文使用Spring Cloud Eureka分析spring
Spring Cloud版本: Dalston.SR5json
spring-cloud-starter-eureka版本: 1.3.6.RELEASEbootstrap
netflix eureka版本: 1.6.2緩存
首先從使用Eureka Client必須引入的@EnableDiscoveryClient
註解提及服務器
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { /** * If true, the ServiceRegistry will automatically register the local server. */ boolean autoRegister() default true; }
@EnableDiscoveryClient
註解的做用:session
autoRegister默認值爲true,即服務發現客戶端默認會自動註冊到服務端app
Import導入EnableDiscoveryClientImportSelector.class
,其做用是框架
導入了 spring-cloud-eureka-client.jar!\META-INF\spring.factories 中的 EurekaDiscoveryClientConfiguration
dom
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
因爲autoRegister默認爲true,故還會導入AutoServiceRegistrationConfiguration
,即啓用自動服務註冊的配置,等同於在配置文件中spring.cloud.service-registry.auto-registration.enabled = true
EurekaClientAutoConfiguration
配置類知足啓用條件RefreshScopeRefreshedEvent
事件的監聽器,知足在使用RefreshScope刷新時能夠重建EurekaClient(不是本文重點)eureka.client.healthcheck.enabled=true
的前提下,向Spring容器註冊EurekaHealthCheckHandler
用於健康檢查(不是本文重點)因此,EurekaDiscoveryClientConfiguration的主要做用是向Spring容器註冊EurekaDiscoveryClientConfiguration.Marker.class,使得EurekaClientAutoConfiguration
配置類知足啓用條件
EurekaClientAutoConfiguration
配置類中涉及的內容比較多,主要內容:
EurekaClientConfigBean
,這是個對netflix的EurekaClientConfig
客戶端配置接口的實現EurekaInstanceConfigBean
,這是個對netflix的EurekaInstanceConfig
實例信息配置接口的實現EurekaRegistration
: Eureka實例的服務註冊信息(在開啓客戶端自動註冊時纔會註冊)EurekaServiceRegistry
: Eureka服務註冊器EurekaAutoServiceRegistration
: Eureka服務自動註冊器,實現了SmartLifecycle,會在Spring容器的refresh的最後階段被調用,經過EurekaServiceRegistry
註冊器註冊EurekaRegistration
信息EurekaClient
和ApplicationInfoManager
,註冊時分爲兩種狀況,便是否知足RefreshScope,若是知足,注入的Bean是帶有 @Lazy + @RefreshScope 註解的
ApplicationInfoManager
: 管理並初始化當前Instance實例的註冊信息,並提供了實例狀態監聽機制EurekaClient
: netflix的接口類,用於和Eureka Server交互的客戶端,而netflix的默認實現是DiscoveryClient
,也是本文分析的重點EurekaHealthIndicator
,爲/health端點提供Eureka相關信息,主要有Status當前實例狀態和applications服務列表,在從Eureka Server獲取服務列表正常的狀況下,Status使用Eureka Server上的InstanceRemoteStatus,不正常狀況下,代碼中有一些判斷邏輯public class EurekaClientAutoConfiguration { ...省略 /** * 一、註冊EurekaClientConfigBean */ @Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean() { EurekaClientConfigBean client = new EurekaClientConfigBean(); if ("bootstrap".equals(propertyResolver.getProperty("spring.config.name"))) { // We don't register during bootstrap by default, but there will be another // chance later. client.setRegisterWithEureka(false); } return client; } /** * 二、註冊EurekaInstanceConfigBean */ @Bean @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT) public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) throws MalformedURLException { PropertyResolver eurekaPropertyResolver = new RelaxedPropertyResolver(this.env, "eureka.instance."); String hostname = eurekaPropertyResolver.getProperty("hostname"); boolean preferIpAddress = Boolean.parseBoolean(eurekaPropertyResolver.getProperty("preferIpAddress")); int nonSecurePort = Integer.valueOf(propertyResolver.getProperty("server.port", propertyResolver.getProperty("port", "8080"))); int managementPort = Integer.valueOf(propertyResolver.getProperty("management.port", String.valueOf(nonSecurePort))); String managementContextPath = propertyResolver.getProperty("management.contextPath", propertyResolver.getProperty("server.contextPath", "/")); EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); instance.setNonSecurePort(nonSecurePort); instance.setInstanceId(getDefaultInstanceId(propertyResolver)); instance.setPreferIpAddress(preferIpAddress); if (managementPort != nonSecurePort && managementPort != 0) { if (StringUtils.hasText(hostname)) { instance.setHostname(hostname); } String statusPageUrlPath = eurekaPropertyResolver.getProperty("statusPageUrlPath"); String healthCheckUrlPath = eurekaPropertyResolver.getProperty("healthCheckUrlPath"); if (!managementContextPath.endsWith("/")) { managementContextPath = managementContextPath + "/"; } if (StringUtils.hasText(statusPageUrlPath)) { instance.setStatusPageUrlPath(statusPageUrlPath); } if (StringUtils.hasText(healthCheckUrlPath)) { instance.setHealthCheckUrlPath(healthCheckUrlPath); } String scheme = instance.getSecurePortEnabled() ? "https" : "http"; URL base = new URL(scheme, instance.getHostname(), managementPort, managementContextPath); instance.setStatusPageUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getStatusPageUrlPath(), '/')).toString()); instance.setHealthCheckUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getHealthCheckUrlPath(), '/')).toString()); } return instance; } /** * 三、註冊客戶端自動註冊相關組件 * EurekaRegistration: Eureka實例的服務註冊信息(在開啓客戶端自動註冊時纔會註冊) * EurekaServiceRegistry: Eureka服務註冊器 * EurekaAutoServiceRegistration: Eureka服務自動註冊器, * 經過EurekaServiceRegistry註冊器註冊EurekaRegistration信息 */ @Bean public EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry(); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager) { return EurekaRegistration.builder(instanceConfig) .with(applicationInfoManager) .with(eurekaClient) .with(healthCheckHandler) .build(); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); } /** * 四、註冊netflix的 EurekaClient 和 ApplicationInfoManager */ // 若是禁用客戶端自動註冊,在此方法debug打斷點會觸發服務註冊,狀態爲STARTING @Bean public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) { return new EurekaDiscoveryClient(config, client); } // 普通的EurekaClient配置(不可刷新) @Configuration @ConditionalOnMissingRefreshScope protected static class EurekaClientConfiguration { @Autowired private ApplicationContext context; @Autowired(required = false) private DiscoveryClientOptionalArgs optionalArgs; @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } } // 可刷新的EurekaClient配置類 @Configuration @ConditionalOnRefreshScope //知足@ConditionalOnClass(RefreshScope.class) // @ConditionalOnBean(RefreshAutoConfiguration.class) protected static class RefreshableEurekaClientConfiguration { @Autowired private ApplicationContext context; @Autowired(required = false) private DiscoveryClientOptionalArgs optionalArgs; // 註冊CloudEurekaClient,是com.netflix.discovery.EurekaClient接口的實現類 @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) { manager.getInfo(); // force initialization return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } // 註冊ApplicationInfoManager @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } } /** * 五、註冊 EurekaHealthIndicator */ @Configuration @ConditionalOnClass(Endpoint.class) protected static class EurekaHealthIndicatorConfiguration { @Bean @ConditionalOnMissingBean public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient, EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) { return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig); } } }
如上,在知足一系列Conditional條件後,會向Spring容器中註冊CloudEurekaClient
,它是com.netflix.discovery.EurekaClient接口的實現類,具體繼承實現關係以下
如上圖所示,剛剛建立的CloudEurekaClient
是 com.netflix.discovery.DiscoveryClient
的子類,它們都實現了com.netflix.discovery.EurekaClient
接口
EurekaClient
是Netflix對服務發現客戶端抽象的接口,包含不少方法,而DiscoveryClient
是其默認實現,也是本文分析的重點,CloudEurekaClient
是spring cloud的實現,根據類上註釋,其主要重寫了onCacheRefreshed()方法
,這個方法主要是從Eureka Server fetchRegistry()
獲取服務列表以後用於以廣播方式通知緩存刷新事件的,其實DiscoveryClient
也有onCacheRefreshed()方法
的實現,但因爲DiscoveryClient
是Netflix的類,只發送了com.netflix.discovery.EurekaEvent,而CloudEurekaClient
使用Spring的ApplicationEventPublisher
,發送了HeartbeatEvent
注意:
上面說的都是netflix的DiscoveryClient
還有另外一個DiscoveryClient,是
org.springframework.cloud.client.discovery.DiscoveryClient
是Spring對服務發現客戶端的抽象
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { /** * AbstractDiscoveryClientOptionalArgs 是DiscoveryClient的可選參數,可理解爲擴展點 * 包含healthCheckHandlerProvider、healthCheckCallbackProvider、eventListeners等 * spring cloud默認實現爲MutableDiscoveryClientOptionalArgs,但此處相關成員變量賦值後認爲空 */ if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); // 若是 shouldFetchRegistry=true,註冊netflix servo監控 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } // 若是 shouldRegisterWithEureka=true,註冊netflix servo監控 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // 若是既不要向eureka server註冊,又不要獲取服務列表,就什麼都不用初始化 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } // 【重點】建立各類Executor 和 eurekaTransport、instanceRegionChecker try { // 執行定時任務的定時器,定時線程名爲 DiscoveryClient-%d // 在定時器中用於定時執行TimedSupervisorTask監督任務,監督任務會強制超時 和 記錄監控數據 scheduler = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 執行heartbeat心跳任務的執行器,默認最大線程數=2,線程名爲:DiscoveryClient-HeartbeatExecutor-%d heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 執行服務列表緩存刷新的執行器,默認最大線程數=2,線程名爲:DiscoveryClient-CacheRefreshExecutor-%d cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); // 初始化eurekaTransport在服務註冊,獲取服務列表時的client scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 若是須要從eureka server獲取服務列表,而且嘗試fetchRegistry(false)失敗,調用BackupRegistry if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // 【重點】初始化全部定時任務 initScheduledTasks(); // 添加servo監控 try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
上面的DiscoveryClient構造方法代碼比較多,但多數都是一些賦值,本次分析的重點在註釋中已經標出,建立了各類Executor 和 eurekaTransport、instanceRegionChecker,以後又調用initScheduledTasks()方法
初始化全部這些定時任務
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { // 一、若是要從Eureka Server獲取服務列表 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // 從eureka服務器獲取註冊表信息的頻率(默認30s) // 同時也是單次獲取服務列表的超時時間 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); // 若是緩存刷新超時,下一次執行的delay最大是registryFetchIntervalSeconds的幾倍(默認10),默認每次執行是上一次的2倍 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); /** * 【#### 執行CacheRefreshThread,服務列表緩存刷新任務 ####】 * 執行TimedSupervisorTask監督任務的定時器,具體執行器爲cacheRefreshExecutor,任務爲CacheRefreshThread */ scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", //監控名 scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, //指定具體任務的超時時間 TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 二、若是要註冊到Eureka Server if (clientConfig.shouldRegisterWithEureka()) { // 續租的時間間隔(默認30s) int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 若是心跳任務超時,下一次執行的delay最大是renewalIntervalInSecs的幾倍(默認10),默認每次執行是上一次的2倍 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer /** * 【#### 執行HeartbeatThread,發送心跳數據 ####】 * 執行TimedSupervisorTask監督任務的定時器,具體執行器爲heartbeatExecutor,任務爲HeartbeatThread */ scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator /** * 【#### InstanceInfo複製器 ####】 * 啓動後臺定時任務scheduler,線程名爲 DiscoveryClient-InstanceInfoReplicator-%d * 默認每30s執行一次定時任務,查看Instance信息(DataCenterInfo、LeaseInfo、InstanceStatus)是否有變化 * 若是有變化,執行 discoveryClient.register() */ instanceInfoReplicator = new InstanceInfoReplicator( this, //當前DiscoveryClient instanceInfo, //當前實例信息 clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的複製間隔(默認30s) 2); // burstSize /** * 【StatusChangeListener 狀態改變監聽器】 */ statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //使用InstanceInfo複製器 scheduler.submit()一個Runnable任務 //後臺立刻執行 discoveryClient.register() instanceInfoReplicator.onDemandUpdate(); } }; /** * 是否關注Instance狀態變化,使用後臺線程將狀態同步到eureka server(默認true) * 調用 ApplicationInfoManager#setInstanceStatus(status) 會觸發 * 將 StatusChangeListener 註冊到 ApplicationInfoManager */ if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 啓動InstanceInfo複製器 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } // 當前服務實例不註冊到Eureka Server else { logger.info("Not registering with Eureka server per configuration"); } }
總的來講initScheduledTasks()
作了如下幾件事:
CacheRefreshThread
,即從Eureka Server獲取服務列表,也刷新客戶端緩存HeartbeatThread
,即客戶端向Eureka Server發送心跳discoveryClient.register()
,將實例信息同步到Server端由建立DiscoveryClient的過程可知,建立了不少定時執行線程,如定時從Server端刷新服務列表的CacheRefreshThread,定時報心跳續約的HeartbeatThread,還有用於更新並複製本地實例狀態到Server端的InstanceInfo複製器定時線程,而正是InstanceInfoReplicator#run()
中的discoveryClient.register()
發起了註冊
那麼怎麼能夠觸發註冊行爲呢?
// InstanceInfoReplicator#run() public void run() { try { /** * 刷新 InstanceInfo * 一、刷新 DataCenterInfo * 二、刷新 LeaseInfo 租約信息 * 三、根據HealthCheckHandler獲取InstanceStatus,並更新,若是狀態發生變化會觸發全部StatusChangeListener */ discoveryClient.refreshInstanceInfo(); // 若是isInstanceInfoDirty=true,返回dirtyTimestamp,不然是null Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); //發起註冊 instanceInfo.unsetIsDirty(dirtyTimestamp); //isInstanceInfoDirty置爲false } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 繼續下次任務 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
如上,先刷新InstanceInfo,刷新後若是發現有髒數據,即實例發生了變動,還未同步給Server的數據,就發起註冊
那麼在Eureka Client啓動的這種場景下,怎樣會觸發有髒數據下的註冊?
onDemandUpdate()
按需更新方法,一旦調用,立刻會submit()任務,其中會cancel自動更新任務,立刻執行InstanceInfoReplicator#run()
InstanceInfoReplicator複製器在啓動建立DiscoveryClient時被建立並start()啓動
// InstanceInfoReplicator#start() public void start(int initialDelayMs) { // 默認40s if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register 初始化時會將instanceInfo設置爲dirty Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
因此當自動更新啓動時會設置InstanceInfo爲髒數據,由於要觸發第一次向Server同步,那麼在40s後會調用InstanceInfoReplicator#run()
,假設InstanceInfo並無其它變動,那麼也會發起discoveryClient.register()
注意:
正常狀況下是不會由延遲40s的第一次執行定時任務發起註冊,而是下面的onDemandUpdate() 主動按需更新發起註冊
若是設置@EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false,即放棄自動註冊,並在EurekaClientAutoConfiguration的以下方法打斷點
@Bean public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) { return new EurekaDiscoveryClient(config, client); }會在斷點生效時觸發EurekaClient的實例化,而此EurekaClient就是一個DiscoveryClient,會啓動InstanceInfoReplicator自動定時更新線程,但因爲new InstanceInfoFactory().create(config)時本地實例狀態爲STARTING,因此註冊到Server端的狀態也是STARTING
目前只有在ApplicationInfoManager#setInstanceStatus()更新實例狀態,且實例狀態真的發生變動,觸發StatusChangeListener狀態變動監聽器時,會調用onDemandUpdate
立刻submit任務執行InstanceInfoReplicator#run()
,再發起註冊
因爲Spring Cloud默認是啓用服務自動註冊AutoServiceRegistration的,因此在EurekaClientAutoConfiguration
自動配置時會註冊服務自動註冊相關組件(EurekaRegistration、EurekaServiceRegistry、EurekaAutoServiceRegistration),其中EurekaAutoServiceRegistration
實現了Spring的SmartLifecycle接口,會在Spring容器refresh要完畢時觸發生命週期方法start(),其中會使用EurekaServiceRegistry
服務註冊器註冊EurekaRegistration
這個本地實例信息
// EurekaServiceRegistry#register() public void register(EurekaRegistration reg) { maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } // 設置初始化狀態 reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); if (reg.getHealthCheckHandler() != null) { reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler()); } }
主要是設置初始化狀態步驟,而EurekaInstanceConfigBean
本地實例信息的initialStatus初始化狀態爲 InstanceStatus.UP,因此狀態與new InstanceInfo()時的STARTING不一樣,發生了狀態變動,觸發在建立DiscoveryClient時設置的StatusChangeListener
...省略 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } ...省略
其中會調用 InstanceInfoReplicator.onDemandUpdate() 實例信息複製器作按需更新,立刻將UP狀態更新/註冊到Server端
因此,以我判斷,Eureka Client啓動時的自動註冊大多數應該是Spring Cloud的服務自動註冊機制,在Spring容器基本啓動完畢時,觸發服務自動註冊操做,其中會使用ApplicationInfoManager更新實例狀態爲初始狀態UP,一旦實例狀態變動會被立刻監聽到,執行復制器的InstanceInfoReplicator.onDemandUpdate()按需更新,立刻執行一次discoveryClient.register()操做
因此,下面就是分析 discoveryClient.register() 是怎麼註冊服務的
// DiscoveryClient#register() boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
如上,註冊方法使用eurekaTransport
的註冊客戶端registrationClient
調用了register(instanceinfo)方法
EurekaTransport
是DiscoveryClient
的內部類,其中包含
EurekaHttpClient
和 EurekaHttpClientFactory
的實現類EurekaHttpClient
和 EurekaHttpClientFactory
的實現類那麼EurekaTransport
的相關組件,尤爲是registrationClient
註冊客戶端是如何初始化的呢?
初始化是在DiscoveryClient
的構造方法中
eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args);
主要是scheduleServerEndpointTask()
方法
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args) { Collection<?> additionalFilters = args == null ? Collections.emptyList() : args.additionalFilters; EurekaJerseyClient providedJerseyClient = args == null ? null : args.eurekaJerseyClient; TransportClientFactories argsTransportClientFactories = null; if (args != null && args.getTransportClientFactories() != null) { argsTransportClientFactories = args.getTransportClientFactories(); } // Ignore the raw types warnings since the client filter interface changed between jersey 1/2 @SuppressWarnings("rawtypes") TransportClientFactories transportClientFactories = argsTransportClientFactories == null ? new Jersey1TransportClientFactories() : argsTransportClientFactories; // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity // 一、參數中是否提供了transportClientFactory的實現,沒有就使用Jersey1TransportClientFactories eurekaTransport.transportClientFactory = providedJerseyClient == null ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo()) : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient); ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() { @Override public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) { long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit); long delay = getLastSuccessfulRegistryFetchTimePeriod(); if (delay > thresholdInMs) { logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}", thresholdInMs, delay); return null; } else { return localRegionApps.get(); } } }; eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver( clientConfig, transportConfig, eurekaTransport.transportClientFactory, applicationInfoManager.getInfo(), applicationsSource ); /** * 是否要想Eureka Server註冊 * 二、建立RegistrationClient用於註冊的客戶端及工廠,並設置到eurekaTransport */ if (clientConfig.shouldRegisterWithEureka()) { EurekaHttpClientFactory newRegistrationClientFactory = null; EurekaHttpClient newRegistrationClient = null; try { newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, transportConfig ); newRegistrationClient = newRegistrationClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.registrationClientFactory = newRegistrationClientFactory; eurekaTransport.registrationClient = newRegistrationClient; } // /** * 是否要從Server端獲取服務列表 * 三、建立QueryClient用於查詢服務列表的客戶端及工廠,並設置到eurekaTransport */ // new method (resolve from primary servers for read) // Configure new transport layer (candidate for injecting in the future) if (clientConfig.shouldFetchRegistry()) { EurekaHttpClientFactory newQueryClientFactory = null; EurekaHttpClient newQueryClient = null; try { newQueryClientFactory = EurekaHttpClients.queryClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, clientConfig, transportConfig, applicationInfoManager.getInfo(), applicationsSource ); newQueryClient = newQueryClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.queryClientFactory = newQueryClientFactory; eurekaTransport.queryClient = newQueryClient; } }
因此,下面就是逐層深刻分析RegistrationClient用於註冊的客戶端及工廠是如何建立的?
因爲RegistrationClient實際上是一種EurekaHttpClient
,而EurekaHttpClient
是接口,其實現類不少
查看源碼發現,Netflix採用的是 Factory工廠 + 代理 的模式,從最外層建立的EurekaHttpClient工廠包含一個成員變量是另外一個代理的EurekaHttpClient工廠,每一個工廠生成的EurekaHttpClient功能不同,在從外層執行一個操做時,最外層的工廠執行其相關功能後,使用代理的工廠新建EurekaHttpClient實例,再調用其相同的方法,也實現這個EurekaHttpClient的相關功能,就這樣逐層深刻,各司其職後,最後使用Jersey發送POST請求到Eureka Server發起註冊,而這些EurekaHttpClient都是在com.netflix.discovery.shared.transport.decorator
EurekaHttpClient的包裝類的包下的,由外到內大體是:
// SessionedEurekaHttpClient#execute() @Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { long now = System.currentTimeMillis(); long delay = now - lastReconnectTimeStamp; // 若是上次重連時間到如今已經超過了currentSessionDurationMs,關閉當前EurekaHttpClient if (delay >= currentSessionDurationMs) { logger.debug("Ending a session and starting anew"); lastReconnectTimeStamp = now; currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs); TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null)); } // 若是EurekaHttpClient爲空,clientFactory.newClient()重建 EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get(); if (eurekaHttpClient == null) { eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient()); } // 繼續執行後續 return requestExecutor.execute(eurekaHttpClient); }
// RetryableEurekaHttpClient#execute() @Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { List<EurekaEndpoint> candidateHosts = null; int endpointIdx = 0; // 最多重試numberOfRetries(默認:3) for (int retry = 0; retry < numberOfRetries; retry++) { EurekaHttpClient currentHttpClient = delegate.get();//從AtomicReference<EurekaHttpClient>獲取當前EurekaHttpClient EurekaEndpoint currentEndpoint = null; if (currentHttpClient == null) { if (candidateHosts == null) { candidateHosts = getHostCandidates(); //返回候選集合 排除 已經失敗隔離的Host集合 if (candidateHosts.isEmpty()) { throw new TransportException("There is no known eureka server; cluster server list is empty"); } } if (endpointIdx >= candidateHosts.size()) { throw new TransportException("Cannot execute request on any known server"); } // 根據當前的下標獲取Endpoint,並新建 JerseyClient currentEndpoint = candidateHosts.get(endpointIdx++); currentHttpClient = clientFactory.newClient(currentEndpoint); } try { // 繼續後續執行 EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); // 若是根據當前操做類型 和 返回狀態碼,知足狀態計算器,記錄currentHttpClient可用,下次繼續使用 // 返回狀態碼是:200、300、302,或者Register、SendHeartBeat狀況下是404 if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) { delegate.set(currentHttpClient); if (retry > 0) { logger.info("Request execution succeeded on retry #{}", retry); } return response; } logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode()); } catch (Exception e) { logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace } // Connection error or 5xx from the server that must be retried on another server // 請求失敗 或 報5xx錯誤,將delegate清空,重試另外一個Server,並將當前Endpoint放到隔離集合 delegate.compareAndSet(currentHttpClient, null); if (currentEndpoint != null) { quarantineSet.add(currentEndpoint); } } // 屢次重試後仍沒法成功返回結果,上拋異常 throw new TransportException("Retry limit reached; giving up on completing the request"); } //########## RetryableEurekaHttpClient#getHostCandidates() // 返回 全部候選的Host節點數據 與 隔離集合 的數據差集 private List<EurekaEndpoint> getHostCandidates() { List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); //全部候選節點數據 quarantineSet.retainAll(candidateHosts); //確保quarantineSet隔離集合中的數據都在candidateHosts中 //當candidateHosts發生變化時也能及時清理quarantineSet隔離集合 // If enough hosts are bad, we have no choice but start over again // 默認:0.66百分比 int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage()); // 隔離集合爲空 if (quarantineSet.isEmpty()) { // no-op } // 隔離數據已經大於閥值,不得已要從新開始,清空隔離集合 else if (quarantineSet.size() >= threshold) { logger.debug("Clearing quarantined list of size {}", quarantineSet.size()); quarantineSet.clear(); } // 隔離集合不爲空,也不大於閥值,排除隔離集合中的Endpoint後返回 else { List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size()); for (EurekaEndpoint endpoint : candidateHosts) { if (!quarantineSet.contains(endpoint)) { remainingHosts.add(endpoint); } } candidateHosts = remainingHosts; } return candidateHosts; } //########## ServerStatusEvaluators#LEGACY_EVALUATOR // Eureka Server返回狀態的計算器,計算不一樣場景下的不一樣狀態碼是否表明成功 private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() { @Override public boolean accept(int statusCode, RequestType requestType) { if (statusCode >= 200 && statusCode < 300 || statusCode == 302) { return true; } else if (requestType == RequestType.Register && statusCode == 404) { return true; } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) { return true; } else if (requestType == RequestType.Cancel) { // cancel is best effort return true; } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) { return true; } return false; } };
//########## RedirectingEurekaHttpClient#executeOnNewServer // Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor, AtomicReference<EurekaHttpClient> currentHttpClientRef) { URI targetUrl = null; // 最多重定向默認10次 for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) { EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get()); // 若是返回的不是302重定向,返回response if (httpResponse.getStatusCode() != 302) { if (followRedirectCount == 0) { logger.debug("Pinning to endpoint {}", targetUrl); } else { logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount); } return httpResponse; } // 從response中獲取Location,用於重建EurekaHttpClient targetUrl = getRedirectBaseUri(httpResponse.getLocation()); if (targetUrl == null) { throw new TransportException("Invalid redirect URL " + httpResponse.getLocation()); } currentHttpClientRef.getAndSet(null).shutdown(); currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString()))); } String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl(); logger.warn(message); throw new TransportException(message); }
// MetricsCollectingEurekaHttpClient#execute() protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType()); Stopwatch stopwatch = requestMetrics.latencyTimer.start(); //統計執行延時 try { EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate); requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); //按狀態統計 return httpResponse; } catch (Exception e) { requestMetrics.connectionErrors.increment(); //統計錯誤 exceptionsMetric.count(e); //按異常名統計 throw e; } finally { stopwatch.stop(); } }
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient { protected final Client jerseyClient; //真正處理請求的Jersery客戶端 protected final String serviceUrl; //鏈接的Server端地址 protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) { this.jerseyClient = jerseyClient; this.serviceUrl = serviceUrl; logger.debug("Created client for url: {}", serviceUrl); } /** * 註冊方法 */ @Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); //請求Eureka Server的【/apps/應用名】接口地址 ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); //實例InstanceInfo數據,經過Post請求body發過去 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } } ...省略
通過上面的步驟,客戶端已經能夠經過Jersery發送Http請求給Eureka Server端註冊,具體請求以下:
POST /eureka/apps/應用名 HTTP/1.1
Accept-Encoding: gzip
Content-Type: application/json
Accept: application/json
DiscoveryIdentity-Name: DefaultClient
DiscoveryIdentity-Version: 1.4
DiscoveryIdentity-Id: 192.168.70.132
Transfer-Encoding: chunked
Host: localhost:8001
Connection: Keep-Alive
User-Agent: Java-EurekaClient/v1.6.2
1a0
{"instance":{
"instanceId":"192.168.70.132:應用名:10001",
"hostName":"192.168.70.132",
"app":"應用名",
"ipAddr":"192.168.70.132",
"status":"UP",
"overriddenstatus":"UNKNOWN",
"port": { "\(":10001, "@enabled" : "true" }, "securePort": { "\)":443, "@enabled" : "false"},
"countryId":1,
"dataCenterInfo":{"@class":"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
"name":"MyOwn"
}
大致來講,Eureka Client的註冊是由Spring Cloud的
AutoServiceRegistration自動註冊
發起,在設置應用實例Instance初始狀態爲UP時,觸發了InstanceInfoReplicator#onDemandUpdate()按需更新
方法,將實例Instance信息經過DiscoveryClient
註冊到Eureka Server,期間通過了一些EurekaHttpClient的裝飾類,實現了諸如按期重連、失敗重試、註冊重定向、統計收集Metrics信息等功能,最後由JerseryClient
發送POST請求調用Eureka Server的【/eureka/apps/應用名】端點,請求體攜帶InstanceInfo實例信息,完成註冊
EurekaAutoServiceRegistration#start(): 實現Spring的SmartLifecycle,在Spring容器refresh()最後一步finishRefresh()會調用生命週期的start()方法
EurekaServiceRegistry#register(EurekaRegistration): 使用服務註冊器註冊服務信息
ApplicationInfoManager#setInstanceStatus(初始狀態): 應用實例信息管理器更新初始狀態爲 UP
StatusChangeListener: 觸發實例狀態監聽(此Listener是在DiscoveryClient#initScheduledTasks()方法中設置的)
InstanceInfoReplicator.onDemandUpdate(): 實例狀態複製器執行按需狀態更新
DiscoveryClient#register(): DiscoveryClient發起註冊實例信息
EurekaHttpClientDecorator#execute(): 執行EurekaHttpClient的裝飾類,實現其各自功能
SessionedEurekaHttpClient: 定時重連
RetryableEurekaHttpClient: 候選範圍內失敗重試
RedirectingEurekaHttpClient: 按Eureka Server端要求重定向到新Server
MetricsCollectingEurekaHttpClient: 統計收集執行狀況