本文主要研究一下Spring Cloud的EurekaClientAutoConfiguration
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientAutoConfiguration.java
/**
 * Spring auto-configuration that declares the Eureka client beans: the client
 * and instance configuration beans, the {@code DiscoveryClient} adapter, the
 * service registry, the registration objects and (in the nested configuration)
 * the {@code EurekaClient} and {@code ApplicationInfoManager}.
 *
 * <p>Per the annotations below it applies only when {@code EurekaClientConfig}
 * is on the classpath, a {@code EurekaDiscoveryClientConfiguration.Marker} bean
 * exists, and {@code eureka.client.enabled} is not set to a non-matching value
 * (a missing property counts as enabled).
 */
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {

    /** Environment used for every property lookup in this configuration. */
    private ConfigurableEnvironment env;

    /**
     * @param env the environment injected by the container and kept for later lookups
     */
    public EurekaClientAutoConfiguration(ConfigurableEnvironment env) {
        this.env = env;
    }

    /** Exposes a named feature entry ("Eureka Client") keyed on {@code EurekaClient}. */
    @Bean
    public HasFeatures eurekaFeature() {
        return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
    }

    /**
     * Creates the client configuration bean unless the current context already
     * defines an {@code EurekaClientConfig}.
     *
     * <p>NOTE(review): the {@code env} parameter is never read; the body uses
     * the {@code this.env} field instead.
     *
     * @param env environment supplied by the container (unused in the body)
     * @return a new {@code EurekaClientConfigBean}, with registration switched off
     *         when {@code spring.config.name} equals {@code "bootstrap"}
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }

    /** Supplies the default {@code ManagementMetadataProvider} when no other bean of that type exists. */
    @Bean
    @ConditionalOnMissingBean
    public ManagementMetadataProvider serviceManagementMetadataProvider() {
        return new DefaultManagementMetadataProvider();
    }

    /**
     * Looks a property up in the environment.
     *
     * @param property the property key
     * @return the property value, or the empty string when the environment does
     *         not contain the key
     */
    private String getProperty(String property) {
        return this.env.containsProperty(property) ? this.env.getProperty(property) : "";
    }

    /**
     * Builds the instance configuration bean from environment properties unless
     * the current context already defines an {@code EurekaInstanceConfig}.
     *
     * @param inetUtils passed straight to the {@code EurekaInstanceConfigBean} constructor
     * @param managementMetadataProvider used to derive status/health URLs and the management port
     * @return the populated {@code EurekaInstanceConfigBean}
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        // Missing string properties come back as "" from getProperty, so
        // Boolean.parseBoolean yields false for absent flags.
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ipAddress");
        boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

        // NOTE(review): Spring Boot 2.x documents the servlet context path under
        // "server.servlet.context-path" - confirm this key is still the intended one.
        String serverContextPath = env.getProperty("server.context-path", "/");
        // Falls back from "server.port" to "port" to the literal "8080".
        // NOTE(review): Integer.valueOf boxes and is then unboxed into an int.
        int serverPort = Integer.valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));

        Integer managementPort = env.getProperty("management.server.port", Integer.class);// nullable. should be wrapped into optional
        String managementContextPath = env.getProperty("management.server.context-path");// nullable. should be wrapped into optional
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);//nullable

        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

        instance.setNonSecurePort(serverPort);
        // getDefaultInstanceId is not defined in this excerpt - presumably a static import; verify.
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }

        // When the secure port is enabled it is set to the same value as the server port.
        if(isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }

        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }

        // URL paths are only overridden when the corresponding property has text.
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }

        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
                serverContextPath, managementContextPath, managementPort);

        // The provider may return null, in which case no URLs or metadata are applied.
        if(metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if(instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            // Publish the management port as metadata only if the key is not already set.
            Map<String, String> metadataMap = instance.getMetadataMap();
            if (metadataMap.get("management.port") == null) {
                metadataMap.put("management.port", String.valueOf(metadata.getManagementPort()));
            }
        }

        setupJmxPort(instance, jmxPort);
        return instance;
    }

    /**
     * Adds a {@code "jmx.port"} metadata entry when none is present and a JMX
     * port was configured.
     *
     * @param instance the instance configuration whose metadata map is updated
     * @param jmxPort the configured JMX port, or {@code null} when not set
     */
    private void setupJmxPort(EurekaInstanceConfigBean instance, Integer jmxPort) {
        Map<String, String> metadataMap = instance.getMetadataMap();
        if (metadataMap.get("jmx.port") == null && jmxPort != null) {
            metadataMap.put("jmx.port", String.valueOf(jmxPort));
        }
    }

    /** Wraps the instance config and the Eureka client in an {@code EurekaDiscoveryClient}. */
    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
        return new EurekaDiscoveryClient(config, client);
    }

    /** Declares the {@code EurekaServiceRegistry} bean. */
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

    /**
     * Assembles the {@code EurekaRegistration} through its builder. Declared only
     * when an {@code AutoServiceRegistrationProperties} bean exists and
     * {@code spring.cloud.service-registry.auto-registration.enabled} matches
     * (a missing property counts as a match).
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig,
            ApplicationInfoManager applicationInfoManager,
            ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig)
                .with(applicationInfoManager)
                .with(eurekaClient)
                .with(healthCheckHandler)
                .build();
    }

    /**
     * Declares the {@code EurekaAutoServiceRegistration} bean under the same
     * conditions as {@link #eurekaRegistration}.
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context,
            EurekaServiceRegistry registry, EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }

    /**
     * Nested configuration that creates the {@code EurekaClient} and the
     * {@code ApplicationInfoManager}.
     *
     * <p>NOTE(review): guarded by {@code @ConditionalOnMissingRefreshScope};
     * presumably active only when no refresh scope is available - the
     * annotation's definition is not part of this excerpt.
     */
    @Configuration
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

        /**
         * Creates the {@code CloudEurekaClient} unless the current context already
         * defines an {@code EurekaClient}; the bean's destroy method is {@code shutdown}.
         */
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
        }

        /**
         * Creates the {@code ApplicationInfoManager} from the instance config and an
         * {@code InstanceInfo} produced by {@code InstanceInfoFactory}, unless the
         * current context already defines one.
         */
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    }

    // ...... (remaining members omitted from this excerpt)
}
可以看到這裏創建了EurekaClientConfigBean、EurekaInstanceConfigBean兩個基本配置,以及EurekaServiceRegistry;其餘的bean則在它們的基礎上依次創建:
InstanceInfo:使用EurekaInstanceConfig,通過new InstanceInfoFactory().create(config)創建
ApplicationInfoManager:使用InstanceInfo以及EurekaInstanceConfig創建:new ApplicationInfoManager(config, instanceInfo)
EurekaClient:使用ApplicationInfoManager、EurekaClientConfig創建:new CloudEurekaClient(manager, config, this.optionalArgs, this.context)
DiscoveryClient:通過EurekaInstanceConfig、EurekaClient創建:new EurekaDiscoveryClient(config, client)
EurekaRegistration:通過EurekaClient、CloudEurekaInstanceConfig、ApplicationInfoManager(以及可選的HealthCheckHandler)來創建
EurekaAutoServiceRegistration:通過ApplicationContext、EurekaServiceRegistry、EurekaRegistration來創建
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaAutoServiceRegistration.java
/**
 * Lifecycle component that registers and deregisters this application through
 * the {@code EurekaServiceRegistry}. It reacts to the web server being
 * initialized (to learn the local port) and to the owning context being closed.
 */
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {

    private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);

    /** Value reported by {@link #isRunning()}. */
    private AtomicBoolean running = new AtomicBoolean(false);

    /** Value reported by {@link #getOrder()}. */
    private int order = 0;

    /** Local web-server port; 0 until a web-server-initialized event supplies it. */
    private AtomicInteger port = new AtomicInteger(0);

    private ApplicationContext context;

    private EurekaServiceRegistry serviceRegistry;

    private EurekaRegistration registration;

    /**
     * @param context the application context this registration belongs to
     * @param serviceRegistry the registry used to register/deregister
     * @param registration the registration describing this instance
     */
    public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
            EurekaRegistration registration) {
        this.context = context;
        this.serviceRegistry = serviceRegistry;
        this.registration = registration;
    }

    @Override
    public void start() {
        // ...... (body omitted in this excerpt)
    }

    @Override
    public void stop() {
        // ...... (body omitted in this excerpt)
    }

    /** @return the current value of the running flag */
    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    /** @return always 0 */
    @Override
    public int getPhase() {
        return 0;
    }

    /** @return always {@code true} */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /** Stops synchronously via {@link #stop()} and then runs the callback. */
    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public int getOrder() {
        return this.order;
    }

    /**
     * Records the web server's port the first time it is seen (while the stored
     * port is still 0) and then calls {@link #start()}.
     */
    @EventListener(ServletWebServerInitializedEvent.class)
    public void onApplicationEvent(ServletWebServerInitializedEvent event) {
        // TODO: take SSL into account
        int localPort = event.getWebServer().getPort();
        if (this.port.get() == 0) {
            log.info("Updating port to " + localPort);
            // compareAndSet only writes if the port is still 0.
            this.port.compareAndSet(0, localPort);
            start();
        }
    }

    /** Calls {@link #stop()} only when the closed context is this component's own context. */
    @EventListener(ContextClosedEvent.class)
    public void onApplicationEvent(ContextClosedEvent event) {
        if( event.getApplicationContext() == context ) {
            stop();
        }
    }
}
這裏主要看實現SmartLifecycle接口的方法
spring-context-5.0.5.RELEASE-sources.jar!/org/springframework/context/SmartLifecycle.java
// Excerpt of interface method declarations: the first three are the Lifecycle
// methods, the last two are the SmartLifecycle extensions.

