註冊中心 Eureka 源碼解析 —— 應用實例註冊發現(六)之全量獲取

摘要: 原創出處 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!html

本文主要基於 Eureka 1.8.X 版本java


🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:git

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 獲取全量註冊信息的過程github

FROM 《深度剖析服務發現組件Netflix Eureka》
web

Eureka-Client 獲取註冊信息,分紅全量獲取增量獲取。默認配置下,Eureka-Client 啓動時,首先執行一次全量獲取進行本地緩存註冊信息,然後每 30增量獲取刷新本地緩存( 非「正常」狀況下會是全量獲取 )。spring

本文重點在於全量獲取segmentfault

推薦 Spring Cloud 書籍緩存

推薦 Spring Cloud 視頻微信

2. Eureka-Client 發起全量獲取

本小節調用關係以下:架構

2.1 初始化全量獲取

Eureka-Client 啓動時,首先執行一次全量獲取進行本地緩存註冊信息,首先代碼以下:

// DiscoveryClient.java
/** * Applications 在本地的緩存 */
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
                    
     // ... 省略無關代碼
     
    // 【3.2.5】初始化應用集合在本地的緩存
    localRegionApps.set(new Applications());
     
    // ... 省略無關代碼 
     
    // 【3.2.12】從 Eureka-Server 拉取註冊信息
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
     
     // ... 省略無關代碼 
}
複製代碼
  • com.netflix.discovery.shared.Applications,註冊的應用集合。較爲容易理解,點擊 連接 連接查看帶中文註釋的類,這裏就不囉嗦了。Applications 與 InstanceInfo 類關係以下:

  • 配置 eureka.shouldFetchRegistry = true,開啓從 Eureka-Server 獲取註冊信息。默認值:true

  • 調用 #fetchRegistry(false) 方法,從 Eureka-Server 全量獲取註冊信息,在 「2.4 發起獲取註冊信息」 詳細解析。

2.2 定時獲取

Eureka-Client 在初始化過程當中,建立獲取註冊信息線程,固定間隔向 Eureka-Server 發起獲取註冊信息( fetch ),刷新本地註冊信息緩存。實現代碼以下:

// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {
    // ... 省略無關代碼
               
    // 【3.2.9】初始化線程池
    // default size of 2 - 1 each for heartbeat and cacheRefresh
    scheduler = Executors.newScheduledThreadPool(2,
         new ThreadFactoryBuilder()
                 .setNameFormat("DiscoveryClient-%d")
                 .setDaemon(true)
                 .build());
    
    cacheRefreshExecutor = new ThreadPoolExecutor(
         1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(),
         new ThreadFactoryBuilder()
                 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                 .setDaemon(true)
                 .build()
     );  // use direct handoff
    
    // ... 省略無關代碼
    
    // 【3.2.14】初始化定時任務
    initScheduledTasks();
    
    // ... 省略無關代碼
}

private void initScheduledTasks() {
    // 向 Eureka-Server 心跳(續租)執行器
    if (clientConfig.shouldFetchRegistry()) {
       // registry cache refresh timer
       int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
       int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
       scheduler.schedule(
               new TimedSupervisorTask(
                       "cacheRefresh",
                       scheduler,
                       cacheRefreshExecutor,
                       registryFetchIntervalSeconds,
                       TimeUnit.SECONDS,
                       expBackOffBound,
                       new CacheRefreshThread()
               ),
               registryFetchIntervalSeconds, TimeUnit.SECONDS);
     }
     // ... 省略無關代碼
}
複製代碼

2.3 刷新註冊信息緩存

調用 #refreshRegistry(false) 方法,刷新註冊信息緩存,實現代碼以下:

