SpringCloud學習筆記(3、SpringCloud Netflix Eureka)

目錄:

  • 服務發現簡介
  • SpringCloud Netflix Eureka應用
  • Eureka高可用
  • Eureka源碼分析 >>> Eureka Client初始化(客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時註冊)源碼分析、服務下線源碼分析

服務發現簡介:

一、什麼是服務發現spring

程序經過一個標識獲取服務列表,且這個服務列表可以跟隨服務的狀態而動態變動緩存

二、服務發現的兩種模式安全

)客戶端模式:調用微服務時,首先到註冊中心獲取服務列表,而後再根據調用本地的負載均衡策略進行服務調用,而且本地會緩存一份服務列表服務器

)服務端模式:調用端直接向註冊中心發起請求,註冊中心再經過自身的負載均衡策略進行服務調用調用端自身是不須要維護服務發現邏輯app

三、客戶端、服務端兩種模式的比較負載均衡

客戶端ide

a、獲取列表爲週期性,在調用上減小了一次鏈路,但每一個客戶端都須要維護獲取服務列表的邏輯函數

b、可用性高,由於本地緩存了一份服務列表的緣由,因此即便註冊中心出現故障了也不會影響客戶端的正常使用微服務

c、服務端上下線先會對客戶端有必定的影響,會出現短暫的調用失敗源碼分析

服務端

a、簡單,客戶端不須要維護獲取服務列表的邏輯

b、可用性由服務管理者覺定,若服務管理者發送故障則全部的客戶端將不可用;同時,全部的調用及存儲都有服務管理者來完成,這樣服務管理者可能會負載太高

c、服務端上下線客戶端無感知

SpringCloud Netflix Eureka應用:

一、Eureka服務端

)添加Eureka Server依賴

1 <dependency>
2     <groupId>org.springframework.cloud</groupId>
3     <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
4 </dependency>

)啓動類加上@EnableEurekaServer註解

)配置properties

 1 ## Eureka註冊中心實例名  2 spring.application.name=eureka-server  3 ## Eureka註冊中心端口  4 server.port=9090
 5 
 6 ## 關閉Actuator驗證開關  7 management.security.enabled=false
 8 
 9 ## 不向註冊中心獲取服務列表 10 ## eureka服務端的eureka.client.fetch-registry配置必須爲false讓他不要去找server節點,由於它自己就是server節點 11 eureka.client.fetch-registry=false
12 ## 不註冊到註冊中心上 13 eureka.client.register-with-eureka=false
14 
15 ## 配置 註冊中心的 地址 16 eureka.client.service-url.defaultZone=http://localhost:9090/eureka

啓動後可經過http://localhost:9090/eureka查看服務端狀況

二、Eureka客戶端

)添加Eureka Client依賴

1 <dependency>
2     <groupId>org.springframework.cloud</groupId>
3     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
4 </dependency>

)啓動類加上@EnableEurekaClient或@EnableDiscoveryClient

兩者的共同點是:都是可以讓註冊中心可以發現,掃描到該服務。

不一樣點:@EnableEurekaClient只適用於Eureka做爲註冊中心,而@EnableDiscoveryClient能夠是其餘的註冊中心。

)配置properties

 1 ## Eureka服務提供者實例名  2 spring.application.name=eureka-provider  3 ## Eureka服務提供者端口  4 server.port=8070
 5 
 6 ## 關閉Actuator驗證開關  7 management.security.enabled=false
 8 
 9 ## 配置 註冊中心的 地址 10 eureka.client.service-url.defaultZone=http://localhost:9090/eureka/

三、一些經常使用的配置

)客戶端配置

)服務實例配置

Eureka高可用:

咱們都知道Eureka分爲服務端和客戶端,因此搭建高可用的Eureka時兩者都須要高可用。

一、服務端

服務端的高可用其實就是讓客戶端獲取服務列表時儘量的少失敗,因此咱們只須要啓動兩個Eureka Server,讓他們相互複製服務列表便可