/**
 * Start this component.
 * <p>Should not throw an exception if the component is already running.
 * <p>In the case of a container, this will propagate the start signal to all
 * components that apply.
 * @see SmartLifecycle#isAutoStartup()
 */
void start();

/**
 * Stop this component, typically in a synchronous fashion, such that the component is
 * fully stopped upon return of this method. Consider implementing {@link SmartLifecycle}
 * and its {@code stop(Runnable)} variant when asynchronous stop behavior is necessary.
 * <p>Note that this stop notification is not guaranteed to come before destruction: On
 * regular shutdown, {@code Lifecycle} beans will first receive a stop notification before
 * the general destruction callbacks are being propagated; however, on hot refresh during a
 * context's lifetime or on aborted refresh attempts, only destroy methods will be called.
 * <p>Should not throw an exception if the component isn't started yet.
 * <p>In the case of a container, this will propagate the stop signal to all components
 * that apply.
 * @see SmartLifecycle#stop(Runnable)
 * @see org.springframework.beans.factory.DisposableBean#destroy()
 */
void stop();

/**
 * Check whether this component is currently running.
 * <p>In the case of a container, this will return {@code true} only if <i>all</i>
 * components that apply are currently running.
 * @return whether the component is currently running
 */
boolean isRunning();

/**
 * Returns {@code true} if this {@code Lifecycle} component should get
 * started automatically by the container at the time that the containing
 * {@link ApplicationContext} gets refreshed.
 * <p>A value of {@code false} indicates that the component is intended to
 * be started through an explicit {@link #start()} call instead, analogous
 * to a plain {@link Lifecycle} implementation.
 * @see #start()
 * @see #getPhase()
 * @see LifecycleProcessor#onRefresh()
 * @see ConfigurableApplicationContext#refresh()
 */