// DiscoveryClient.java
  1: void refreshRegistry() {
  2:     try {
  3:         // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
  4:         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
  5: 
  6:         boolean remoteRegionsModified = false;
  7:         // This makes sure that a dynamic change to remote regions to fetch is honored.
  8:         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
  9:         if (null != latestRemoteRegions) {
 10:             String currentRemoteRegions = remoteRegionsToFetch.get();
 11:             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
 12:                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
 13:                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
 14:                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
 15:                         String[] remoteRegions = latestRemoteRegions.split(",");
 16:                         remoteRegionsRef.set(remoteRegions);
 17:                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
 18:                         remoteRegionsModified = true;
 19:                     } else {
 20:                         logger.info("Remote regions to fetch modified concurrently," +
 21:                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
 22:                     }
 23:                 }
 24:             } else {
 25:                 // Just refresh mapping to reflect any DNS/Property change
 26:                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
 27:             }
 28:         }
 29: 
 30:         boolean success = fetchRegistry(remoteRegionsModified);
 31:         if (success) {
 32:             // 設置 註冊信息的應用實例數
 33:             registrySize = localRegionApps.get().size();
 34:             // 設置 最後獲取註冊信息時間
 35:             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
 36:         }
 37: 
 38:         // 打印日誌
 39:         if (logger.isDebugEnabled()) {
 40:             StringBuilder allAppsHashCodes = new StringBuilder();
 41:             allAppsHashCodes.append("Local region apps hashcode: ");
 42:             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
 43:             allAppsHashCodes.append(", is fetching remote regions? ");
 44:             allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
 45:             for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
 46:                 allAppsHashCodes.append(", Remote region: ");
 47:                 allAppsHashCodes.append(entry.getKey());
 48:                 allAppsHashCodes.append(" , apps hashcode: ");
 49:                 allAppsHashCodes.append(entry.getValue().getAppsHashCode());
 50:             }
 51:             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
 52:                     allAppsHashCodes.toString());
 53:         }
 54:     } catch (Throwable e) {
 55:         logger.error("Cannot fetch registry from server", e);
 56:     }        
 57: }
複製代碼
  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry

  • 第 30 行 :調用 #fetchRegistry(false) 方法,從 Eureka-Server 獲取註冊信息,在 「2.4 發起獲取註冊信息」 詳細解析。

  • 第 31 至 36 行 :獲取註冊信息成功,設置註冊信息的應用實例數,最後獲取註冊信息時間。變量代碼以下:

    /** * 註冊信息的應用實例數 */
    private volatile int registrySize = 0;
    /** * 最後成功從 Eureka-Server 拉取註冊信息時間戳 */
    private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
    複製代碼
  • 第 38 至 53 行 :打印調試日誌。

  • 第 54 至 56 行 :打印異常日誌。

2.4 發起獲取註冊信息

調用 #fetchRegistry(false) 方法,從 Eureka-Server 獲取註冊信息( 根據條件判斷,多是全量,也多是增量 ),實現代碼以下:

1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
  2:     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
  3: 
  4:     try {
  5:         // 獲取 本地緩存的註冊的應用實例集合
  6:         // If the delta is disabled or if it is the first time, get all
  7:         // applications
  8:         Applications applications = getApplications();
  9: 
 10:         // 全量獲取
 11:         if (clientConfig.shouldDisableDelta() // 禁用增量獲取
 12:                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
 13:                 || forceFullRegistryFetch
 14:                 || (applications == null) // 空
 15:                 || (applications.getRegisteredApplications().size() == 0) // 空
 16:                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
 17:         {
 18:             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
 19:             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
 20:             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
 21:             logger.info("Application is null : {}", (applications == null));
 22:             logger.info("Registered Applications size is zero : {}",
 23:                     (applications.getRegisteredApplications().size() == 0));
 24:             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
 25:             // 執行 全量獲取
 26:             getAndStoreFullRegistry();
 27:         } else {
 28:             // 執行 增量獲取
 29:             getAndUpdateDelta(applications);
 30:         }
 31:         // 設置 應用集合 hashcode
 32:         applications.setAppsHashCode(applications.getReconcileHashCode());
 33:         // 打印 本地緩存的註冊的應用實例數量
 34:         logTotalInstances();
 35:     } catch (Throwable e) {
 36:         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
 37:         return false;
 38:     } finally {
 39:         if (tracer != null) {
 40:             tracer.stop();
 41:         }
 42:     }
 43: 
 44:     // Notify about cache refresh before updating the instance remote status
 45:     onCacheRefreshed();
 46: 
 47:     // Update remote status based on refreshed data held in the cache
 48:     updateInstanceRemoteStatus();
 49: 
 50:     // registry was fetched successfully, so return true
 51:     return true;
 52: }