1 server.port=9091
2 eureka.client.service-url.defaultZone=http://localhost:9092/eureka
1 server.port=9092
2 eureka.client.service-url.defaultZone=http://localhost:9091/eureka

二、客戶端

客戶端的高可用就是在獲取服務列表時儘量的少失敗,因此咱們只須要配置多個註冊中心便可

1 eureka.client.service-url.defaultZone=http://localhost:9091/eureka,http://localhost:9092/eureka

Eureka Client初始化: 

首先咱們知道若想將一個應用程序註冊爲Eureka的客戶端那最主要的即是在啓動類上加上@EnableDiscoveryClient這個註解,這即是Eureka Client的初始化。

閱讀目的:爲何加上@EnableDiscoveryClient註解後且配置eureka.client.service-url.defaultZone=http後,就能將此服務註冊到eureka,且可以讓其它註冊的服務發現此服務。

一、首先以@EnableDiscoveryClient註解來分析

從註解的字面意思來看就是啓動DiscoveryClient,咱們來大膽的猜想下 φ(>ω<*) 

emmmmm,程序中應該有DiscoveryClient這個類吧,Ctrl + n,(⊙o⊙)…果然有這個類!

根據IDEA的檢索結果,發現知足條件的有兩個,一個是com.netflix.discovery.DiscoveryClient,一個是org.springframework.cloud.client.discovery.DiscoveryClient

看源碼嘛,確定先看結構咯~~~

com.netflix.discovery.DiscoveryClient

org.springframework.cloud.client.discovery.DiscoveryClient是一個接口,其實現有不少,但Eureka的是org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

因此兩個DiscoveryClient的邏輯結構便以下圖:

二、瞭解結構後,咱們再略讀下這兩個DiscoveryClient的實現

)首先從SpringCloud的看起,實現類org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

a、經過略讀能夠看出比較重要的public List<ServiceInstance> getInstances(String serviceId)、public List<String> getServices()兩個方法都有使用一個eurekaClient的屬性

b、而eurekaClient正是,Netflix的EurekaClient接口,因此咱們能夠得知SpringCloud應該僅是對Netflix的一個包裝

c、因此咱們直接看Netflix的EurekaClient接口(com.netflix.discovery.EurekaClient)的實現 >>> com.netflix.discovery.DiscoveryClient

com.netflix.discovery.DiscoveryClient的實現

a、通常看類的實現是從構造函數入手,因此咱們先找到最全的構造函數:DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider);

b、此構造一開始是大量的判斷,這塊就不看了,咱們僅看最重要那部分(正常邏輯下,大量判斷中的代碼邏輯不是對bug的處理就是特殊狀況的處理,因此咱們先看那些不在if中的代碼或者是少許if的代碼) >>> initScheduledTasks()

c、initScheduledTasks():初始化定時器,其主要分爲三大塊 >>> 客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時註冊

客戶端定時獲取服務列表源碼分析:

源碼示例(客戶端):

1 if (clientConfig.shouldFetchRegistry()) { 2     // registry cache refresh timer
 3     int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); 4     int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); 5 scheduler.schedule( 6         new TimedSupervisorTask( 7             "cacheRefresh", 8 scheduler, 9 cacheRefreshExecutor, 10 registryFetchIntervalSeconds, 11 TimeUnit.SECONDS, 12 expBackOffBound, 13             new CacheRefreshThread() // 定時獲取服務列表thread
14 ), 15 registryFetchIntervalSeconds, TimeUnit.SECONDS); 16 }

一、首先咱們看第1行,clientConfig.shouldFetchRegistry() == true纔會執行獲取服務列表的job,咱們點進去看,發現其實就是咱們properties配置的eureka.client.fetch-registry,而默認值爲true。

二、而後執行job的週期單位爲秒(11行),執行週期爲registryFetchIntervalSeconds,也就是第3行;第3行和第1行同理,爲properties配置的eureka.client.registry-fetchInterval-seconds,而默認值爲30