boolean isAutoStartup();

/**
 * Indicates that a Lifecycle component must stop if it is currently running.
 * <p>The provided callback is used by the {@link LifecycleProcessor} to support
 * an ordered, and potentially concurrent, shutdown of all components having a
 * common shutdown order value. The callback <b>must</b> be executed after
 * the {@code SmartLifecycle} component does indeed stop.
 * <p>The {@link LifecycleProcessor} will call <i>only</i> this variant of the
 * {@code stop} method; i.e. {@link Lifecycle#stop()} will not be called for
 * {@code SmartLifecycle} implementations unless explicitly delegated to within
 * the implementation of this method.
 * @see #stop()
 * @see #getPhase()
 */
void stop(Runnable callback);
前面三個是Lifecycle接口的方法,後面兩個是SmartLifecycle擴展的方法
/**
 * Fills in any still-unset registration ports from the captured web-server
 * port, then registers this instance once.
 *
 * <p>Registration happens only when the component is not already running and
 * the registration's non-secure port is greater than 0. On registration an
 * {@code InstanceRegisteredEvent} is published and the running flag is set.
 */
@Override
public void start() {
    // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
    if (this.port.get() != 0) {
        if (this.registration.getNonSecurePort() == 0) {
            this.registration.setNonSecurePort(this.port.get());
        }

        // The secure port is only filled in when the registration reports itself as secure.
        if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
            this.registration.setSecurePort(this.port.get());
        }
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

        this.serviceRegistry.register(this.registration);

        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
        // The flag is set last, after register and publishEvent have returned.
        this.running.set(true);
    }
}
這裏調用this.serviceRegistry.register(this.registration)方法,把自身應用實例的信息註冊到eureka server中
/**
 * Deregisters this instance through the service registry and then clears the
 * running flag. The deregistration is issued regardless of the flag's current value.
 */