複製代碼
  • 第 5 至 8 行 :獲取本地緩存的註冊的應用實例集合,實現代碼以下:

    public Applications getApplications() {
       return localRegionApps.get();
    }
    複製代碼
  • 第 10 至 26 行 :全量獲取註冊信息。

    • 第 11 行 :配置 eureka.disableDelta = true ,禁用增量獲取註冊信息。默認值:false
    • 第 12 行 :只得到一個 vipAddress 對應的應用實例們的註冊信息。
    • 第 13 行 :方法參數 forceFullRegistryFetch 強制全量獲取註冊信息。
    • 第 14 至 15 行 :本地緩存爲空。
    • 第 25 至 26 行 :調用 #getAndStoreFullRegistry() 方法,全量獲取註冊信息,並設置到本地緩存。下文詳細解析。
  • 第 27 至 30 行 :增量獲取註冊信息,並刷新本地緩存,在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。

  • 第 31 至 32 行 :計算應用集合 hashcode 。該變量用於校驗增量獲取的註冊信息和 Eureka-Server 全量的註冊信息是否一致( 完整 ),在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。

  • 第 33 至 34 行 :打印調試日誌,輸出本地緩存的註冊的應用實例數量。實現代碼以下:

    private void logTotalInstances() {
       if (logger.isDebugEnabled()) {
           int totInstances = 0;
           for (Application application : getApplications().getRegisteredApplications()) {
               totInstances += application.getInstancesAsIsFromEureka().size();
           }
           logger.debug("The total number of all instances in the client now is {}", totInstances);
       }
    }
    複製代碼
  • 第 44 至 45 行 :觸發 CacheRefreshedEvent 事件,事件監聽器執行。目前 Eureka 未提供默認的該事件監聽器。

    • #onCacheRefreshed() 方法,實現代碼以下:

      /** * Eureka 事件監聽器 */
      private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();
      
      protected void onCacheRefreshed() {
          fireEvent(new CacheRefreshedEvent());
      }
      
      protected void fireEvent(final EurekaEvent event) {
          for (EurekaEventListener listener : eventListeners) {
              listener.onEvent(event);
          }
      }
      複製代碼
      • x
    • 筆者的YY :你能夠實現自定義的事件監聽器監聽 CacheRefreshedEvent 事件,以達到持久化最新的註冊信息到存儲器( 例如,本地文件 ),經過這樣的方式,配合實現 BackupRegistry 接口讀取存儲器。BackupRegistry 接口調用以下:

      // 【3.2.12】從 Eureka-Server 拉取註冊信息
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
          fetchRegistryFromBackup();
      }
      複製代碼
  • 第47 至 48 行 :更新本地緩存的當前應用實例在 Eureka-Server 的狀態。

    1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 
      2: 
      3: private synchronized void updateInstanceRemoteStatus() {
      4:     // Determine this instance's status for this app and set to UNKNOWN if not found
      5:     InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
      6:     if (instanceInfo.getAppName() != null) {
      7:         Application app = getApplication(instanceInfo.getAppName());
      8:         if (app != null) {
      9:             InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
     10:             if (remoteInstanceInfo != null) {
     11:                 currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
     12:             }
     13:         }
     14:     }
     15:     if (currentRemoteInstanceStatus == null) {
     16:         currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
     17:     }
     18: 
     19:     // Notify if status changed
     20:     if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
     21:         onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
     22:         lastRemoteInstanceStatus = currentRemoteInstanceStatus;
     23:     }
     24: }
    複製代碼
    • 第 4 至 14 行 :從註冊信息中獲取當前應用在 Eureka-Server 的狀態。

    • 第 19 至 23 行 :對比本地緩存最新的的當前應用實例在 Eureka-Server 的狀態,若不一樣,更新本地緩存( 注意,只更新該緩存變量,不更新本地當前應用實例的狀態( instanceInfo.status ) ),觸發 StatusChangeEvent 事件,事件監聽器執行。目前 Eureka 未提供默認的該事件監聽器。#onRemoteStatusChanged(...) 實現代碼以下:

      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
         fireEvent(new StatusChangeEvent(oldStatus, newStatus));
      }
      複製代碼

2.4.1 全量獲取註冊信息,並設置到本地緩存

調用 #getAndStoreFullRegistry() 方法,全量獲取註冊信息,並設置到本地緩存。下實現代碼以下:

1: private void getAndStoreFullRegistry() throws Throwable {
  2:     long currentUpdateGeneration = fetchRegistryGeneration.get();
  3: 
  4:     logger.info("Getting all instance registry info from the eureka server");
  5: 
  6:     // 全量獲取註冊信息
  7:     Applications apps = null;
  8:     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
  9:             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
 10:             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
 11:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
 12:         apps = httpResponse.getEntity();
 13:     }
 14:     logger.info("The response status is {}", httpResponse.getStatusCode());
 15: 
 16:     // 設置到本地緩存
 17:     if (apps == null) {
 18:         logger.error("The application is null for some reason. Not storing this information");
 19:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
 20:         localRegionApps.set(this.filterAndShuffle(apps));
 21:         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
 22:     } else {
 23:         logger.warn("Not updating applications as another thread is updating it already");
 24:     }
 25: }
