Eureka 源碼分析之 Eureka Server

文章首發於公衆號《程序員果果》 地址 : mp.weixin.qq.com/s/FfJrAGQuH…java

簡介

上一篇文章《Eureka 源碼分析之 Eureka Client》 經過源碼知道 ,eureka Client 是經過 http rest來 與 eureka server 交互,實現 註冊服務,續約服務,服務下線 等。本篇探究下eureka server。node

源碼分析

從 @EnableEurekaServer 註解爲入口分析,經過源碼能夠看出他是一個標記註解:程序員

/** * Annotation to activate Eureka Server related configuration {@link */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
複製代碼

從註釋能夠知道,用來激活 eureka server 的 配置類 EurekaServerAutoConfiguration 中相關配置,EurekaServerAutoConfiguration 的關鍵代碼以下:bootstrap

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	/** * List of packages containing Jersey resources required by the Eureka server */
	private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
			"com.netflix.eureka" };

	@Autowired
	private ApplicationInfoManager applicationInfoManager;

	@Autowired
	private EurekaServerConfig eurekaServerConfig;

	@Autowired
	private EurekaClientConfig eurekaClientConfig;

	@Autowired
	private EurekaClient eurekaClient;

	@Autowired
	private InstanceRegistryProperties instanceRegistryProperties;

	public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

	@Bean
	public HasFeatures eurekaServerFeature() {
		return HasFeatures.namedFeature("Eureka Server",
				EurekaServerAutoConfiguration.class);
	}

	@Configuration
	protected static class EurekaServerConfigBeanConfiguration {
		// 建立並加載EurekaServerConfig的實現類,主要是Eureka-server的配置信息
		@Bean
		@ConditionalOnMissingBean
		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
			EurekaServerConfigBean server = new EurekaServerConfigBean();
			if (clientConfig.shouldRegisterWithEureka()) {
				// Set a sensible default if we are supposed to replicate
				server.setRegistrySyncRetries(5);
			}
			return server;
		}
	}

	//加載EurekaController,SpringCloud 提供了一些額外的接口,用來獲取eurekaServer的信息
	@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

	//省略 ...
	
	// 接收客戶端的註冊等請求就是經過InstanceRegistry來處理的,是真正處理業務的類,接下來會詳細分析
	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry( ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
	
	//配置服務節點信息,這裏的做用主要是爲了配置Eureka的peer節點,也就是說當有收到有節點註冊上來的時候,須要通知給哪些節點
	@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
	
	//省略 ... 
	
	//EurekaServer的上下文
	@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}
	
	// 初始化Eureka-server,會同步其餘註冊中心的數據到當前註冊中心
	@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

	// 配置攔截器,ServletContainer裏面實現了jersey框架,經過他來實現eurekaServer對外的restFull接口
	@Bean
	public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

	//省略 ...
	

}
複製代碼

從EurekaServerAutoConfiguration 類上的註解@Import(EurekaServerInitializerConfiguration.class) 能夠到,實例化類EurekaServerAutoConfiguration以前,已經實例化了EurekaServerInitializerConfiguration類,代碼以下:app

@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
 
   // 此處省略部分代碼
 
   @Override
   public void start() {
      // 啓動一個線程
      new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               //初始化EurekaServer,同時啓動Eureka Server ,後面着重講這裏
               eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
               log.info("Started Eureka Server");
                // 發佈EurekaServer的註冊事件
               publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                // 設置啓動的狀態爲true
               EurekaServerInitializerConfiguration.this.running = true;
                // 發送Eureka Start 事件 , 其餘還有各類事件,咱們能夠監聽這種時間,而後作一些特定的業務需求,後面會講到。
               publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
               // Help!
               log.error("Could not initialize Eureka servlet context", ex);
            }
         }
      }).start();
   }
 
   // 此處省略部分代碼
 
}
複製代碼

這個start方法中開啓了一個新的線程,而後進行一些Eureka Server的初始化工做,好比調用eurekaServerBootstrap的contextInitialized方法,EurekaServerBootstrap代碼以下:框架

public class EurekaServerBootstrap {

	// 此處省略部分代碼