三、最後咱們看看其核心線程(13行),能夠看到其調用的函數實際上是void refreshRegistry();函數最開始一大堆判斷,最後一堆debug,這些都不用細究,咱們直接看最核心的哪行代碼boolean success = fetchRegistry(remoteRegionsModified)

 1 private boolean fetchRegistry(boolean forceFullRegistryFetch) {  2     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();  3 
 4     try {  5         // If the delta is disabled or if it is the first time, get all  6         // applications
 7         Applications applications = getApplications();  8 
 9         if (clientConfig.shouldDisableDelta() 10                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) 11                 || forceFullRegistryFetch 12                 || (applications == null) 13                 || (applications.getRegisteredApplications().size() == 0) 14                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
15  { 16             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); 17             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); 18             logger.info("Force full registry fetch : {}", forceFullRegistryFetch); 19             logger.info("Application is null : {}", (applications == null)); 20             logger.info("Registered Applications size is zero : {}", 21                     (applications.getRegisteredApplications().size() == 0)); 22             logger.info("Application version is -1: {}", (applications.getVersion() == -1)); 23  getAndStoreFullRegistry(); 24         } else { 25  getAndUpdateDelta(applications); 26  } 27  applications.setAppsHashCode(applications.getReconcileHashCode()); 28  logTotalInstances(); 29     } catch (Throwable e) { 30         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); 31         return false; 32     } finally { 33         if (tracer != null) { 34  tracer.stop(); 35  } 36  } 37 
38     // Notify about cache refresh before updating the instance remote status
39  onCacheRefreshed(); 40 
41     // Update remote status based on refreshed data held in the cache
42  updateInstanceRemoteStatus(); 43 
44     // registry was fetched successfully, so return true
45     return true; 46 }

從代碼中咱們能夠看出要麼是全量註冊(23行)要麼是增量註冊(25行):

)全量註冊

 1 private void getAndStoreFullRegistry() throws Throwable {  2     long currentUpdateGeneration = fetchRegistryGeneration.get();  3 
 4     logger.info("Getting all instance registry info from the eureka server");  5 
 6     Applications apps = null;  7     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
 8             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())  9  : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); 10     if (httpResponse.getStatusCode() == Response.Status.OK.getStatusCode()) { 11         apps = httpResponse.getEntity(); 12  } 13     logger.info("The response status is {}", httpResponse.getStatusCode()); 14 
15     if (apps == null) { 16         logger.error("The application is null for some reason. Not storing this information"); 17     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { 18         localRegionApps.set(this.filterAndShuffle(apps)); 19         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); 20     } else { 21         logger.warn("Not updating applications as another thread is updating it already"); 22  } 23 }

)從中能夠看出18行將apps從httpResponse獲取,因此服務列表應該是從服務端獲取的;故看下http調用,第8行調到其實現類com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient,看到其調用的http url是apps/

1 @Override 2 public EurekaHttpResponse<Applications> getApplications(String... regions) { 3     return getApplicationsInternal("apps/", regions); 4 }

再點進去看,發現其實就是調用了http://服務端ip:服務端port/eureka/apps(GET請求),而且將結果放入Applications

)增量註冊,增量註冊與全量同理,但調用的是http://服務端ip:服務端port/eureka/apps/delta接口

全量增量註冊講完後咱們來看看服務端的代碼(着重看全量,增量與全量原理差很少)

源碼示例(服務端):

咱們知道全量註冊調用的是http://服務端ip:服務端port/eureka/apps接口,咱們找下這個接口的實現,com.netflix.eureka.resources.ApplicationsResource#getContainers,並找到最重要的代碼塊

 1 Response response;  2 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {  3     response = Response.ok(responseCache.getGZIP(cacheKey))  4  .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)  5  .header(HEADER_CONTENT_TYPE, returnMediaType)  6  .build();  7 } else {  8     response = Response.ok(responseCache.get(cacheKey))  9  .build(); 10 } 11 return response;