複製代碼
  • 第 6 至 14 行 :全量獲取註冊信息,實現代碼以下:

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
       return getApplicationsInternal("apps/", regions);
    }
    
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
       ClientResponse response = null;
       String regionsParamValue = null;
       try {
           WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
           if (regions != null && regions.length > 0) {
               regionsParamValue = StringUtil.join(regions);
               webResource = webResource.queryParam("regions", regionsParamValue);
           }
           Builder requestBuilder = webResource.getRequestBuilder();
           addExtraHeaders(requestBuilder);
           response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
    
           Applications applications = null;
           if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
               applications = response.getEntity(Applications.class);
           }
           return anEurekaHttpResponse(response.getStatus(), Applications.class)
                   .headers(headersOf(response))
                   .entity(applications)
                   .build();
       } finally {
           if (logger.isDebugEnabled()) {
               logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                       serviceUrl, urlPath,
                       regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                       response == null ? "N/A" : response.getStatus()
               );
           }
           if (response != null) {
               response.close();
           }
       }
    }
    複製代碼
    • 調用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 請求 Eureka-Server 的 apps/ 接口,參數爲 regions ,返回格式爲 JSON ,實現全量獲取註冊信息
  • 第 16 至 24 行 :設置到本地註冊信息緩存

    • 第 19 行 :TODO[0025] :併發更新的狀況???
    • 第 20 行 :調用 #filterAndShuffle(...) 方法,根據配置 eureka.shouldFilterOnlyUpInstances = true ( 默認值 :true ) 過濾只保留狀態爲開啓( UP )的應用實例,並隨機打亂應用實例順序。打亂後,實現調用應用服務的隨機性。代碼比較易懂,點擊連接查看方法實現。

3. Eureka-Server 接收全量獲取

3.1 接收全量獲取請求

com.netflix.eureka.resources.ApplicationsResource,處理全部應用的請求操做的 Resource ( Controller )。

接收全量獲取請求,映射 ApplicationsResource#getContainers() 方法,實現代碼以下:

1: @GET
  2: public Response getContainers(@PathParam("version") String version, 3: @HeaderParam(HEADER_ACCEPT) String acceptHeader, 4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, 5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, 6: @Context UriInfo uriInfo, 7: @Nullable @QueryParam("regions") String regionsStr) {
  8:     // TODO[0009]:RemoteRegionRegistry
  9:     boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
 10:     String[] regions = null;
 11:     if (!isRemoteRegionRequested) {
 12:         EurekaMonitors.GET_ALL.increment();
 13:     } else {
 14:         regions = regionsStr.toLowerCase().split(",");
 15:         Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
 16:         EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
 17:     }
 18: 
 19:     // 判斷是否能夠訪問
 20:     // Check if the server allows the access to the registry. The server can
 21:     // restrict access if it is not
 22:     // ready to serve traffic depending on various reasons.
 23:     if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
 24:         return Response.status(Status.FORBIDDEN).build();
 25:     }
 26: 
 27:     // API 版本
 28:     CurrentRequestVersion.set(Version.toEnum(version));
 29: 
 30:     // 返回數據格式
 31:     KeyType keyType = Key.KeyType.JSON;
 32:     String returnMediaType = MediaType.APPLICATION_JSON;
 33:     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
 34:         keyType = Key.KeyType.XML;
 35:         returnMediaType = MediaType.APPLICATION_XML;
 36:     }
 37: 
 38:     // 響應緩存鍵( KEY )
 39:     Key cacheKey = new Key(Key.EntityType.Application,
 40:             ResponseCacheImpl.ALL_APPS,
 41:             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
 42:     );
 43: 
 44:     //
 45:     Response response;
 46:     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
 47:         response = Response.ok(responseCache.getGZIP(cacheKey))
 48:                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
 49:                 .header(HEADER_CONTENT_TYPE, returnMediaType)
 50:                 .build();
 51:     } else {
 52:         response = Response.ok(responseCache.get(cacheKey))
 53:                 .build();
 54:     }
 55:     return response;
 56: }
