1. A number of readers asked questions and suggested a walkthrough of the Eureka source code, which is how this chapter came about.
2. So I have tried to organize this analysis of the Eureka source from my own understanding; I hope it is helpful to you.
3. Because the material is far too long to publish in a single post, it is split into an upper and a lower part.
1. Eureka Server provides the service registration service. Each node registers with Eureka Server on startup, so the Eureka Server registry holds the information of every available service node, and that information can be viewed directly in the dashboard.
2. Eureka Client is a Java client that simplifies interaction with Eureka Server; it also ships with a built-in load balancer that uses a round-robin algorithm.
3. After an application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server does not receive a heartbeat from a node for several heartbeat periods, it removes that node from the registry (after 90 seconds by default).
4. Eureka Server nodes synchronize data among themselves through replication.
5. Eureka Client caches the registry locally, so even if every Eureka Server goes down, clients can still consume other services' APIs from the cached information.
2017-10-22 18:14:17.635 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager]
2017-10-22 18:14:17.650 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'restartEndpoint': registering with JMX server as MBean [org.springframework.cloud.context.restart:name=restartEndpoint,type=RestartEndpoint]
2017-10-22 18:14:17.661 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope]
2017-10-22 18:14:17.674 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=335b5620,type=ConfigurationPropertiesRebinder]
2017-10-22 18:14:17.683 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint]
2017-10-22 18:14:17.926 INFO 5288 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2017-10-22 18:14:17.927 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
2017-10-22 18:14:17.927 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Setting the eureka configuration..
2017-10-22 18:14:17.948 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : isAws returned false
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Initialized server context
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Got 1 instances from neighboring DS node
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Renew threshold is: 1
2017-10-22 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Changing status to UP
2017-10-22 18:14:17.958 INFO 5288 --- [ Thread-10] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
2017-10-22 18:14:18.019 INFO 5288 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
2017-10-22 18:14:18.020 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
2017-10-22 18:14:18.023 INFO 5288 --- [ main] c.s.cloud.EurekaServerApplication : Started EurekaServerApplication in 8.299 seconds (JVM running for 8.886)
【【【【【【 Eureka微服務 】】】】】】已啓動.

【Analysis】: Notice the log line "Setting the eureka configuration.." , which means Eureka is starting its configuration. Could this be where the Eureka Server startup flow begins? With that suspicion in mind, let's open the class that prints this line, EurekaServerBootstrap, and take a look.
protected void initEurekaEnvironment() throws Exception {
    log.info("Setting the eureka configuration..");
    ...
}

【Analysis 1】: The log line is printed inside the initEurekaEnvironment method, so the next step is to find where this method gets called.

public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

【Analysis 2】: contextInitialized calls initEurekaEnvironment, so we keep walking up the call chain to find who calls contextInitialized.
【Analysis 3】: We then see that EurekaServerInitializerConfiguration has a start method that spins up a background thread to run the Eureka Server initialization flow:
@Override
public void start() { // set a breakpoint here
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }
    }).start();
}

【Analysis 1】: Seeing the line log.info("Started Eureka Server"), things become clear: this is the point where "Eureka Server is started". In other words, eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext) initializes a set of things we have not looked into yet.
【Analysis 2】: After the startup log is printed, publish is called twice; it ultimately delegates to this.applicationContext.publishEvent(event), using the Spring ApplicationContext as the event publisher to broadcast the events. What is missing is a listener: if you search the code base, there does not seem to be any place that listens for EurekaServerStartedEvent or EurekaRegistryAvailableEvent. Strange, what is going on here?
【Analysis 3】: Looking at the package that contains EurekaServerStartedEvent, there are several event classes: EurekaInstanceCanceledEvent, EurekaInstanceRegisteredEvent, EurekaInstanceRenewedEvent, EurekaRegistryAvailableEvent and EurekaServerStartedEvent, i.e. instance cancellation (service down), instance registration, instance renewal, registry available and Eureka Server started. None of them is listened to out of the box, so we can simply add our own listener, for example @EventListener public void listen(EurekaInstanceCanceledEvent event) { ... handle the instance-down logic ... }; with the @EventListener annotation we get a callback into our own code whenever the event is published (a minimal sketch follows at the end of this step). Spring Cloud is quite considerate here: it exposes callback hooks so we can plug in our own business logic.
【Analysis 4】: Then again, why does the start method get called at all? Walking further up the call chain, we find that DefaultLifecycleProcessor.doStart contains the call bean.start(); its code is shown right after the listener sketch below.
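As mentioned in Analysis 3, these server-side events can be consumed with a plain Spring @EventListener. Below is a minimal sketch, assuming it lives in the Eureka Server application itself and that logging is all we want to do; the class name and log messages are mine, not part of Spring Cloud:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceCanceledEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class EurekaServerEventListener { // hypothetical class name

    private static final Logger log = LoggerFactory.getLogger(EurekaServerEventListener.class);

    // Fired when Eureka Server finishes starting (published by EurekaServerInitializerConfiguration.start()).
    @EventListener
    public void onServerStarted(EurekaServerStartedEvent event) {
        log.info("Eureka Server started: {}", event);
    }

    // Fired when a client instance registers.
    @EventListener
    public void onInstanceRegistered(EurekaInstanceRegisteredEvent event) {
        log.info("Instance registered: {}", event);
        // put alerting / auditing / custom business logic here
    }

    // Fired when a client instance is cancelled (deregisters or goes down).
    @EventListener
    public void onInstanceCanceled(EurekaInstanceCanceledEvent event) {
        log.warn("Instance canceled: {}", event);
    }
}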
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) { // set a breakpoint here
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean != null && !this.equals(bean)) {
        String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() &&
                (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
            }
            try {
                bean.start();
            }
            catch (Throwable ex) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}

【Analysis 1】: bean.start() is only invoked after a series of checks such as bean.isRunning(), so let's keep looking for the caller of doStart:

public void start() { // set a breakpoint here
    if (this.members.isEmpty()) {
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Starting beans in phase " + this.phase);
    }
    Collections.sort(this.members);
    for (LifecycleGroupMember member : this.members) {
        if (this.lifecycleBeans.containsKey(member.name)) {
            doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
        }
    }
}

【Analysis 2】: This start method belongs to LifecycleGroup, an inner class of DefaultLifecycleProcessor; keep walking up the chain:

private void startBeans(boolean autoStartupOnly) {
    Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
    Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
    for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
        Lifecycle bean = entry.getValue();
        if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
            int phase = getPhase(bean);
            LifecycleGroup group = phases.get(phase);
            if (group == null) {
                group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                phases.put(phase, group);
            }
            group.add(entry.getKey(), bean);
        }
    }
    if (phases.size() > 0) {
        List<Integer> keys = new ArrayList<Integer>(phases.keySet());
        Collections.sort(keys);
        for (Integer key : keys) {
            phases.get(key).start();
        }
    }
}

【Analysis 3】: startBeans is a private method of DefaultLifecycleProcessor, and its very first line fetches the lifecycle beans via getLifecycleBeans(). So it looks like Eureka Server gets started because some bean implements a particular interface or overrides a particular method, and is therefore picked up and started during context initialization. With that in mind, let's go back to EurekaServerInitializerConfiguration.
【Analysis 4】: Sure enough, EurekaServerInitializerConfiguration implements the SmartLifecycle interface, and SmartLifecycle in turn extends the Lifecycle interface. So the truth is out: the class implements Lifecycle (via SmartLifecycle) and provides a start method, which is why Eureka Server appears to be started "out of nowhere".
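To see this mechanism in isolation, here is a minimal sketch of a SmartLifecycle bean, under the assumption that it is registered in the application context (the class name and output are made up). Once the context refreshes, DefaultLifecycleProcessor finds it through getLifecycleBeans() and calls start() on it automatically, exactly as happens with EurekaServerInitializerConfiguration:

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class DemoSmartLifecycleBean implements SmartLifecycle { // hypothetical class name

    private volatile boolean running = false;

    @Override
    public void start() {
        // Called automatically by DefaultLifecycleProcessor.doStart() during context refresh.
        System.out.println("DemoSmartLifecycleBean started");
        this.running = true;
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public void stop(Runnable callback) {
        // Asynchronous stop variant required by SmartLifecycle; signal completion via the callback.
        stop();
        callback.run();
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        // Returning true is what makes doStart() invoke start() without anyone calling it explicitly.
        return true;
    }

    @Override
    public int getPhase() {
        // Beans in lower phases start first; 0 matches the "Starting beans in phase 0" log line above.
        return 0;
    }
}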
【Analysis 1】: So far we have only been reverse-engineering from the logs, but haven't we forgotten the annotation that marks an application as a Eureka Server in the first place? Right, we left @EnableEurekaServer out of the analysis, so let's go back to that annotation class first.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {
}

【Analysis 1】: It is easy to see that EnableEurekaServer carries an @Import annotation referencing a configuration class, so let's follow it into EurekaServerConfiguration.
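As a quick reminder before diving into EurekaServerConfiguration: this is the annotation we put on the server's bootstrap class, so importing EurekaServerConfiguration is what wires the whole server up. A minimal sketch (the class name is an assumption, chosen to match the EurekaServerApplication seen in the startup log):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer // pulls in EurekaServerConfiguration via @Import
public class EurekaServerApplication {

    public static void main(String[] args) {
        // Starting the Spring context is all it takes; everything analyzed below follows from it.
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}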
【Analysis 1】: Sure enough, EurekaServerConfiguration has quite a few methods annotated with @Bean / @Configuration. So can we conclude that the reasoning in 3.1~3.4 holds, namely that it is precisely because such a bean is instantiated that the start method eventually gets called?
【Analysis 2】: Searching for the word "Bootstrap", we do find such a method:

@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig,
            this.eurekaServerConfig, registry, serverContext);
}

【Analysis 3】: With this bean present, the log-driven reverse analysis we started with makes sense: without it, getLifecycleBeans() inside DefaultLifecycleProcessor.startBeans would not have found things so smoothly. This is still a guess on my part; I have not pulled the source down and tried commenting out the @Bean annotation on the eurekaServerBootstrap method, but the reasoning should be close to the mark, and I will leave that experiment to you.
【Analysis 4】: Since we found one @Bean method, let's look for other annotated methods whose names contain common, globally relevant words such as Context, Bean, Init or Server:

@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
            registry, peerEurekaNodes, this.applicationInfoManager);
}

@Bean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
        ServerCodecs serverCodecs) {
    return new PeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.applicationInfoManager);
}

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
}

【Analysis 5】: DefaultEurekaServerContext.initialize initializes a few things; we don't know yet what they are for, so note it down and set a breakpoint there.
【Analysis 6】: PeerEurekaNodes.start is another start method, although this class does not implement any lifecycle interface; note it down and set a breakpoint as well.
【Analysis 7】: InstanceRegistry.register, and there are several overloads; this is probably what clients call to register. Set breakpoints here too, or simply on every method of the class; once the breakpoints are in place you will notice there are methods for registration, renewal and cancellation.
【Analysis 8】: With all these breakpoints set and no obvious next step, let's simply run the server in debug mode and see what we hit.
【Analysis 1】: DefaultEurekaServerContext.initialize is indeed invoked, which confirms the earlier idea: EurekaServerConfiguration is not there for nothing.

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}

【Analysis 2】: Step into peerEurekaNodes.start() inside initialize:

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        // Note: runs at an interval of 600000 ms, i.e. the server cluster peer data is synchronized every 10 minutes.
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: " + node.getServiceUrl());
    }
}

【Analysis 3】: Inside start there is a scheduled task around updatePeerEurekaNodes(resolvePeerUrls()), executed every 600000 ms, i.e. the cluster peer synchronization runs every 10 minutes.
【Analysis 4】: Letting the debugger continue, we step into registry.init(peerEurekaNodes) inside initialize:

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // Note: initializes the Eureka Server response cache; the default cache time is 30s.
    initializedResponseCache();
    // Note: scheduled task that resets the heartbeat (renewal) threshold every 900000 ms, i.e. every 15 minutes.
    scheduleRenewalThresholdUpdateTask();
    // Note: initializes the remote region registry.
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

【Analysis 5】: The cache is set up and the scheduled tasks are in place; that seems to be about it, so let's release the breakpoints and see where execution goes next.
【Analysis 1】: The breakpoint in DefaultLifecycleProcessor.doStart is hit first, and only then the one in EurekaServerInitializerConfiguration.start.
【Analysis 2】: This confirms once more that the earlier reverse analysis was only missing its starting point, @EnableEurekaServer; the overall approach was sound. Since we have already covered that part, we skip it, release the breakpoints and keep going.
【Analysis 1】: Isn't this exactly the breakpoint we set in "Step 3.7, Analysis 7"? Looking at the stack trace, it is reached from the line this.registry.openForTraffic(this.applicationInfoManager, registryCount) inside the initEurekaServerContext method from "Step 3.2, Analysis 1". Everything comes full circle: the code paths twist and turn, but the breakpoints pay off, and we end up back where the log-driven analysis started.
【Analysis 2】: Step into super.openForTraffic:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Note: each instance renews every 30 seconds, so 2 renewals per minute, hence count * 2.
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Note: switches the Eureka Server instance to the UP state, i.e. marks it as active, meaning the server is basically ready for normal use.
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Note: postInit starts a scheduled task with a 60000 ms (1 minute) interval in which the Eureka Server periodically evicts expired nodes.
    super.postInit();
}

【Analysis 3】: So this mainly sets the instance status and kicks off the scheduled task that cleans up expired nodes once per minute.
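To make the renewal-threshold arithmetic in openForTraffic concrete, here is a small stand-alone sketch. The instance count (10) is just an example, and 0.85 is the usual default of renewalPercentThreshold, so treat the numbers as assumptions rather than something printed by Eureka itself:

public class RenewalThresholdExample {

    public static void main(String[] args) {
        int registeredInstances = 10;          // assumed number of clients known at startup
        double renewalPercentThreshold = 0.85; // assumed default of getRenewalPercentThreshold()

        // Each client renews every 30 seconds, i.e. 2 renewals per minute, hence count * 2.
        int expectedNumberOfRenewsPerMin = registeredInstances * 2;

        // Below this many renewals per minute the server suspects a network problem
        // and self-preservation kicks in instead of evicting instances.
        int numberOfRenewsPerMinThreshold =
                (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);

        System.out.println("expectedNumberOfRenewsPerMin = " + expectedNumberOfRenewsPerMin);   // 20
        System.out.println("numberOfRenewsPerMinThreshold = " + numberOfRenewsPerMinThreshold); // 17
    }
}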
@EventListener(EmbeddedServletContainerInitializedEvent.class)
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
    // TODO: take SSL into account when Spring Boot 1.2 is available
    int localPort = event.getEmbeddedServletContainer().getPort();
    if (this.port.get() == 0) {
        log.info("Updating port to " + localPort);
        this.port.compareAndSet(0, localPort);
        start();
    }
}

【Analysis 1】: This is where the port gets set. Once the log line "Updating port to 8761" appears, the Eureka Server startup is essentially over. Looking back, we have walked through quite a few methods and flows; enough to feel a little drained.
1. Initialize the Eureka environment and the Eureka server context;
2. Initialize the Eureka Server response cache;
3. Start a number of scheduled tasks, such as the heartbeat-threshold reset task and the expired-node eviction task;
4. Update the Eureka Server status to UP and update the Eureka Server port.
These are the main points I took away from the flow above; there are surely details I have missed, and if you have noticed others, I would be glad to discuss them.
【Analysis 1】: Since we put a breakpoint on every method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry, and the Eureka Server is now running in debug mode, let's start any microservice annotated with @EnableEurekaClient, say the springms-provider-user service, and simply run it.
【Analysis 2】: The guess is that, if our analysis is right, the register method must be called once springms-provider-user starts. Let's keep going and see.
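For reference, the entry point of a client such as springms-provider-user typically looks like the minimal sketch below (the class name is an assumption on my part, not taken from that project):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient // registers this application with the Eureka Server on startup
public class ProviderUserApplication { // hypothetical class name

    public static void main(String[] args) {
        // On startup the embedded Eureka client sends the registration request that
        // eventually lands in ApplicationResource.addInstance on the server side.
        SpringApplication.run(ProviderUserApplication.class, args);
    }
}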
【Analysis 1】: Since InstanceRegistry.register is where we just set a breakpoint, let's walk up the stack trace: it turns out ApplicationResource.addInstance was called. So let's look at addInstance and set a breakpoint there as well, then kill the springms-provider-user service and restart it.
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } 【分析一】:這裏的寫法貌似看起來和咱們以前 Controller 的 RESTFUL 寫法有點不同,仔細一看,原來是Jersey RESTful 框架,是一個產品級的 RESTful service 和 client 框架。與Struts相似,它一樣能夠和hibernate,spring框架整合。 【分析二】:緊接着,咱們看到 registry.register(info, "true".equals(isReplication)); 這麼一段代碼,註冊啊,原來EurekaClient客戶端啓 動後會調用會經過Http(s)請求,直接調到 ApplicationResource.addInstance 方法,那麼總算明白了,只要是和註冊有關的,都會調用這個方法。 【分析三】:接着咱們深刻 registry.register(info, "true".equals(isReplication)) 查看; @Override public void register(final InstanceInfo info, final boolean isReplication) { handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); } 【分析四】:看看 handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) 方法; private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) { log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication); publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication)); } 【分析五】:該方法僅僅只是打了一個日誌,而後經過 ApplicationContext 發佈了一個事件 EurekaInstanceRegisteredEvent 服務註冊事件,正如 「步驟3.3之分析三」 所提到的,用戶能夠給 EurekaInstanceRegisteredEvent 添加監聽事件,那麼用戶就能夠在此刻實現本身想要的一些業務邏輯。 而後咱們再來看看 super.register(info, isReplication) 方法,該方法是 InstanceRegistry 的父類 PeerAwareInstanceRegistryImpl 的方法。
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Note: lease duration, defaulting to the constant value of 90 seconds.
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // Note: the lease duration can also come from configuration, so this value is customizable.
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Note: writes the registering instance into the Eureka Server registry; the parent class is AbstractInstanceRegistry.
    super.register(info, leaseDuration, isReplication);
    // Note: data synchronization between Eureka Server nodes, replicating the change to the other peers.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

【Analysis 1】: Step into super.register(info, leaseDuration, isReplication) to see how the instance is written into the Eureka Server registry, i.e. AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication):

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Note: the registry variable is the registry itself, and it is held in memory.
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

【Analysis 2】: The method is fairly long. Roughly speaking, besides writing the lease into the registry and updating its timestamps, it also invalidates the response cache and maintains a few internal queues; it is worth a deeper read if you are interested.
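The shape of the registry that register writes into is easy to lose in all that code. Conceptually it is just a two-level map keyed by application name and then instance id. The tiny sketch below is a simplification for illustration only; SimpleLease is a stripped-down stand-in, not Netflix's Lease class, and the application/instance names are made up:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistryShapeExample {

    // Minimal stand-in for a lease: just the held value plus the timing fields that matter here.
    static class SimpleLease<T> {
        final T holder;
        final int durationInSecs;
        volatile long lastRenewalTimestamp = System.currentTimeMillis();

        SimpleLease(T holder, int durationInSecs) {
            this.holder = holder;
            this.durationInSecs = durationInSecs;
        }
    }

    // Same two-level structure as the in-memory registry: appName -> (instanceId -> lease around the instance info).
    static final ConcurrentHashMap<String, Map<String, SimpleLease<String>>> registry =
            new ConcurrentHashMap<String, Map<String, SimpleLease<String>>>();

    public static void main(String[] args) {
        // "Register" an instance of SPRINGMS-PROVIDER-USER with a 90-second lease.
        registry.computeIfAbsent("SPRINGMS-PROVIDER-USER", k -> new ConcurrentHashMap<>())
                .put("192.168.1.10:7900", new SimpleLease<>("instance info goes here", 90));

        // A renewal simply refreshes the lease timestamp; eviction removes entries whose
        // lastRenewalTimestamp is older than durationInSecs allows.
        registry.get("SPRINGMS-PROVIDER-USER").get("192.168.1.10:7900").lastRenewalTimestamp =
                System.currentTimeMillis();

        System.out.println("apps registered: " + registry.keySet());
    }
}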
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Note: if this request is itself a replication, do not replicate it again.
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // Iterate over all nodes in the Eureka Server cluster and replicate to each of them.
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Not a replication yet: go through each peer node in the cluster in turn; the actions include cancel, register, heartbeat and status updates.
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

【Analysis 1】: At this point it is easy to see that every registration request first updates the local Eureka Server registry and is then replicated to the other Eureka Server nodes.
【Analysis 2】: Next, let's see how a node performs the replication; step into replicateInstanceActionsToPeers:

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

【Analysis 3】: All of the replication actions between nodes are laid out here. Let's take the Register case, node.register(info), and see how the node actually synchronizes the information; step into node.register(info).
public void register(final InstanceInfo info) throws Exception {
    // Note: the task expiry time handed to the dispatcher; by default it is offset 30 seconds from the current time.
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

【Analysis 1】: Here we meet Eureka's task batching. Normally, synchronization between peers would require many separate calls, and the more Eureka Servers there are, the more HTTP requests there would be, so batch processing was a natural fit. It does introduce some delay in registration and cancellation; every advantage has its trade-off, but the delay stays within a reasonable, tolerable range.
【Analysis 2】: Within the expiryTime window, the batching machinery merges the pending tasks into a single List and sends the whole batch out in one request; a single batch List may therefore contain a mixture of cancel, register, heartbeat and status-update tasks.
【Analysis 3】: Following the source from the batchingDispatcher.process call, we go straight to TaskDispatchers.createBatchingTaskDispatcher:

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize,
                                                                         int workerCount, long maxBatchingDelay,
                                                                         long congestionRetryDelayMs, long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}

【Analysis 4】: The process method here adds the task to a queue; where there is enqueueing there is dequeueing, and I will not walk through how tasks are taken off the queue one by one. Instead, let's look at how the tasks are ultimately fired. Step into TaskExecutors.batchExecutors, from the line final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor):

static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name, int workerCount,
                                                   final TaskProcessor<T> processor,
                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
        @Override
        public WorkerRunnable<ID, T> create(int idx) {
            return new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);
        }
    }, workerCount, isShutdown);
}

【Analysis 5】: The static batchExecutors method of TaskExecutors returns workers built from BatchWorkerRunnable, so let's look inside BatchWorkerRunnable; and since it is a Runnable, it must have a run method:

@Override
public void run() {
    try {
        while (!isShutdown.get()) {
            // Note: getWork() waits for the semaphore released via batchWorkRequests.release() and returns the list of task holders.
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);

            List<T> tasks = getTasksOf(holders);
            // Note: packs the batch of tasks into a single request to the peer node.
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                case Congestion:
                case TransientError:
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}

【Analysis 6】: This is BatchWorkerRunnable's run method: it first has to acquire the semaphore to obtain a batch of tasks, and once it has the batch it calls processor.process(tasks) to push the data to the peer node. Next, let's look at ReplicationTaskProcessor.process:

@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Note: the whole list is sent in one request through the JerseyReplicationClient client object.
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

【Analysis 7】: It feels like the truth is close now, so let's eagerly step into JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList):

@Override
public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    ClientResponse response = null;
    try {
        response = jerseyApacheClient.resource(serviceUrl)
                // Note: this is the key point: the relative target path of the request is peerreplication/batch/
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, replicationList);
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

【Analysis 8】: Now that we have the relative path, let's search for the string "batch" to see whether there is a receiving method annotated with @Path. In eureka-core-1.4.12.jar we do find @Path("batch"): it is the batchReplication method of the PeerReplicationResource class, so let's look at it:

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Note: the received task list is looped over and processed one entry at a time; the core work happens in the dispatch method.
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}

【Analysis 9】: Seeing the loop that handles the batched tasks one by one, the finish line is clearly in sight; let's step into PeerReplicationResource.dispatch:
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());

    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}

【Analysis 10】: Picking one of the action types, again Register, let's look at PeerReplicationResource.handleRegister:

private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // Note: REPLICATION is a constant defined as private static final String REPLICATION = "true";
    // and this call goes right back into ApplicationResource.addInstance.
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}

【Analysis 11】: The peer-replication journey finally ends here: it calls back into ApplicationResource.addInstance, the very method a Eureka client hits when it registers after startup. The only difference is the isReplication flag, true or false, which tells the server whether the call is a peer replication. The rest of the ApplicationResource.addInstance flow was covered earlier, so by now it should be clear how registration works end to end, including how batched tasks keep the registry data in sync between Eureka Server nodes.
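To recap the batching idea from this replication path in isolation, here is a small self-contained sketch of the pattern: a queue that a worker drains in chunks and ships as one request. It is deliberately simplified and is not Eureka's AcceptorExecutor/TaskDispatcher code; the batch size, delay and the fake sendBatch call are assumptions for illustration only:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchingSketch {

    private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static final int MAX_BATCH = 250;     // assumed cap per batch
    private static final long MAX_DELAY_MS = 500; // assumed maximum batching delay

    public static void main(String[] args) throws InterruptedException {
        // Worker thread: drain whatever has accumulated and send it as one "replication" request.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    List<String> batch = new ArrayList<>();
                    // Block for the first task, then give other tasks a short window to join the batch.
                    batch.add(queue.take());
                    TimeUnit.MILLISECONDS.sleep(MAX_DELAY_MS);
                    queue.drainTo(batch, MAX_BATCH - 1);
                    sendBatch(batch);
                }
            } catch (InterruptedException ignored) {
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Producers: register/heartbeat/cancel tasks arriving one by one.
        queue.put("register SPRINGMS-PROVIDER-USER/192.168.1.10:7900");
        queue.put("heartbeat SPRINGMS-PROVIDER-USER/192.168.1.10:7900");
        queue.put("cancel SPRINGMS-PROVIDER-USER/192.168.1.11:7900");

        TimeUnit.SECONDS.sleep(2);
    }

    // Stand-in for the single HTTP POST to the peer's peerreplication/batch/ endpoint.
    private static void sendBatch(List<String> batch) {
        System.out.println("sending one batch of " + batch.size() + " tasks: " + batch);
    }
}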
For what follows, see SpringCloud (Part 050) Netflix Eureka Source Code In-Depth Analysis (Part 2).
https://gitee.com/ylimhhmily/SpringCloudTutorial.git
SpringCloudTutorial QQ discussion group: 235322432
SpringCloudTutorial WeChat discussion group: WeChat group QR code image link
You are welcome to follow along; your support is the greatest encouragement to me!!!