進入responseCache.getZIP(cacheKey)後,咱們能夠知道代碼很簡單,就是從緩存中獲取數據給客戶端。

參數useReadOnlyCache也是客戶端配置的,默認爲true;6-12行很簡單,緩存有取緩存,沒有則從readWriteCacheMap拿到後再放入緩存。

 1 @VisibleForTesting  2 ResponseCacheImpl.Value getValue(final Key key, boolean useReadOnlyCache) {  3     ResponseCacheImpl.Value payload = null;  4     try {  5         if (useReadOnlyCache) {  6             final ResponseCacheImpl.Value currentPayload = readOnlyCacheMap.get(key);  7             if (currentPayload != null) {  8                 payload = currentPayload;  9             } else { 10                 payload = readWriteCacheMap.get(key); 11  readOnlyCacheMap.put(key, payload); 12  } 13         } else { 14             payload = readWriteCacheMap.get(key); 15  } 16     } catch (Throwable t) { 17         logger.error("Cannot get value for key :" + key, t); 18  } 19     return payload; 20 }

客戶端定時發送心跳續約:

 1 scheduler.schedule(  2         new TimedSupervisorTask(  3                 "heartbeat",  4  scheduler,  5  heartbeatExecutor,  6  renewalIntervalInSecs,  7  TimeUnit.SECONDS,  8  expBackOffBound,  9                 new HeartbeatThread() 10  ), 11         renewalIntervalInSecs, TimeUnit.SECONDS);

簡單的地方咱們就不一一看了,直接進入重點,發現第9行就是不斷刷新lastSuccessfulHeartbeatTimestamp使之爲當前時間戳(永不過時)

1 if (renew()) { 2     lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); 3 }

那咱們再看看更新lastSuccessfulHeartbeatTimestamp的條件renew唄!

查看renew後能夠得知,心跳續約調用了客戶端的/apps/appName/id接口(PUT請求);而後咱們卡卡客戶端實現

一、首先接口在com.netflix.eureka.resources.InstanceResource#renewLease

二、咱們想其核心代碼boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode)裏跟進

 1 public boolean renew(String appName, String id, boolean isReplication) {  2  RENEW.increment(isReplication);  3     // 仍是同樣的從registry中拿InstanceInfo
 4     Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);  5     Lease<InstanceInfo> leaseToRenew = null;  6     if (gMap != null) {  7         leaseToRenew = gMap.get(id);  8  }  9     // 若是拿不到InstanceInfo就表示服務掛了,心跳續約失敗
10     if (leaseToRenew == null) { 11  RENEW_NOT_FOUND.increment(isReplication); 12         logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); 13         return false; 14     } else { 15         InstanceInfo instanceInfo = leaseToRenew.getHolder(); 16         if (instanceInfo != null) { 17             // touchASGCache(instanceInfo.getASGName());
18             InstanceInfo.InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( 19  instanceInfo, leaseToRenew, isReplication); 20             if (overriddenInstanceStatus == InstanceInfo.InstanceStatus.UNKNOWN) { 21                 logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
22                         + "; re-register required", instanceInfo.getId()); 23  RENEW_NOT_FOUND.increment(isReplication); 24                 return false; 25  } 26             if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { 27                 Object[] args = { 28  instanceInfo.getStatus().name(), 29  instanceInfo.getOverriddenStatus().name(), 30  instanceInfo.getId() 31  }; 32  logger.info( 33                         "The instance status {} is different from overridden instance status {} for instance {}. "
34                                 + "Hence setting the status to overridden status", args); 35  instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); 36  } 37  } 38  renewsLastMin.increment(); 39         // 若是能拿到InstanceInfo就作一個續約
40  leaseToRenew.renew(); 41         return true; 42  } 43 }

三、進一步看下續約leaseToRenew.renew()方法

1 public void renew() { 2     lastUpdateTimestamp = System.currentTimeMillis() + duration; 3 }