複製代碼
  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry

  • 第 19 至 25 行 :Eureka-Server 啓動完成,可是未處於就緒( Ready )狀態,不接受請求全量應用註冊信息的請求,例如,Eureka-Server 啓動時,未能從其餘 Eureka-Server 集羣的節點獲取到應用註冊信息。

  • 第 27 至 28 行 :設置 API 版本號。默認最新 API 版本爲 V2。實現代碼以下:

    public enum Version {
        V1, V2;
    
        public static Version toEnum(String v) {
            for (Version version : Version.values()) {
                if (version.name().equalsIgnoreCase(v)) {
                    return version;
                }
            }
            //Defaults to v2
            return V2;
        }
    }
    複製代碼
  • 第 30 至 36 行 :設置返回數據格式,默認 JSON 。

  • 第 38 至 42 行 :建立響應緩存( ResponseCache ) 的鍵( KEY ),在 「3.2.1 緩存鍵」詳細解析。

  • 第 44 至 55 行 :從響應緩存讀取全量註冊信息,在 「3.3 緩存讀取」詳細解析。

3.2 響應緩存 ResponseCache

com.netflix.eureka.registry.ResponseCache,響應緩存接口,接口代碼以下:

public interface ResponseCache {

    String get(Key key);
    
    byte[] getGZIP(Key key);
    
    void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);

    AtomicLong getVersionDelta();
    
    AtomicLong getVersionDeltaWithRegions();

}
複製代碼
  • 其中,#getVersionDelta()#getVersionDeltaWithRegions() 已經廢棄。這裏保留的緣由主要是考慮兼容性。判斷依據來自以下代碼:

    // Applications.java
    @Deprecated
    public void setVersion(Long version) {
       this.versionDelta = version;
    }
    
    // AbstractInstanceRegistry.java
    public Applications getApplicationDeltas() {
        // ... 省略其它無關代碼
        apps.setVersion(responseCache.getVersionDelta().get()); // 惟一調用到 ResponseCache#getVersionDelta() 方法的地方
        // ... 省略其它無關代碼
    }
    複製代碼
  • #get() :得到緩存。

  • #getGZIP() :得到緩存,並 GZIP 。

  • #invalidate() :過時緩存。

3.2.1 緩存鍵

com.netflix.eureka.registry.Key,緩存鍵。實現代碼以下:

public class Key {

    public enum KeyType {
        JSON, XML
    }

    /** * An enum to define the entity that is stored in this cache for this key. */
    public enum EntityType {
        Application, VIP, SVIP
    }

    /** * 實體名 */
    private final String entityName;
    /** * TODO[0009]:RemoteRegionRegistry */
    private final String[] regions;
    /** * 請求參數類型 */
    private final KeyType requestType;
    /** * 請求 API 版本號 */
    private final Version requestVersion;
    /** * hashKey */
    private final String hashKey;
    /** * 實體類型 * * {@link EntityType} */
    private final EntityType entityType;
    /** * {@link EurekaAccept} */
    private final EurekaAccept eurekaAccept;
    
    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
        this.regions = regions;
        this.entityType = entityType;
        this.entityName = entityName;
        this.requestType = type;
        this.requestVersion = v;
        this.eurekaAccept = eurekaAccept;
        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
    }
    
    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
        this.regions = regions;
        this.entityType = entityType;
        this.entityName = entityName;
        this.requestType = type;
        this.requestVersion = v;
        this.eurekaAccept = eurekaAccept;
        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
    }
    
    @Override
    public int hashCode() {
        String hashKey = getHashKey();
        return hashKey.hashCode();
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Key) {
            return getHashKey().equals(((Key) other).getHashKey());
        } else {
            return false;
        }
    }
    
}
複製代碼

3.2.2 響應緩存實現類

com.netflix.eureka.registry.ResponseCacheImpl,響應緩存實現類。

在 ResponseCacheImpl 裏,將緩存拆分紅兩層 :

  • 只讀緩存( readOnlyCacheMap )
  • 固定過時 + 固定大小讀寫緩存( readWriteCacheMap )

默認配置下,緩存讀取策略以下:

緩存過時策略以下:

  • 應用實例註冊、下線、過時時,只只只過時 readWriteCacheMap
  • readWriteCacheMap 寫入一段時間( 可配置 )後自動過時。
  • 定時任務對比 readWriteCacheMapreadOnlyCacheMap 的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap 的定時過時。

