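This article analyzes Spring Cloud Eureka.
Spring Cloud version: Dalston.SR5
spring-cloud-starter-eureka version: 1.3.6.RELEASE
Netflix Eureka version: 1.6.2
It continues the analysis in "The Whole Service Registration Process, from the Eureka Client Sending a Registration Request to the Eureka Server Handling It (Part 1)".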
To create a Spring Cloud Eureka Server, you first put the @EnableEurekaServer annotation on the application. It is defined as:
```java
@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
```
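@EnableDiscoveryClient: imports the discovery-client configuration (besides being a Server, the node also acts as a Client when registries are replicated across a Server cluster)
EurekaServerMarkerConfiguration: activates EurekaServerAutoConfiguration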
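So @EnableEurekaServer works the same way as the Client startup annotation analyzed in the previous article: it activates an xxAutoConfiguration class by registering a Marker bean in the Spring container. For the Eureka Client that class is EurekaClientAutoConfiguration; for the Eureka Server it is EurekaServerAutoConfiguration.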
Below is a brief analysis of each auto-configured component:
Class-level annotations
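@Import(EurekaServerInitializerConfiguration.class): imports the Eureka Server initializer configuration class. It implements the SmartLifecycle interface and calls EurekaServerBootstrap#contextInitialized() once the Spring container has almost finished refreshing
Key point of the Eureka Server startup analysis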
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
EurekaDashboardProperties holds the dashboard properties
InstanceRegistryProperties holds the instance-registry properties:
```java
@ConfigurationProperties(PREFIX)
public class InstanceRegistryProperties {

    public static final String PREFIX = "eureka.instance.registry";

    /*
     * Default number of expected renews per minute, defaults to 1.
     * Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an isolated
     * server can adjust its eviction policy to the number of registrations (when it's
     * zero, even a successful registration won't reset the rate threshold in
     * InstanceRegistry.register()).
     */
    @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility
    private int expectedNumberOfRenewsPerMin = 1;

    /**
     * Value used in determining when leases are cancelled, default to 1 for standalone.
     * Should be set to 0 for peer replicated eurekas
     */
    @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
    private int defaultOpenForTrafficCount = 1;

    // getters and setters omitted
}
```
@PropertySource("classpath:/eureka/server.properties"): this file lives in spring-cloud-netflix-eureka-server-xxx.jar and contains only spring.http.encoding.force=false
EurekaServerFeature: when the /features endpoint is accessed, it reports that the enabled Eureka Server auto-configuration class is EurekaServerAutoConfiguration
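EurekaServerConfig: injects the Eureka Server configuration. EurekaServerConfig is a Netflix interface that holds much of the configuration the Eureka server needs at runtime; Netflix's default implementation is DefaultEurekaServerConfig, and Spring Cloud's default implementation is EurekaServerConfigBean, registered as follows: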
```java
@Configuration
protected static class EurekaServerConfigBeanConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
        EurekaServerConfigBean server = new EurekaServerConfigBean(); // create the EurekaServerConfigBean
        // If this Eureka Server itself must also register as a client (is this required in cluster mode?)
        if (clientConfig.shouldRegisterWithEureka()) {
            // Set a sensible default if we are supposed to replicate
            // Number of retries the node makes to fetch the registry from its peers during startup
            server.setRegistrySyncRetries(5);
        }
        return server;
    }
}
```
EurekaController: the Controller behind the Eureka Server dashboard (default path: /)
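PeerAwareInstanceRegistry: literally a "peer-aware application instance registry", i.e. a registry that, when registering an instance, also takes the corresponding operations on the other nodes of the cluster into account: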
```java
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    // Forces the eurekaClient to be created; the same trick was used to force creation of
    // eurekaClient in the RefreshScope bug examined earlier.
    // Create an InstanceRegistry (Spring Cloud's implementation), which extends
    // PeerAwareInstanceRegistryImpl, the implementation of the PeerAwareInstanceRegistry interface
    return new InstanceRegistry(
            this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
```
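PeerEurekaNodes: a helper class that manages the PeerEurekaNode instances
It runs a scheduled task that refreshes the peer list every EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs (10 * MINUTES by default). The task is started when DefaultEurekaServerContext's @PostConstruct method calls initialize() --> peerEurekaNodes.start()
Peers that are no longer configured are shut down with PeerEurekaNode#shutDown(), and newly configured peers are created with PeerEurekaNodes#createPeerEurekaNode()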
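EurekaServerContext: key point of the Eureka Server startup analysis
The Eureka Server context interface. It exposes initialize() and shutdown(), the EurekaServerConfig configuration, the PeerEurekaNodes helper, the PeerAwareInstanceRegistry, and the ApplicationInfoManager, which manages the current application instance's info (initialized from the Client configuration)
Default implementation: com.netflix.eureka.DefaultEurekaServerContext
Its initialization logic lives in a @PostConstruct method, so initialize() runs after DefaultEurekaServerContext has been constructed and its dependencies injected: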
```java
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    // Start the PeerEurekaNode helper:
    // this starts the scheduled thread that refreshes the peer-node list
    peerEurekaNodes.start();
    // Initialize the PeerAwareInstanceRegistry:
    // starts the numberOfReplicationsLastMin timer, initializedResponseCache(),
    // scheduleRenewalThresholdUpdateTask(), initRemoteRegionRegistry(), and registers the JMX monitor
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
```
EurekaServerBootstrap: the Eureka Server startup bootstrap. When the Spring container has almost finished refresh(), EurekaServerInitializerConfiguration#run() calls eurekaServerBootstrap.contextInitialized() to initialize it, which runs initEurekaEnvironment() and initEurekaServerContext()
Key point of the Eureka Server startup analysis
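Registering the Jersey filter: every request to /eureka passes through the Jersey filter, whose implementation class is com.sun.jersey.spi.container.servlet.ServletContainer. It is both a Filter and a Servlet and contains Jersey's request-processing logic. When it is constructed, the classes in the com.netflix.discovery and com.netflix.eureka packages are registered as resources for handling requests, for example com.netflix.eureka.resources.ApplicationResource, which handles requests for a single application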
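Having gone through the EurekaServerAutoConfiguration class above, I see a few key points:
1. Initialization of DefaultEurekaServerContext (the Eureka Server context)
The EurekaServerContext interface designed by Netflix holds many components, such as PeerEurekaNodes for managing peer nodes and PeerAwareInstanceRegistry for peer-aware instance registration. These components are initialized together with the Eureka Server context, and several scheduled threads are started along the way
2. Initialization of EurekaServerBootstrap
EurekaServerBootstrap is Spring Cloud's bootstrap class for the Eureka Server; its Netflix counterpart is EurekaBootStrap. Its initialization is triggered from a lifecycle method of EurekaServerInitializerConfiguration, a Spring SmartLifecycle bean, when refresh() has almost completed, so it runs after the Eureka Server context has been initialized
3. jerseyFilter, which handles every request to /eureka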
First, here is how Netflix defines the EurekaServerContext interface:
```java
public interface EurekaServerContext {

    void initialize() throws Exception;

    void shutdown() throws Exception;

    EurekaServerConfig getServerConfig();

    PeerEurekaNodes getPeerEurekaNodes();

    ServerCodecs getServerCodecs();

    PeerAwareInstanceRegistry getRegistry();

    ApplicationInfoManager getApplicationInfoManager();
}
```
Besides initialize() and shutdown(), it exposes the components EurekaServerConfig, PeerEurekaNodes, ServerCodecs, PeerAwareInstanceRegistry and ApplicationInfoManager. All of them are already set when the auto-configuration constructs DefaultEurekaServerContext:
```java
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                                  ServerCodecs serverCodecs,
                                  PeerAwareInstanceRegistry registry,
                                  PeerEurekaNodes peerEurekaNodes,
                                  ApplicationInfoManager applicationInfoManager) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.registry = registry;
    this.peerEurekaNodes = peerEurekaNodes;
    this.applicationInfoManager = applicationInfoManager;
}
```
Next comes the initialization method triggered by @PostConstruct:
```java
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
```
It mainly calls the initialization methods of two components, PeerEurekaNodes and PeerAwareInstanceRegistry:
```java
// PeerEurekaNodes#start()
public void start() {
    // Single-threaded scheduled executor running in the background; thread name: Eureka-PeerNodesUpdater
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Resolve the Eureka Server URLs and update the PeerEurekaNodes list
        updatePeerEurekaNodes(resolvePeerUrls());
        // Schedule peersUpdateTask (every 10 min by default, configured by peerEurekaNodesUpdateIntervalMs)
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // The scheduled task does the same thing: resolve the URLs and update the list
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    // Log the peer nodes (this node itself should not be among them)
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: " + node.getServiceUrl());
    }
}
```
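Starting PeerEurekaNodes mainly does two things: resolvePeerUrls() resolves the peer URLs, and updatePeerEurekaNodes() updates the PeerEurekaNodes list accordingly. Both run once at startup and then periodically on the Eureka-PeerNodesUpdater thread. resolvePeerUrls() works as follows: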
```java
// PeerEurekaNodes#resolvePeerUrls()
protected List<String> resolvePeerUrls() {
    // InstanceInfo of this Eureka Server itself
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // Zone of this Eureka Server, defaultZone by default
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // The configured service-url list
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    // Walk the service-urls and remove this server's own URL
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}
```
How does isThisMyUrl() recognize this server's own URL so that it can be excluded?
```java
public boolean isThisMyUrl(String url) {
    return isInstanceURL(url, applicationInfoManager.getInfo());
}

public boolean isInstanceURL(String url, InstanceInfo instance) {
    // Host extracted from the configured URL
    String hostName = hostFromUrl(url);
    // Host taken from this Eureka Server's own InstanceInfo
    String myInfoComparator = instance.getHostName();
    // If eureka.client.transport.applicationsResolverUseIp == true, URLs are resolved by IP,
    // so compare against this Eureka Server's IP address instead
    if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
        myInfoComparator = instance.getIPAddr();
    }
    // Compare the configured host with this Eureka Server's instance info
    return hostName != null && hostName.equals(myInfoComparator);
}
```
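A plain-Java sketch of how resolvePeerUrls() and isThisMyUrl() drop this server's own URL may make the comparison easier to follow. It is an illustration under stated assumptions, not Eureka's code: PeerUrlFilter is a hypothetical class, the host is extracted with java.net.URI#getHost() in place of Eureka's hostFromUrl(), and "my host" is passed in as a string instead of being read from InstanceInfo (hostname, or IP address when applicationsResolverUseIp is true).

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Eureka: mimics resolvePeerUrls() removing this server's own URL.
class PeerUrlFilter {

    // Host part of a service URL (between "http://" and the port), or null if the URL cannot be parsed.
    static String hostFromUrl(String url) {
        try {
            return URI.create(url).getHost();
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    // Same rule as isInstanceURL(): the URL is "mine" when its host equals my hostname (or IP).
    static boolean isThisMyUrl(String url, String myHostOrIp) {
        String host = hostFromUrl(url);
        return host != null && host.equals(myHostOrIp);
    }

    // Configured service URLs minus this server's own URL(s).
    static List<String> resolvePeerUrls(List<String> serviceUrls, String myHostOrIp) {
        List<String> peers = new ArrayList<>(serviceUrls);
        peers.removeIf(url -> isThisMyUrl(url, myHostOrIp));
        return peers;
    }

    public static void main(String[] args) {
        List<String> urls = List.of(
                "http://peer1:8761/eureka/",
                "http://peer2:8762/eureka/",
                "http://peer3:8763/eureka/");
        // Running on host "peer1": only peer2 and peer3 remain as replication targets
        System.out.println(resolvePeerUrls(urls, "peer1"));
    }
}
```

Because only hosts are compared, two Eureka Servers started on one machine with different ports would both count as "this server", which is why local clusters usually give each node its own hostname (peer1, peer2, ...). Conversely, if the host written in the service URL differs from what InetUtils resolved for the machine (an IP vs. a hostname, or a container hostname), the server may fail to recognize its own URL and keep itself in the peer list.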
In isInstanceURL(), the hostName taken from the configured URL is essentially the part between http:// and the port number. The myInfoComparator value of the current Eureka Server instance comes from InetUtils, which EurekaClientAutoConfiguration uses when creating the EurekaInstanceConfigBean. InetUtils is Spring Cloud's networking utility class: it first takes the IP of the first non-loopback network interface (beware: this is a known pitfall in Docker containers), and then uses InetAddress to look up the hostname for that IP; as far as I know, that hostname comes from a file such as /etc/hosts on Linux or from the hostname environment variable.
Once the peer URLs are resolved, updatePeerEurekaNodes() applies them:
```java
// PeerEurekaNodes#updatePeerEurekaNodes()
// newPeerUrls is the list of peer URLs to apply in this update
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // old peerEurekaNodeUrls - new newPeerUrls: surplus nodes that can be shut down
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    // new newPeerUrls - old peerEurekaNodeUrls: nodes that need to be added
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    // Shut down the surplus nodes
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    // Create a PeerEurekaNode for each new URL via createPeerEurekaNode()
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
```
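The heart of updatePeerEurekaNodes() is two set differences. The sketch below isolates just that computation on URL strings; PeerDiff and its fields are hypothetical names, and the shutDown() / createPeerEurekaNode() calls are left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not Eureka code: the set arithmetic inside updatePeerEurekaNodes().
class PeerDiff {
    final Set<String> toShutdown; // known before but no longer configured -> shut the node down
    final Set<String> toAdd;      // newly configured -> create a PeerEurekaNode

    private PeerDiff(Set<String> toShutdown, Set<String> toAdd) {
        this.toShutdown = toShutdown;
        this.toAdd = toAdd;
    }

    static PeerDiff compute(Set<String> currentUrls, List<String> newUrls) {
        // old - new: peers that disappeared from the configuration
        Set<String> toShutdown = new HashSet<>(currentUrls);
        toShutdown.removeAll(newUrls);
        // new - old: peers that were added to the configuration
        Set<String> toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(currentUrls);
        return new PeerDiff(toShutdown, toAdd);
    }

    boolean noChange() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }

    public static void main(String[] args) {
        PeerDiff d = compute(Set.of("http://a/eureka/", "http://b/eureka/"),
                List.of("http://b/eureka/", "http://c/eureka/"));
        System.out.println("shutdown=" + d.toShutdown + " add=" + d.toAdd);
    }
}
```

Note that the real method returns before this step when the new list is empty, so a resolution that temporarily yields no URLs does not shut every peer down.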
With the peerEurekaNodes prepared in the previous step, PeerAwareInstanceRegistry, the instance registry that takes the cluster's peers into account, is initialized:
```java
// PeerAwareInstanceRegistryImpl#init()
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    // [Important] Start the timer behind numberOfReplicationsLastMin (counts over the last minute)
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // [Important] Initialize the ResponseCache, which caches the service lists queried by clients
    // (all applications, deltas, a single application); responseCacheUpdateIntervalMs defaults to 30s
    initializedResponseCache();
    // [Important] Task that periodically updates the renewal threshold, every 900s by default;
    // it calls PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
    scheduleRenewalThresholdUpdateTask();
    // Initialize the remote-region registry (by default there is no remote region; everything uses us-east-1)
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
```
numberOfReplicationsLastMin is a com.netflix.eureka.util.MeasuredRate that measures how many operations (renewals among them) were replicated from peer nodes in the last minute:
```java
// MeasuredRate#start()
public synchronized void start() {
    if (!isActive) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    // Zero out the current bucket.
                    // Move the current bucket's count into lastBucket and reset the current bucket to 0
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);
        isActive = true;
    }
}

/**
 * Returns the count in the last sample interval.
 * (i.e. the count for the previous minute)
 */
public long getCount() {
    return lastBucket.get();
}

/**
 * Increments the count in the current sample interval.
 * Called from, among others:
 * AbstractInstanceRegistry#renew() - renewal
 * PeerAwareInstanceRegistryImpl#replicateToPeers() - replication
 */
public void increment() {
    currentBucket.incrementAndGet();
}
```
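MeasuredRate is essentially a two-bucket counter: increment() writes to the current bucket, and each timer tick publishes the current count as lastBucket and resets it. A minimal sketch, assuming the rollover is triggered by calling a method directly instead of by java.util.Timer so that it behaves deterministically; TwoBucketRate is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the MeasuredRate idea: count events in the current interval
// and expose the count of the previous, completed interval.
class TwoBucketRate {
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    // Called for every measured event (e.g. a renewal or a replication)
    void increment() {
        currentBucket.incrementAndGet();
    }

    // In Eureka a TimerTask does this every sampleInterval: atomically take the
    // current count, reset it to 0, and publish it as the last interval's count
    void rollover() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Count for the last completed interval, as MeasuredRate#getCount() returns
    long getCount() {
        return lastBucket.get();
    }

    public static void main(String[] args) {
        TwoBucketRate rate = new TwoBucketRate();
        rate.increment();
        rate.increment();
        System.out.println(rate.getCount()); // 0: the interval has not rolled over yet
        rate.rollover();
        System.out.println(rate.getCount()); // 2
    }
}
```

A consequence of this design is that getCount() always reports the previous interval, never the one in progress, so readers see values that lag by up to one sampling interval.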
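ResponseCache mainly caches service-list information. According to its Javadoc, the cache is kept in both compressed and uncompressed form and serves three kinds of requests: all applications, delta changes, and a single application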
```java
// ResponseCacheImpl constructor
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

private final LoadingCache<Key, Value> readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // eureka.server.useReadOnlyResponseCache decides whether the read-only ResponseCache is used (default true).
    // ResponseCache keeps a read-write readWriteCacheMap and a read-only readOnlyCacheMap;
    // this setting controls whether get() reads application data from the read-only map or the
    // read-write map. The read-only map is presumably refreshed periodically
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // eureka.server.responseCacheUpdateIntervalMs: cache refresh interval, 30s by default
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    // Create the read-write map, a Guava com.google.common.cache.LoadingCache,
    // configured with an initial capacity, expire-after-write and a removal listener
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // If the read-only cache is enabled, run getCacheUpdateTask() every responseCacheUpdateIntervalMs (30s)
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}
```
So ResponseCache maintains two maps: a read-write readWriteCacheMap, presumably kept up to date by every operation, and a read-only readOnlyCacheMap, refreshed every 30s by default. Here is getCacheUpdateTask() in detail:
```java
// ResponseCacheImpl#getCacheUpdateTask()
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            // Iterate over the keys of the read-only map
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // If the read-only value differs from the read-write value, overwrite the read-only one
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}
```
Every 30s the values in the read-only map are compared with those in the read-write map, and the read-write map's values win.
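The synchronization step reduces to a few lines. This sketch is an assumption-laden stand-in, not ResponseCacheImpl: TwoLevelCacheSync is a hypothetical class, Guava's LoadingCache is replaced by ConcurrentHashMap#computeIfAbsent (which gives the same "get or load" behavior), and a single syncOnce() pass replaces the timer. Like getCacheUpdateTask(), it only refreshes keys that already exist in the read-only map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the two maps and the body of getCacheUpdateTask(); not Eureka code.
class TwoLevelCacheSync {
    final Map<String, String> readWrite = new ConcurrentHashMap<>(); // stands in for the Guava LoadingCache
    final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // stands in for CacheLoader#load -> generatePayload()

    TwoLevelCacheSync(Function<String, String> loader) {
        this.loader = loader;
    }

    // "get or load", the semantics of LoadingCache#get
    String readWriteGet(String key) {
        return readWrite.computeIfAbsent(key, loader);
    }

    // One run of the periodic task: for every key already in the read-only map,
    // copy the read-write value over when it is a different object
    void syncOnce() {
        for (String key : readOnly.keySet()) {
            String cacheValue = readWriteGet(key);
            String currentCacheValue = readOnly.get(key);
            if (cacheValue != currentCacheValue) { // reference comparison, as in the original
                readOnly.put(key, cacheValue);
            }
        }
    }

    public static void main(String[] args) {
        int[] version = {1};
        TwoLevelCacheSync cache = new TwoLevelCacheSync(k -> k + "@v" + version[0]);
        cache.readOnly.put("ALL_APPS", cache.readWriteGet("ALL_APPS")); // first read fills both maps
        version[0] = 2;
        cache.readWrite.remove("ALL_APPS");                 // what an invalidation does
        System.out.println(cache.readOnly.get("ALL_APPS")); // ALL_APPS@v1, stale until the next sync
        cache.syncOnce();
        System.out.println(cache.readOnly.get("ALL_APPS")); // ALL_APPS@v2
    }
}
```

The reference comparison works because a reload always produces a new object. It also shows why, with the read-only cache enabled, a change can take up to responseCacheUpdateIntervalMs to become visible to clients.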
init() then calls scheduleRenewalThresholdUpdateTask(), which periodically recomputes the renewal threshold:
```java
// PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask()
/**
 * Schedule the task that updates <em>renewal threshold</em> periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 * (Runs every eureka.server.renewalThresholdUpdateIntervalMs, 900 seconds by default)
 */
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}
```
The threshold itself is updated in updateRenewalThreshold():
```java
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the <em>renewal threshold</em> based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 */
private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // Count all registrable instances
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold of if the self preservation is disabled.
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfRenewsPerMin = count * 2;
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
```
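The general idea: first count all instances; by default each instance is expected to renew twice a minute (once every 30s), so the expected renewals per minute are count * 2, and the threshold is that value multiplied by renewalPercentThreshold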
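To make the numbers concrete, here is the arithmetic shared by updateRenewalThreshold() and openForTraffic(). RenewalMath is a hypothetical helper, and 0.85 is assumed to be the renewalPercentThreshold (the default, as the 85% in the code comments suggests).

```java
// Hypothetical helper reproducing the renewal-threshold formulas from the code above.
class RenewalMath {
    // One renewal every 30 s means 2 renewals per instance per minute
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    // Threshold = expected renewals * renewalPercentThreshold, truncated to int as in the original
    static int renewsThreshold(int expectedRenewsPerMin, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        int expected = expectedRenewsPerMin(10);         // 20
        int threshold = renewsThreshold(expected, 0.85); // (int) 17.0 = 17
        System.out.println(expected + " " + threshold);
        // Standalone server: openForTraffic() is called with count = 1 (defaultOpenForTrafficCount)
        System.out.println(renewsThreshold(expectedRenewsPerMin(1), 0.85)); // (int) 1.7 = 1
    }
}
```

So a standalone server starts with 2 expected renewals and a threshold of 1 per minute; each new registration then adds 2 to the expected count in AbstractInstanceRegistry#register().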
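However, updateRenewalThreshold() in the current version (eureka-core-1.6.2) is flawed in both its comment and its condition: the comment is garbled ("of if the self preservation is disabled" should read "or if"), and the condition multiplies numberOfRenewsPerMinThreshold, which is already expectedNumberOfRenewsPerMin × renewalPercentThreshold, by renewalPercentThreshold a second time. This was only fixed in v1.9.3: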
https://github.com/Netflix/eureka/commit/a4dd6b22ad447c706234e63fe83cb58413f7618b#diff-4aec7ea96457f5084840fc40f501c320
Two later versions changed this calculation again and extracted it into a separate method:
Extract calculation of renews threshold to separate method
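To see what the double percentage does, the sketch below compares the 1.6.2 condition with one that applies renewalPercentThreshold to the previous expected count. The second method is only a reading of the comment's intent, not a copy of the later Eureka fix; ThresholdCondition and its methods are hypothetical, renewalPercentThreshold is assumed to be 0.85, and the self-preservation branch is left out.

```java
// Hypothetical comparison of two update conditions for updateRenewalThreshold().
class ThresholdCondition {
    static final double PERCENT = 0.85; // assumed renewalPercentThreshold

    // eureka-core 1.6.2: compares against a percentage of the threshold,
    // which is itself already a percentage of the expected renewals
    static boolean shouldUpdate162(int count, int currentThreshold) {
        return (count * 2) > (PERCENT * currentThreshold);
    }

    // Assumed intent: accept the new count only if it is above 85% of the previous expected renewals
    static boolean shouldUpdateIntended(int count, int currentExpected) {
        return (count * 2) > (PERCENT * currentExpected);
    }

    public static void main(String[] args) {
        int expected = 20;  // 10 instances were registered before
        int threshold = 17; // (int) (20 * 0.85)
        int count = 8;      // only 8 instances seen now, e.g. during a network partition
        System.out.println(shouldUpdate162(count, threshold));     // true:  16 > 14.45
        System.out.println(shouldUpdateIntended(count, expected)); // false: 16 > 17.0 is false
    }
}
```

With the 1.6.2 condition an update is accepted as long as the new expected count stays above roughly 0.85 × 0.85 ≈ 72% of the previous one, so the threshold can drop further in a single update than renewalPercentThreshold suggests.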
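The auto-configuration above has already registered the Jersey Filter that handles every /eureka/** request, so Client registration, renewal and other requests can all be processed. The remaining work is done by EurekaServerBootstrap#contextInitialized(), once the Spring container has almost finished refresh()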
EurekaServerBootstrap is Spring Cloud's implementation, while Netflix's own Eureka Server bootstrap is EurekaBootStrap. Spring Cloud's contextInitialized() is:
```java
// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();   // initialize the environment
        initEurekaServerContext(); // initialize the context

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    } catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
```
Of the two, we mainly care about the context initialization, initEurekaServerContext():
```java
// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    // Running on AWS?
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    // Hand the serverContext over to the holder
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
```
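There are two important steps: registry.syncUp(), which copies the registry from neighboring Eureka nodes, and registry.openForTraffic(), which computes the renewal threshold from the number of copied instances, marks the server UP and starts the eviction task: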
```java
// PeerAwareInstanceRegistryImpl#syncUp()
/**
 * Populates the registry information from a peer eureka node. This
 * operation fails over to other nodes until the list is exhausted if the
 * communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    // Try at most RegistrySyncRetries times (the auto-configuration above sets it to 5 when this
    // server registers with Eureka); the eurekaClient logic fails over to other Eureka nodes
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); // 30s
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Get the service list from eurekaClient
        Applications apps = eurekaClient.getApplications();
        // Walk the service list and register each instance
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
```
openForTraffic() is first intercepted by Spring Cloud's InstanceRegistry and then handled by PeerAwareInstanceRegistryImpl:
```java
// InstanceRegistry#openForTraffic()
/**
 * If
 * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
 * is called with a zero argument, it means that leases are not automatically
 * cancelled if the instance hasn't sent any renewals recently. This happens for a
 * standalone server. It seems like a bad default, so we set it to the smallest
 * non-zero value we can, so that any instances that subsequently register can bump up
 * the threshold.
 */
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // count == 0 means no service list came from neighboring Eureka nodes (e.g. a standalone server);
    // fall back to defaultOpenForTrafficCount = 1
    super.openForTraffic(applicationInfoManager,
            count == 0 ? this.defaultOpenForTrafficCount : count);
}

// PeerAwareInstanceRegistryImpl#openForTraffic()
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Expected renewals per minute (one renewal per 30s, i.e. 2 per 60s)
    this.expectedNumberOfRenewsPerMin = count * 2;
    // Renewal threshold per minute: 85% * expectedNumberOfRenewsPerMin
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        // Since count is at least 1 by default, peerInstancesTransferEmptyOnStartup is always set to false;
        // the flag is used in PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start a new EvictionTask
    super.postInit();
}

// AbstractInstanceRegistry#postInit()
protected void postInit() {
    renewsLastMin.start(); // timer that measures renewals in the last minute
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(), // 60s by default
            serverConfig.getEvictionIntervalTimerInMs());
}
```
After the auto-configuration and initialization above, the Eureka Server has started and can handle requests through Jersey. Registration requests are handled by com.netflix.eureka.resources.ApplicationResource#addInstance():
```java
// ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // Register the instance through the cluster-aware PeerAwareInstanceRegistry.
    // isReplication tells whether this call is a replication between nodes;
    // for a client's own registration the header is absent, so isReplication == null
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible (registration succeeded)
}
```
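The required-field checks at the top of addInstance() form an ordered "first failure wins" list. The sketch below expresses that order over a simplified, hypothetical Registration class instead of InstanceInfo, returning the message that would go into the 400 response, or null when the request would proceed. RegistrationValidator is hypothetical too, isBlank() is assumed to mean null/empty/whitespace, and the DataCenterInfo id handling is left out.

```java
// Hypothetical stand-in for the InstanceInfo fields that addInstance() checks; not Eureka's class.
class Registration {
    final String id;
    final String hostName;
    final String appName;
    final boolean hasDataCenterInfo;
    final String dataCenterName; // may be null even when hasDataCenterInfo is true

    Registration(String id, String hostName, String appName, boolean hasDataCenterInfo, String dataCenterName) {
        this.id = id;
        this.hostName = hostName;
        this.appName = appName;
        this.hasDataCenterInfo = hasDataCenterInfo;
        this.dataCenterName = dataCenterName;
    }
}

// Hypothetical validator reproducing the order of the 400 checks in ApplicationResource#addInstance().
class RegistrationValidator {
    // Assumed meaning of "blank": null, empty, or whitespace only
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // appNameInPath is the application name from the request path.
    // Returns the 400 message of the first failed check, or null if the request would be accepted.
    static String validate(String appNameInPath, Registration info) {
        if (isBlank(info.id)) return "Missing instanceId";
        if (isBlank(info.hostName)) return "Missing hostname";
        if (isBlank(info.appName)) return "Missing appName";
        if (!appNameInPath.equals(info.appName)) {
            return "Mismatched appName, expecting " + appNameInPath + " but was " + info.appName;
        }
        if (!info.hasDataCenterInfo) return "Missing dataCenterInfo";
        if (info.dataCenterName == null) return "Missing dataCenterInfo Name";
        return null; // the real method goes on to registry.register(...) and answers 204
    }

    public static void main(String[] args) {
        Registration ok = new Registration("host1:order:8080", "host1", "ORDER-SERVICE", true, "MyOwn");
        System.out.println(validate("ORDER-SERVICE", ok)); // null
        System.out.println(validate("USER-SERVICE", ok));  // Mismatched appName, expecting USER-SERVICE but was ORDER-SERVICE
    }
}
```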
The key line is registry.register(info, "true".equals(isReplication)), which registers the instance through the cluster-aware PeerAwareInstanceRegistry:
```java
// PeerAwareInstanceRegistryImpl#register()
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // default lease duration: 90s
    // If the instance's lease info carries its own duration, use that instead
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the instance on this Eureka Server
    super.register(info, leaseDuration, isReplication);
    // Replicate the registration to the other nodes in the cluster
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
```
super.register() lands in AbstractInstanceRegistry#register():
```java
// AbstractInstanceRegistry#register()
/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock(); // read lock
        // registry holds every application's instances: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        // Get all instances registered under this appName
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication); // registration counter + 1
        // If there is no map for this appName yet, create one
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // The instance's existing Lease, if any
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // (compare the LastDirtyTimestamp of the incoming instance and of the existing lease, keep the later one)
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            // expectedNumberOfRenewsPerMin grows by 2 (one renewal every 30s),
            // and numberOfRenewsPerMinThreshold is recomputed (85%)
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1 for 30 seconds, 2 for a minute)
                    // (this upstream comment looks copied from the cancel path; here the count is increased)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease); // put the instance into the registry map
        // Record the registration in the recently-registered queue
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        // If the instance carries an OverriddenStatus, also put it into this server's overriddenInstanceStatusMap
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED); // ActionType is ADDED
        recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // record in recentlyChangedQueue
        registrant.setLastUpdatedTimestamp(); // update the last-updated timestamp
        // Invalidate the ResponseCache entries of this application
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock(); // release the read lock
    }
}
```
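Creates the Lease for the current instance and puts it into the Map in which Eureka Server keeps registration information: 【ConcurrentHashMap<String, Map<String, Lease
For a new registration, expectedNumberOfRenewsPerMin (the expected number of renewals per minute) is increased by 2, since one instance renews every 30 s, and numberOfRenewsPerMinThreshold (the per-minute renewal threshold) is recalculated as expectedNumberOfRenewsPerMin × renewalPercentThreshold
Updates recentRegisteredQueue (recently registered instances) and recentlyChangedQueue (recently changed instances), so that recent registrations and changes can be looked up later
If the registering instance already carries an OverriddenStatus, this Server node records and applies it for the instance according to the overridden-status rules (getOverriddenInstanceStatus())
Sets the instance's last-updated timestamp
Invalidates the ResponseCache entries of the current application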
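To make the threshold arithmetic of the second step concrete, here is a minimal Java sketch. It is a simplified model, not Eureka code: the class RenewThresholdSketch is hypothetical, and renewalPercentThreshold is assumed to be 0.85 (the usual default of eureka.server.renewalPercentThreshold). It starts from expectedNumberOfRenewsPerMin = 1, the default shown in InstanceRegistryProperties, and applies the `+2` and `× renewalPercentThreshold` update on each new registration, including the `> 0` guard from the code above.

```java
// Simplified model of the renewal-threshold bookkeeping done on a new registration.
// RenewThresholdSketch and its fields are illustrative, not Eureka classes.
class RenewThresholdSketch {
    // Assumed value of eureka.server.renewalPercentThreshold (0.85 is the usual default)
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85;

    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;

    RenewThresholdSketch(int initialExpected) {
        this.expectedNumberOfRenewsPerMin = initialExpected;
        this.numberOfRenewsPerMinThreshold = (int) (initialExpected * RENEWAL_PERCENT_THRESHOLD);
    }

    // New registration: one renewal every 30 s means 2 more renewals expected per minute
    void onNewRegistration() {
        if (expectedNumberOfRenewsPerMin > 0) {
            expectedNumberOfRenewsPerMin += 2;
            // The cast truncates, just like the (int) cast in register()
            numberOfRenewsPerMinThreshold = (int) (expectedNumberOfRenewsPerMin * RENEWAL_PERCENT_THRESHOLD);
        }
    }

    public static void main(String[] args) {
        // Start from the InstanceRegistryProperties default expectedNumberOfRenewsPerMin = 1
        RenewThresholdSketch registry = new RenewThresholdSketch(1);
        for (int i = 1; i <= 3; i++) {
            registry.onNewRegistration();
            System.out.println(i + " registration(s): expected=" + registry.expectedNumberOfRenewsPerMin
                    + ", threshold=" + registry.numberOfRenewsPerMinThreshold);
        }
    }
}
```

Starting from 1, three registrations give expected counts of 3, 5 and 7 and thresholds of 2, 4 and 5. Starting from 0, the `> 0` guard means registrations never move the threshold, which is the situation the comment in InstanceRegistryProperties warns about and why its default is 1.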
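responseCache caches the application instance information that is served to queries.
It uses a Guava cache to maintain a readable and writable local LoadingCache (LocalLoadingCache), 【readWriteCacheMap】, plus a read-only ConcurrentMap, 【readOnlyCacheMap】.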
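When get(key, useReadOnlyCache) is called with useReadOnlyCache = true, the read-only 【readOnlyCacheMap】 is checked first; on a miss, 【readWriteCacheMap】 is queried and the result is copied into 【readOnlyCacheMap】. The get() of 【readWriteCacheMap】 effectively means getOrLoad(): if the value is absent, it is loaded through the CacheLoader, which reads it from the Map that holds the application instance registrations.
【readWriteCacheMap】 talks directly to that registration Map: entries are loaded on query, and registering a new instance invalidates the cached entries of the whole application.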
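【readOnlyCacheMap】 is a read-only cache layered on top of 【readWriteCacheMap】. Whether it is used is controlled by eureka.server.useReadOnlyResponseCache (default true), and it is synchronized from 【readWriteCacheMap】 every eureka.server.responseCacheUpdateIntervalMs (default 30000 ms, i.e. 30 s).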
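The interplay of the two layers can be sketched in plain Java. This is a simplified model under stated assumptions: TwoLevelCacheSketch is hypothetical, String keys and values stand in for Eureka's own key and value types, and ConcurrentHashMap.computeIfAbsent replaces Guava's LoadingCache to get the same getOrLoad behavior. It follows the description above: a registration only invalidates the read-write layer, so a reader going through the read-only layer keeps seeing the old value until the next periodic sync.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified stand-in for the two cache layers of the response cache (stdlib only, no Guava).
class TwoLevelCacheSketch {
    private final Map<String, String> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Map<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // plays the role of the CacheLoader

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    String get(String key, boolean useReadOnlyCache) {
        if (useReadOnlyCache) {
            String cached = readOnlyCacheMap.get(key);
            if (cached != null) {
                return cached;
            }
            // Miss: go to the read-write layer and remember the result in the read-only layer
            String value = readWriteCacheMap.computeIfAbsent(key, loader);
            readOnlyCacheMap.put(key, value);
            return value;
        }
        // computeIfAbsent gives the "getOrLoad" behavior
        return readWriteCacheMap.computeIfAbsent(key, loader);
    }

    // What a registration triggers: only the read-write layer is invalidated
    void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }

    // What the periodic task does every responseCacheUpdateIntervalMs
    void syncReadOnlyFromReadWrite() {
        for (String key : readOnlyCacheMap.keySet()) {
            readOnlyCacheMap.put(key, readWriteCacheMap.computeIfAbsent(key, loader));
        }
    }

    public static void main(String[] args) {
        // Hypothetical registry: application name -> number of instances
        Map<String, Integer> registry = new ConcurrentHashMap<>();
        registry.put("ORDER-SERVICE", 1);
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(app -> app + " instances=" + registry.get(app));

        System.out.println(cache.get("ORDER-SERVICE", true));
        registry.put("ORDER-SERVICE", 2); // a new instance registers
        cache.invalidate("ORDER-SERVICE");
        System.out.println(cache.get("ORDER-SERVICE", true)); // still the old value
        cache.syncReadOnlyFromReadWrite();
        System.out.println(cache.get("ORDER-SERVICE", true)); // refreshed after the sync
    }
}
```

This is why, with useReadOnlyResponseCache enabled, a newly registered instance can take up to responseCacheUpdateIntervalMs longer to become visible to clients that fetch the registry.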
```java
// PeerAwareInstanceRegistryImpl#replicateToPeers()
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // Is this request itself a replication? (false here: the current node got it directly from a client)
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // (so return immediately)
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all nodes in the cluster (except the current one)
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the instance action to this node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
```
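The two guards in replicateToPeers() (never re-replicate a replica, never replicate to yourself) can be isolated in a small sketch. It is illustrative only: peers are plain URL strings instead of PeerEurekaNode objects, the URLs are made up, and simple string equality stands in for peerEurekaNodes.isThisMyUrl().

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the fan-out decision in replicateToPeers(); not Eureka code.
class ReplicationFanOutSketch {
    // Returns the peer URLs an action would be forwarded to
    static List<String> replicationTargets(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> targets = new ArrayList<>();
        if (peerUrls.isEmpty() || isReplication) {
            return targets; // a replica must not be forwarded again
        }
        for (String url : peerUrls) {
            if (url.equals(myUrl)) {
                continue; // skip this node itself
            }
            targets.add(url);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> peers = List.of(
                "http://peer1:8761/eureka/", "http://peer2:8762/eureka/", "http://peer3:8763/eureka/");
        String me = "http://peer1:8761/eureka/";
        System.out.println(replicationTargets(peers, me, false)); // request straight from a client
        System.out.println(replicationTargets(peers, me, true));  // request that is itself a replication
    }
}
```

Only the node that received the request directly from a client (isReplication = false) fans out; each peer that receives the copy sees isReplication = true and returns early, which is what prevents replication loops.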
Next comes replicateInstanceActionsToPeers(), which replicates the instance action to another node:
```java
// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // cancel
                node.cancel(appName, id);
                break;
            case Heartbeat: // heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // register
                node.register(info);
                break;
            case StatusUpdate: // status update
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: // delete the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
```
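Each peer call in replicateInstanceActionsToPeers() sits inside its own try/catch, so whatever is thrown while handing the action to one node is logged and replication to the remaining nodes continues. A minimal sketch of that isolation, assuming a hypothetical functional interface Peer in place of PeerEurekaNode and a reduced Action enum (both are illustrative, not Eureka's types):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-peer failure isolation; Peer and this Action enum are hypothetical.
class PerPeerIsolationSketch {
    enum Action { Heartbeat, Register, Cancel }

    @FunctionalInterface
    interface Peer {
        void send(Action action, String instanceId) throws Exception;
    }

    // Sends the action to every peer and returns the URLs of the peers that failed
    static List<String> replicate(Map<String, Peer> peersByUrl, Action action, String instanceId) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Peer> entry : peersByUrl.entrySet()) {
            try {
                entry.getValue().send(action, instanceId);
            } catch (Throwable t) {
                // The real code logs "Cannot replicate information to ..." at this point
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Peer> peers = new LinkedHashMap<>();
        peers.put("http://peer2:8762/eureka/", (action, id) -> System.out.println("peer2 <- " + action + " " + id));
        peers.put("http://peer3:8763/eureka/", (action, id) -> { throw new Exception("connection refused"); });
        System.out.println("failed: " + replicate(peers, Action.Register, "order-service:8080"));
    }
}
```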
Here we only care about the register action on a node, i.e. node.register(info):
```java
// PeerEurekaNode#register()
/**
 * Sends the registration information of {@link InstanceInfo} receiving by
 * this node to the peer node represented by this class.
 *
 * @param info
 *            the instance information {@link InstanceInfo} of any instance
 *            that is send to this instance.
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    // Expires at the current time + the instance's lease renewal interval (30 s by default)
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Submit the replication task to the batching dispatcher
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info,
                    null /* overriddenStatus */, true /* replicateInstanceInfo */) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
```
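batchingDispatcher.process() receives a task id, the task and expiryTime. The sketch below models what the id and the expiry are for, under the assumption that the dispatcher keeps only the latest pending task per id and discards tasks whose expiryTime has passed instead of sending them. BatchingSketch is hypothetical and far simpler than Eureka's batching executor, and the task-id strings in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a batching dispatcher; not Eureka's implementation.
class BatchingSketch {
    private static final class TaskHolder {
        final String payload;
        final long expiryTime;

        TaskHolder(String payload, long expiryTime) {
            this.payload = payload;
            this.expiryTime = expiryTime;
        }
    }

    private final Map<String, TaskHolder> pending = new LinkedHashMap<>();

    // A newer task with the same id replaces the pending one
    synchronized void process(String taskId, String payload, long expiryTime) {
        pending.put(taskId, new TaskHolder(payload, expiryTime));
    }

    // Drains all pending tasks and returns the payloads that have not expired yet
    synchronized List<String> drainBatch(long now) {
        List<String> batch = new ArrayList<>();
        for (TaskHolder holder : pending.values()) {
            if (holder.expiryTime > now) {
                batch.add(holder.payload);
            }
        }
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        long leaseRenewalMs = 30_000; // default lease renewal interval (30 s)
        long now = System.currentTimeMillis();
        BatchingSketch dispatcher = new BatchingSketch();
        dispatcher.process("register#ORDER-SERVICE/host1:8080", "register v1", now + leaseRenewalMs);
        dispatcher.process("register#ORDER-SERVICE/host1:8080", "register v2", now + leaseRenewalMs);
        dispatcher.process("register#USER-SERVICE/host2:8081", "stale register", now - 1);
        System.out.println(dispatcher.drainBatch(now));
    }
}
```

Tying expiryTime to the lease renewal interval makes sense under this reading: a registration copy that could not be sent within one renewal period is superseded anyway, because the instance's next heartbeat is replicated as well.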
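After that, the call replicationClient.register(info) works much like the registration request a Eureka Client sends. The difference is that the replication client marks the request as a replication (HTTP header x-netflix-discovery-replication: true), so the receiving peer handles it with isReplication = true and, per replicateToPeers() above, does not replicate it again.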
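To illustrate what replicationClient.register(info) turns into on the wire, here is a sketch that only builds (never sends) a java.net.http.HttpRequest (Java 11+). Assumptions: the peer URL and the JSON body are made up, and the body is a hand-written stand-in for the serialized InstanceInfo. The apps/{appName} path and the x-netflix-discovery-replication header follow Eureka's Jersey-based replication client as far as can be told from its source; verify them against the version in use.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds (does not send) a request shaped like a peer-replicated registration.
class ReplicationRequestSketch {
    // Header the replication client adds so the peer treats the call as a replication
    static final String HEADER_REPLICATION = "x-netflix-discovery-replication";

    static HttpRequest registerRequest(String peerServiceUrl, String appName, String instanceJson) {
        String base = peerServiceUrl.endsWith("/") ? peerServiceUrl : peerServiceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Content-Type", "application/json")
                .header(HEADER_REPLICATION, "true") // the peer will see isReplication = true
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical peer URL and a hand-written stand-in for the InstanceInfo JSON
        HttpRequest request = registerRequest("http://peer2:8762/eureka/", "ORDER-SERVICE",
                "{\"instance\":{\"app\":\"ORDER-SERVICE\",\"instanceId\":\"host1:order-service:8080\"}}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue(HEADER_REPLICATION).orElse("-"));
    }
}
```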
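This completes the analysis of how Spring Cloud Eureka Server is auto-configured and initialized, how it accepts registration requests, and how it replicates them to the peer nodes in the cluster.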
The overall sequence flow, for reference:
References:
Dive into Eureka, by 宋順