	public void contextInitialized(ServletContext context) {
	   try {
	      // 初始化Eureka的環境變量
	      initEurekaEnvironment();
	      // 初始化Eureka的上下文
	      initEurekaServerContext();
	 
	      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
	   }
	   catch (Throwable e) {
	      log.error("Cannot bootstrap eureka server :", e);
	      throw new RuntimeException("Cannot bootstrap eureka server :", e);
	   }
	}
	 
	protected void initEurekaEnvironment() throws Exception {
	   log.info("Setting the eureka configuration..");
	 
	   String dataCenter = ConfigurationManager.getConfigInstance()
	         .getString(EUREKA_DATACENTER);
	   if (dataCenter == null) {
	      log.info(
	            "Eureka data center value eureka.datacenter is not set, defaulting to default");
	      ConfigurationManager.getConfigInstance()
	            .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
	   }
	   else {
	      ConfigurationManager.getConfigInstance()
	            .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
	   }
	   String environment = ConfigurationManager.getConfigInstance()
	         .getString(EUREKA_ENVIRONMENT);
	   if (environment == null) {
	      ConfigurationManager.getConfigInstance()
	            .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
	      log.info(
	            "Eureka environment value eureka.environment is not set, defaulting to test");
	   }
	   else {
	      ConfigurationManager.getConfigInstance()
	            .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
	   }
	}
	 
	protected void initEurekaServerContext() throws Exception {
	   // For backward compatibility
	   JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
	         XStream.PRIORITY_VERY_HIGH);
	   XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
	         XStream.PRIORITY_VERY_HIGH);
	 
	   if (isAws(this.applicationInfoManager.getInfo())) {
	      this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
	            this.eurekaClientConfig, this.registry, this.applicationInfoManager);
	      this.awsBinder.start();
	   }
	  
	   //初始化eureka server上下文
	   EurekaServerContextHolder.initialize(this.serverContext);
	 
	   log.info("Initialized server context");
	 
	   // Copy registry from neighboring eureka node
	   // 從相鄰的eureka節點複製註冊表 
	   int registryCount = this.registry.syncUp();
	    // 默認每30秒發送心跳,1分鐘就是2次
	    // 修改eureka狀態爲up 
	    // 同時,這裏面會開啓一個定時任務,用於清理 60秒沒有心跳的客戶端。自動下線
	   this.registry.openForTraffic(this.applicationInfoManager, registryCount);
	 
	   // Register all monitoring statistics.
	   EurekaMonitors.registerAllStats();
	}
	public void contextDestroyed(ServletContext context) {
	   try {
	      log.info("Shutting down Eureka Server..");
	      context.removeAttribute(EurekaServerContext.class.getName());
	 
	      destroyEurekaServerContext();
	      destroyEurekaEnvironment();
	 
	   }
	   catch (Throwable e) {
	      log.error("Error shutting down eureka", e);
	   }
	   log.info("Eureka Service is now shutdown...");
	}
	
}
複製代碼

在初始化Eureka Server上下文環境後,就會繼續執行openForTraffic方法,這個方法主要是設置了指望每分鐘接收到的心跳次數,並將服務實例的狀態設置爲UP,最後又經過方法postInit來開啓一個定時任務,用於每隔一段時間(默認60秒)將沒有續約的服務實例(默認90秒沒有續約)清理掉。openForTraffic的方法代碼以下:ide

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 計算每分鐘最大續約數
    this.expectedNumberOfRenewsPerMin = count * 2;
    // 計算每分鐘最小續約數
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 修改服務實例的狀態爲UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // 開啓定時任務,每隔一段時間(默認60秒)將沒有續約的服務實例(默認90秒沒有續約)清理掉
    super.postInit();
}
複製代碼

postInit方法開啓了一個新的定時任務,代碼以下:源碼分析

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
複製代碼

這裏的時間間隔都來自於EurekaServerConfigBean類,能夠在配置文件中以eureka.server開頭的配置來進行設置。post

參考

www.e-learn.cn/content/qit… nobodyiam.com/2016/06/25/… blog.csdn.net/Lammonpeter…ui

關注我

相關文章
相關標籤/搜索