注意:應用實例註冊、下線、過時時,不會很快刷新到 readWriteCacheMap 緩存裏。默認配置下,最大延遲在 30 秒。

爲何可使用緩存?

CAP 的選擇上,Eureka 選擇了 AP ,不一樣於 Zookeeper 選擇了 CP 。

推薦閱讀:

3.3 緩存讀取

調用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 相似 ),讀取緩存,實現代碼以下:

1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
  2: 
  3: private final LoadingCache<Key, Value> readWriteCacheMap;
  4: 
  5: public String get(final Key key) {
  6:     return get(key, shouldUseReadOnlyResponseCache);
  7: }
  8: 
  9: String get(final Key key, boolean useReadOnlyCache) {
 10:     Value payload = getValue(key, useReadOnlyCache);
 11:     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
 12:         return null;
 13:     } else {
 14:         return payload.getPayload();
 15:     }
 16: }
 17: 
 18: Value getValue(final Key key, boolean useReadOnlyCache) {
 19:     Value payload = null;
 20:     try {
 21:         if (useReadOnlyCache) {
 22:             final Value currentPayload = readOnlyCacheMap.get(key);
 23:             if (currentPayload != null) {
 24:                 payload = currentPayload;
 25:             } else {
 26:                 payload = readWriteCacheMap.get(key);
 27:                 readOnlyCacheMap.put(key, payload);
 28:             }
 29:         } else {
 30:             payload = readWriteCacheMap.get(key);
 31:         }
 32:     } catch (Throwable t) {
 33:         logger.error("Cannot get value for key :" + key, t);
 34:     }
 35:     return payload;
 36: }
複製代碼
  • 第 5 至 7 行 :調用 #get(key, useReadOnlyCache) 方法,讀取緩存。其中 shouldUseReadOnlyResponseCache 經過配置 eureka.shouldUseReadOnlyResponseCache = true (默認值 :true ) 開啓只讀緩存。若是你對數據的一致性有相對高的要求,能夠關閉這個開關,固然由於少了 readOnlyCacheMap ,性能會有必定的降低。

  • 第 9 至 16 行 :調用 getValue(key, useReadOnlyCache) 方法,讀取緩存。從 readOnlyCacheMapreadWriteCacheMap 變量能夠看到緩存值的類爲 com.netflix.eureka.registry.ResponseCacheImpl.Value ,實現代碼以下:

    public class Value {
    
       /** * 原始值 */
       private final String payload;
       /** * GZIP 壓縮後的值 */
       private byte[] gzipped;
    
       public Value(String payload) {
           this.payload = payload;
           if (!EMPTY_PAYLOAD.equals(payload)) {
               // ... 省略 GZIP 壓縮代碼
               gzipped = bos.toByteArray();
           } else {
               gzipped = null;
           }
       }
    
       public String getPayload() {
           return payload;
       }
    
       public byte[] getGzipped() {
           return gzipped;
       }
    
    }
    複製代碼
  • 第 21 至 31 行 :讀取緩存。

    • 第 21 至 28 行 :先讀取 readOnlyCacheMap 。讀取不到,讀取 readWriteCacheMap ,並設置到 readOnlyCacheMap

    • 第 29 至 31 行 :讀取 readWriteCacheMap

    • readWriteCacheMap 實現代碼以下:

      this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            // TODO[0009]:RemoteRegionRegistry
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            // // TODO[0009]:RemoteRegionRegistry
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });
      複製代碼
      • readWriteCacheMap 最大緩存數量爲 1000 。
      • 調用 #generatePayload(key) 方法,生成緩存值。
  • #generatePayload(key) 方法,實現代碼以下:

    1: private Value generatePayload(Key key) {
      2:     Stopwatch tracer = null;
      3:     try {
      4:         String payload;
      5:         switch (key.getEntityType()) {
      6:             case Application:
      7:                 boolean isRemoteRegionRequested = key.hasRegions();
      8: 
      9:                 if (ALL_APPS.equals(key.getName())) {
     10:                     if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
     11:                         tracer = serializeAllAppsWithRemoteRegionTimer.start();
     12:                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
     13:                     } else {
     14:                         tracer = serializeAllAppsTimer.start();
     15:                         payload = getPayLoad(key, registry.getApplications());
     16:                     }
     17:                 } else if (ALL_APPS_DELTA.equals(key.getName())) {
     18:                     // ... 省略增量獲取相關的代碼
     19:                  } else {
     20:                     tracer = serializeOneApptimer.start();
     21:                     payload = getPayLoad(key, registry.getApplication(key.getName()));
     22:                 }
     23:                 break;
     24:             // ... 省略部分代碼 
     25:         }
     26:         return new Value(payload);
     27:     } finally {
     28:         if (tracer != null) {
     29:             tracer.stop();
     30:         }
     31:     }
     32: }
    複製代碼
    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
    • 第 13 至 16 行 :調用 AbstractInstanceRegistry#getApplications() 方法,得到註冊的應用集合。後調用 #getPayLoad() 方法,將註冊的應用集合轉換成緩存值。🙂 這兩個方法代碼較多,下面詳細解析。
    • 第 17 至 18 行 :獲取增量註冊信息的緩存值,在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。