@Override
public void stop() {
    this.serviceRegistry.deregister(this.registration);
    this.running.set(false);
}
這裏調用this.serviceRegistry.deregister(this.registration)方法,告知eureka server自身服務要下線
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaServiceRegistry.java
/**
 * Registers the given registration: forces client initialization, sets the
 * instance status to the configured initial status, and registers the health
 * check handler with the Eureka client when one is available.
 *
 * @param reg the registration carrying the instance config, the
 *            {@code ApplicationInfoManager} and the Eureka client
 */
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application " + reg.getInstanceConfig().getAppname()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    // Status change goes through the ApplicationInfoManager.
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    // The handler is optional; nothing is registered when none is available.
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
            reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}

/**
 * Touches the {@code ApplicationInfoManager} and the Eureka client; the return
 * values are discarded.
 *
 * @param reg the registration whose collaborators are touched
 */
private void maybeInitializeClient(EurekaRegistration reg) {
    // force initialization of possibly scoped proxies
    reg.getApplicationInfoManager().getInfo();
    reg.getEurekaClient().getApplications();
}
這裏調用ApplicationInfoManager的setInstanceStatus方法來更改狀態
@Override public void deregister(EurekaRegistration reg) { if (reg.getApplicationInfoManager().getInfo() != null) { if (log.isInfoEnabled()) { log.info("Unregistering application " + reg.getInstanceConfig().getAppname() + " with eureka with status DOWN"); } reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN); //shutdown of eureka client should happen with EurekaRegistration.close() //auto registration will create a bean which will be properly disposed //manual registrations will need to call close() } }
這裏調用ApplicationInfoManager的setInstanceStatus方法來將狀態設置爲InstanceInfo.InstanceStatus.DOWN
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/ApplicationInfoManager.java
/**
 * Updates the status of this instance and notifies every registered
 * {@code StatusChangeListener} of the transition. Applications can use this
 * to signal whether they are ready to receive traffic.
 *
 * <p>The requested status is first passed through the status mapper. No
 * update is made when the mapper returns {@code null}, and no listener is
 * notified when {@code instanceInfo.setStatus} returns {@code null}
 * (presumably meaning the status did not change - verify against
 * {@code InstanceInfo}).
 *
 * @param status Status of the instance
 */
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus mapped = instanceStatusMapper.map(status);
    if (mapped == null) {
        return;
    }

    InstanceStatus previous = instanceInfo.setStatus(mapped);
    if (previous == null) {
        return;
    }

    for (StatusChangeListener listener : listeners.values()) {
        try {
            // A fresh event object is handed to each listener.
            listener.notify(new StatusChangeEvent(previous, mapped));
        } catch (Exception e) {
            // A failing listener must not prevent the remaining ones from being notified.
            logger.warn("failed to notify listener: {}", listener.getId(), e);
        }
    }
}
這裏發佈了StatusChangeEvent事件
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
// Excerpt: when registration with Eureka is enabled, schedule the heartbeat,
// create the instance-info replicator and wire up the status-change listener.
if (clientConfig.shouldRegisterWithEureka()) {
    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

    // Heartbeat timer
    // First run is scheduled one renewal interval from now.
    scheduler.schedule(
            new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);

    // InstanceInfo replicator
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

    // Listener that logs each local status change and asks the replicator for
    // an on-demand update.
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                // log at warn level if DOWN was involved
                logger.warn("Saw local status change event {}", statusChangeEvent);
            } else {
                logger.info("Saw local status change event {}", statusChangeEvent);
            }
            instanceInfoReplicator.onDemandUpdate();
        }
    };

    // The listener is only registered when on-demand status updates are enabled.
    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
    logger.info("Not registering with Eureka server per configuration");
}
這裏註冊了StatusChangeListener(前提是clientConfig.shouldOnDemandUpdateStatusChange()爲true),之後一旦收到StatusChangeEvent就會觸發instanceInfoReplicator.onDemandUpdate()
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
/**
 * Requests an immediate replication run, subject to the rate limiter.
 *
 * @return {@code true} when an update task was submitted to the scheduler;
 *         {@code false} when the request was dropped because the rate limiter
 *         refused it or the scheduler is already shut down
 */