代碼很簡單,就是延長lastUpdateTimestamp的時間,duration則是經過構造傳入的;若duration有執行則用構造指定的,若沒有默認90秒

客戶端定時註冊:

1 instanceInfoReplicator = new InstanceInfoReplicator( 2         this, 3  instanceInfo, 4  clientConfig.getInstanceInfoReplicationIntervalSeconds(), 5         2); // burstSize

進入InstanceInfoReplicator後會發現這個類實現Runnable接口,那既然是線程就去看run方法咯

 1 public void run() {  2     try {  3  discoveryClient.refreshInstanceInfo();  4 
 5         Long dirtyTimestamp = instanceInfo.isDirtyWithTime();  6         if (dirtyTimestamp != null) {  7  discoveryClient.register();  8  instanceInfo.unsetIsDirty(dirtyTimestamp);  9  } 10     } catch (Throwable t) { 11         logger.warn("There was a problem with the instance info replicator", t); 12     } finally { 13         Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); 14  scheduledPeriodicRef.set(next); 15  } 16 }

而後咱們看第7行,一步步根進去發現其實調用了"apps/" + info.getAppName()接口

接下來咱們來看看"apps/" + info.getAppName()接口的實現

一、找到com.netflix.eureka.resources.ApplicationResourceaddInstance方法,找到其中的registry.register(info, "true".equals(isReplication));根進去

二、找到com.netflix.eureka.registry.AbstractInstanceRegistry的register方法

 1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {  2     try {  3  read.lock();  4         // 根據實例名registrant.getAppName()獲取InstanceInfo
 5         Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());  6  REGISTER.increment(isReplication);  7         if (gMap == null) {  8             final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();  9             // 若registry沒有此實例時,註冊一個空的示例
10             gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); 11             if (gMap == null) { 12                 gMap = gNewMap; 13  } 14  } 15         Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); 16         // Retain the last dirty timestamp without overwriting it, if there is already a lease
17         if (existingLease != null && (existingLease.getHolder() != null)) { 18             Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); 19             Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); 20             logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 21 
22             // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted 23             // InstanceInfo instead of the server local copy. 24             // 若eureka服務器存在的示例的時間戳大於傳入新實例的時間戳,則用已存在的(說明eureka server使用的實例都是最新的)
25             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26                 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27                         " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 28                 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 29                 registrant = existingLease.getHolder(); 30  } 31         } else { 32             // The lease does not exist and hence it is a new registration
33             synchronized (lock) { 34                 if (this.expectedNumberOfRenewsPerMin > 0) { 35                     // Since the client wants to cancel it, reduce the threshold 36                     // (1 37                     // for 30 seconds, 2 for a minute)
38                     this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; 39                     this.numberOfRenewsPerMinThreshold =
40                             (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 41  } 42  } 43             logger.debug("No previous lease information found; it is new registration"); 44  } 45         // 根據新的示例建立新的租約
46         Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); 47         if (existingLease != null) { 48  lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 49  } 50  gMap.put(registrant.getId(), lease); 51         synchronized (recentRegisteredQueue) { 52             recentRegisteredQueue.add(new Pair<Long, String>( 53  System.currentTimeMillis(), 54                     registrant.getAppName() + "(" + registrant.getId() + ")")); 55  } 56         // This is where the initial state transfer of overridden status happens
57         if (!InstanceInfo.InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 58             logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
59                     + "overrides", registrant.getOverriddenStatus(), registrant.getId()); 60             if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 61                 logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 62  overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 63  } 64  } 65         InstanceInfo.InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 66         if (overriddenStatusFromMap != null) { 67             logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 68  registrant.setOverriddenStatus(overriddenStatusFromMap); 69  } 70 
71         // Set the status based on the overridden status rules
72         InstanceInfo.InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); 73  registrant.setStatusWithoutDirty(overriddenInstanceStatus); 74 
75         // If the lease is registered with UP status, set lease service up timestamp
76         if (InstanceInfo.InstanceStatus.UP.equals(registrant.getStatus())) { 77  lease.serviceUp(); 78  } 79  registrant.setActionType(InstanceInfo.ActionType.ADDED); 80         recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease)); 81  registrant.setLastUpdatedTimestamp(); 82  invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 83         logger.info("Registered instance {}/{} with status {} (replication={})", 84  registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); 85     } finally { 86  read.unlock(); 87  } 88 }