3.3.1 得到註冊的應用集合

調用 AbstractInstanceRegistry#getApplications() 方法,得到註冊的應用集合,實現代碼以下:

1: // AbstractInstanceRegistry.java
  2: 
  3: private static final String[] EMPTY_STR_ARRAY = new String[0];
  4: 
  5: public Applications getApplications() {
  6:    boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
  7:    if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
  8:        return getApplicationsFromLocalRegionOnly();
  9:    } else {
 10:        return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
 11:    }
 12: }
 13: 
 14: public Applications getApplicationsFromLocalRegionOnly() {
 15:    return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
 16: }
複製代碼
  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry

  • 第 9 至 16 行 :調用 #getApplicationsFromMultipleRegions(...) 方法,得到註冊的應用集合,實現代碼以下:

    1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
      2:     // TODO[0009]:RemoteRegionRegistry
      3:     boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
      4:     logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
      5:             includeRemoteRegion, Arrays.toString(remoteRegions));
      6:     if (includeRemoteRegion) {
      7:         GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
      8:     } else {
      9:         GET_ALL_CACHE_MISS.increment();
     10:     }
     11:     // 得到得到註冊的應用集合
     12:     Applications apps = new Applications();
     13:     apps.setVersion(1L);
     14:     for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
     15:         Application app = null;
     16: 
     17:         if (entry.getValue() != null) {
     18:             for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
     19:                 Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
     20:                 if (app == null) {
     21:                     app = new Application(lease.getHolder().getAppName());
     22:                 }
     23:                 app.addInstance(decorateInstanceInfo(lease));
     24:             }
     25:         }
     26:         if (app != null) {
     27:             apps.addApplication(app);
     28:         }
     29:     }
     30:     // TODO[0009]:RemoteRegionRegistry
     31:     if (includeRemoteRegion) {
     32:         for (String remoteRegion : remoteRegions) {
     33:             RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
     34:             if (null != remoteRegistry) {
     35:                 Applications remoteApps = remoteRegistry.getApplications();
     36:                 for (Application application : remoteApps.getRegisteredApplications()) {
     37:                     if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
     38:                         logger.info("Application {} fetched from the remote region {}",
     39:                                 application.getName(), remoteRegion);
     40: 
     41:                         Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
     42:                         if (appInstanceTillNow == null) {
     43:                             appInstanceTillNow = new Application(application.getName());
     44:                             apps.addApplication(appInstanceTillNow);
     45:                         }
     46:                         for (InstanceInfo instanceInfo : application.getInstances()) {
     47:                             appInstanceTillNow.addInstance(instanceInfo);
     48:                         }
     49:                     } else {
     50:                         logger.debug("Application {} not fetched from the remote region {} as there exists a "
     51:                                         + "whitelist and this app is not in the whitelist.",
     52:                                 application.getName(), remoteRegion);
     53:                     }
     54:                 }
     55:             } else {
     56:                 logger.warn("No remote registry available for the remote region {}", remoteRegion);
     57:             }
     58:         }
     59:     }
     60:     // 設置 應用集合 hashcode
     61:     apps.setAppsHashCode(apps.getReconcileHashCode());
     62:     return apps;
     63: }
    複製代碼
    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
    • 第 11 至 29 行 :得到得到註冊的應用集合。
    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
    • 第 61 行 :計算應用集合 hashcode 。該變量用於校驗增量獲取的註冊信息和 Eureka-Server 全量的註冊信息是否一致( 完整 ),在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。

3.3.2 轉換成緩存值