public boolean onDemandUpdate() {
    if (!rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }

    if (scheduler.isShutdown()) {
        logger.warn("Ignoring onDemand update due to stopped scheduler");
        return false;
    }

    scheduler.submit(new Runnable() {
        @Override
        public void run() {
            logger.debug("Executing on-demand update of local InstanceInfo");

            // Cancel a still-pending periodic run; run() schedules the next one itself.
            Future pending = scheduledPeriodicRef.get();
            if (pending != null && !pending.isDone()) {
                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                pending.cancel(false);
            }

            InstanceInfoReplicator.this.run();
        }
    });
    return true;
}

/**
 * One replication pass: refreshes the local instance info and, when it is
 * marked dirty, registers through the discovery client and clears the dirty
 * flag for that timestamp. Any {@code Throwable} is logged at warn level
 * rather than propagated, and the next pass is always scheduled
 * {@code replicationIntervalSeconds} later.
 */
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtySince = instanceInfo.isDirtyWithTime();
        if (dirtySince != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtySince);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Reschedule in all cases and remember the handle so an on-demand update can cancel it.
        Future rescheduled = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(rescheduled);
    }
}
這裏onDemandUpdate()方法主要是執行InstanceInfoReplicator.this.run()
而這個run方法主要是判斷是否dirty,如果是則調用discoveryClient.register()
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
/**
 * Register with the eureka service by making the appropriate REST call.
 *
 * @return {@code true} only when the response status code is 204
 * @throws Throwable any exception raised by the registration call is logged
 *         at warn level and rethrown
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        // Log with the full exception, then let the caller see the failure.
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    // Any status other than 204 is reported as "not registered".
    return httpResponse.getStatusCode() == 204;
}
register(),這個纔是真正去與遠程的Eureka Server交互,註冊或更新信息
EurekaClientAutoConfiguration構造了EurekaClientConfigBean、EurekaInstanceConfigBean以及EurekaServiceRegistry,之後在這幾個對象的基礎上進一步構建ApplicationInfoManager、CloudEurekaClient等。其中ApplicationInfoManager負責變更實例狀態並發佈StatusChangeEvent事件,而CloudEurekaClient繼承了com.netflix.discovery.DiscoveryClient,裏頭包含了statusChangeListener用於響應StatusChangeEvent,最後觸發的是DiscoveryClient.register方法,與遠程的Eureka Server通訊,同步實例狀態。