服務下線: 

若要將此客戶端下線的話,要分兩步走

)啓動下線接口(配置properties)

1 # 啓用下線功能 2 endpoints.shutdown.enabled=true
3 # 關閉下線功能的安全校驗 4 endpoints.shutdown.sensitive=false

)調用下線接口(http://當前客戶端ip/當前客戶端port/shutdown)

一、客戶端

客戶端代碼很簡單,主要分爲兩步

)cancelScheduledTasks() >>> 中止客戶端初始化的三個job(客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時註冊)

)unregister() >>> 註銷(調用apps/appName/id接口,delete方式)

二、服務端

首先咱們來找到客戶端調的那個接口:com.netflix.eureka.resources.InstanceResource#cancelLease

 1 @DELETE  2 public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {  3     boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));  4 
 5     if (isSuccess) {  6         logger.debug("Found (Cancel): " + app.getName() + " - " + id);  7         return Response.ok().build();  8     } else {  9         logger.info("Not Found (Cancel): " + app.getName() + " - " + id); 10         return Response.status(Status.NOT_FOUND).build(); 11  } 12 }

咱們看核心代碼(第3行),並找到方法的實現com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel

 1 @Override  2 public boolean cancel(final String appName, final String id, final boolean isReplication) {  3     if (super.cancel(appName, id, isReplication)) {  4         replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, null, null, isReplication);  5         synchronized (lock) {  6             if (this.expectedNumberOfRenewsPerMin > 0) {  7                 // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
 8                 this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;  9                 this.numberOfRenewsPerMinThreshold =
10                         (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 11  } 12  } 13         return true; 14  } 15     return false; 16 }

看第三行,而後一步步跟進後找到其底層代碼:

 1 protected boolean internalCancel(String appName, String id, boolean isReplication) {  2     try {  3  read.lock();  4  CANCEL.increment(isReplication);  5         // 仍是一樣的從registry中拿取數據
 6         Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);  7         Lease<InstanceInfo> leaseToCancel = null;  8         if (gMap != null) {  9             // 若拿到數據則移除該數據
10             leaseToCancel = gMap.remove(id); 11  } 12         synchronized (recentCanceledQueue) { 13             recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); 14  } 15         InstanceInfo.InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); 16         if (instanceStatus != null) { 17             logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); 18  } 19         if (leaseToCancel == null) { 20  CANCEL_NOT_FOUND.increment(isReplication); 21             logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); 22             return false; 23         } else { 24             // 將此InstanceInfo移除
25  leaseToCancel.cancel(); 26             InstanceInfo instanceInfo = leaseToCancel.getHolder(); 27             String vip = null; 28             String svip = null; 29             if (instanceInfo != null) { 30  instanceInfo.setActionType(InstanceInfo.ActionType.DELETED); 31                 recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel)); 32  instanceInfo.setLastUpdatedTimestamp(); 33                 vip = instanceInfo.getVIPAddress(); 34                 svip = instanceInfo.getSecureVipAddress(); 35  } 36  invalidateCache(appName, vip, svip); 37             logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); 38             return true; 39  } 40     } finally { 41  read.unlock(); 42  } 43 }

最後服務下單其實就是調用leaseToCancel.cancel(),經過更新evictionTimestamp來取消租賃

1 public void cancel() { 2     if (evictionTimestamp <= 0) { 3         evictionTimestamp = System.currentTimeMillis(); 4  } 5 }
相關文章
相關標籤/搜索