調用 #getPayLoad() 方法,將註冊的應用集合轉換成緩存值,實現代碼以下:

/** * Generate pay load with both JSON and XML formats for all applications. */
private String getPayLoad(Key key, Applications apps) {
   // 得到編碼器
   EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
   String result;
   try {
       // 編碼
       result = encoderWrapper.encode(apps);
   } catch (Exception e) {
       logger.error("Failed to encode the payload for all apps", e);
       return "";
   }
   if(logger.isDebugEnabled()) {
       logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
   }
   return result;
}
複製代碼

3.4 主動過時讀寫緩存

應用實例註冊、下線、過時時,調用 ResponseCacheImpl#invalidate() 方法,主動過時讀寫緩存( readWriteCacheMap ),實現代碼以下:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
   for (Key.KeyType type : Key.KeyType.values()) {
       for (Version v : Version.values()) {
           invalidate(
                   new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                   new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                   new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                   new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                   new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                   new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
           );
           if (null != vipAddress) {
               invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
           }
           if (null != secureVipAddress) {
               invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
           }
       }
   }
}
複製代碼
  • 調用 #invalidate(keys) 方法,逐個過時每一個緩存鍵值,實現代碼以下:

    public void invalidate(Key... keys) {
       for (Key key : keys) {
           logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
           // 過時讀寫緩存
           readWriteCacheMap.invalidate(key);
           // TODO[0009]:RemoteRegionRegistry
           Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
           if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
               for (Key keysWithRegion : keysWithRegions) {
                   logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                           key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                   readWriteCacheMap.invalidate(keysWithRegion);
               }
           }
       }
    }
    複製代碼

3.5 被動過時讀寫緩存

讀寫緩存( readWriteCacheMap ) 寫入後,一段時間自動過時,實現代碼以下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
複製代碼
  • 配置 eureka.responseCacheAutoExpirationInSeconds ,設置寫入過時時長。默認值 :180 秒。

3.6 定時刷新只讀緩存

定時任務對比 readWriteCacheMapreadOnlyCacheMap 的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap 的定時過時。實現代碼以下:

1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
  2:     // ... 省略無關代碼 
  3: 
  4:     long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
  5:     // ... 省略無關代碼
  6: 
  7:     if (shouldUseReadOnlyResponseCache) {
  8:         timer.schedule(getCacheUpdateTask(),
  9:                 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
 10:                         + responseCacheUpdateIntervalMs),
 11:                 responseCacheUpdateIntervalMs);
 12:     }
 13: 
 14:     // ... 省略無關代碼
 15: }
 16: 
 17: private TimerTask getCacheUpdateTask() {
 18:     return new TimerTask() {
 19:         @Override
 20:         public void run() {
 21:             logger.debug("Updating the client cache from response cache");
 22:             for (Key key : readOnlyCacheMap.keySet()) { // 循環 readOnlyCacheMap 的緩存鍵
 23:                 if (logger.isDebugEnabled()) {
 24:                     Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
 25:                     logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
 26:                 }
 27:                 try {
 28:                     CurrentRequestVersion.set(key.getVersion());
 29:                     Value cacheValue = readWriteCacheMap.get(key);
 30:                     Value currentCacheValue = readOnlyCacheMap.get(key);
 31:                     if (cacheValue != currentCacheValue) { // 不一致時,進行替換
 32:                         readOnlyCacheMap.put(key, cacheValue);
 33:                     }
 34:                 } catch (Throwable th) {
 35:                     logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
 36:                 }
 37:             }
 38:         }
 39:     };
 40: }
複製代碼
  • 第 7 至 12 行 :初始化定時任務。配置 eureka.responseCacheUpdateIntervalMs,設置任務執行頻率,默認值 :30 * 1000 毫秒。
  • 第 17 至 39 行 :建立定時任務。
    • 第 22 行 :循環 readOnlyCacheMap 的緩存鍵。爲何不循環 readWriteCacheMapreadOnlyCacheMap 的緩存過時依賴 readWriteCacheMap,所以緩存鍵會更多。
    • 第 28 行 至 33 行 :對比 readWriteCacheMapreadOnlyCacheMap 的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap 的定時過時。

666. 彩蛋

知識星球

比預期,比想一想,長老多老多的一篇文章。細思極恐。

估計下一篇增量獲取會簡潔不少。

胖友,分享個人公衆號( 芋道源碼 ) 給你的胖友可好?

相關文章
相關標籤/搜索