摘要: 原創出處 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!html
本文主要基於 Eureka 1.8.X 版本java
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:git
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文主要分享 Eureka-Client 向 Eureka-Server 獲取全量註冊信息的過程。github
FROM 《深度剖析服務發現組件Netflix Eureka》
web
Eureka-Client 獲取註冊信息,分紅全量獲取和增量獲取。默認配置下,Eureka-Client 啓動時,首先執行一次全量獲取進行本地緩存註冊信息,然後每 30 秒增量獲取刷新本地緩存( 非「正常」狀況下會是全量獲取 )。spring
本文重點在於全量獲取。segmentfault
推薦 Spring Cloud 書籍:緩存
推薦 Spring Cloud 視頻:微信
本小節調用關係以下:架構
Eureka-Client 啓動時,首先執行一次全量獲取進行本地緩存註冊信息,首先代碼以下:
// DiscoveryClient.java
/** * Applications 在本地的緩存 */
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略無關代碼
// 【3.2.5】初始化應用集合在本地的緩存
localRegionApps.set(new Applications());
// ... 省略無關代碼
// 【3.2.12】從 Eureka-Server 拉取註冊信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// ... 省略無關代碼
}
複製代碼
com.netflix.discovery.shared.Applications
,註冊的應用集合。較爲容易理解,點擊 連接 連接查看帶中文註釋的類,這裏就不囉嗦了。Applications 與 InstanceInfo 類關係以下:
配置 eureka.shouldFetchRegistry = true
,開啓從 Eureka-Server 獲取註冊信息。默認值:true
。
調用 #fetchRegistry(false)
方法,從 Eureka-Server 全量獲取註冊信息,在 「2.4 發起獲取註冊信息」 詳細解析。
Eureka-Client 在初始化過程當中,建立獲取註冊信息線程,固定間隔向 Eureka-Server 發起獲取註冊信息( fetch ),刷新本地註冊信息緩存。實現代碼以下:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略無關代碼
// 【3.2.9】初始化線程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略無關代碼
// 【3.2.14】初始化定時任務
initScheduledTasks();
// ... 省略無關代碼
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(續租)執行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ... 省略無關代碼
}
複製代碼
初始化定時任務代碼,和續租的定時任務代碼相似,在 《Eureka 源碼解析 —— 應用實例註冊發現(二)之續租 》 有詳細解析,這裏不重複分享。
com.netflix.discovery.DiscoveryClient.CacheRefreshThread
,註冊信息緩存刷新任務,實現代碼以下:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
複製代碼
#refreshRegistry(false)
方法,刷新註冊信息緩存,在 「2.3 刷新註冊信息緩存」 詳細解析。調用 #refreshRegistry(false)
方法,刷新註冊信息緩存,實現代碼以下:
// DiscoveryClient.java
1: void refreshRegistry() {
2: try {
3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
5:
6: boolean remoteRegionsModified = false;
7: // This makes sure that a dynamic change to remote regions to fetch is honored.
8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
9: if (null != latestRemoteRegions) {
10: String currentRemoteRegions = remoteRegionsToFetch.get();
11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {
12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {
14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
15: String[] remoteRegions = latestRemoteRegions.split(",");
16: remoteRegionsRef.set(remoteRegions);
17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
18: remoteRegionsModified = true;
19: } else {
20: logger.info("Remote regions to fetch modified concurrently," +
21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
22: }
23: }
24: } else {
25: // Just refresh mapping to reflect any DNS/Property change
26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();
27: }
28: }
29:
30: boolean success = fetchRegistry(remoteRegionsModified);
31: if (success) {
32: // 設置 註冊信息的應用實例數
33: registrySize = localRegionApps.get().size();
34: // 設置 最後獲取註冊信息時間
35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
36: }
37:
38: // 打印日誌
39: if (logger.isDebugEnabled()) {
40: StringBuilder allAppsHashCodes = new StringBuilder();
41: allAppsHashCodes.append("Local region apps hashcode: ");
42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
43: allAppsHashCodes.append(", is fetching remote regions? ");
44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
45: for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
46: allAppsHashCodes.append(", Remote region: ");
47: allAppsHashCodes.append(entry.getKey());
48: allAppsHashCodes.append(" , apps hashcode: ");
49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());
50: }
51: logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
52: allAppsHashCodes.toString());
53: }
54: } catch (Throwable e) {
55: logger.error("Cannot fetch registry from server", e);
56: }
57: }
複製代碼
第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
第 30 行 :調用 #fetchRegistry(false)
方法,從 Eureka-Server 獲取註冊信息,在 「2.4 發起獲取註冊信息」 詳細解析。
第 31 至 36 行 :獲取註冊信息成功,設置註冊信息的應用實例數,最後獲取註冊信息時間。變量代碼以下:
/** * 註冊信息的應用實例數 */
private volatile int registrySize = 0;
/** * 最後成功從 Eureka-Server 拉取註冊信息時間戳 */
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
複製代碼
第 38 至 53 行 :打印調試日誌。
第 54 至 56 行 :打印異常日誌。
調用 #fetchRegistry(false)
方法,從 Eureka-Server 獲取註冊信息( 根據條件判斷,多是全量,也多是增量 ),實現代碼以下:
1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
3:
4: try {
5: // 獲取 本地緩存的註冊的應用實例集合
6: // If the delta is disabled or if it is the first time, get all
7: // applications
8: Applications applications = getApplications();
9:
10: // 全量獲取
11: if (clientConfig.shouldDisableDelta() // 禁用增量獲取
12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
13: || forceFullRegistryFetch
14: || (applications == null) // 空
15: || (applications.getRegisteredApplications().size() == 0) // 空
16: || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
17: {
18: logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
19: logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
20: logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
21: logger.info("Application is null : {}", (applications == null));
22: logger.info("Registered Applications size is zero : {}",
23: (applications.getRegisteredApplications().size() == 0));
24: logger.info("Application version is -1: {}", (applications.getVersion() == -1));
25: // 執行 全量獲取
26: getAndStoreFullRegistry();
27: } else {
28: // 執行 增量獲取
29: getAndUpdateDelta(applications);
30: }
31: // 設置 應用集合 hashcode
32: applications.setAppsHashCode(applications.getReconcileHashCode());
33: // 打印 本地緩存的註冊的應用實例數量
34: logTotalInstances();
35: } catch (Throwable e) {
36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
37: return false;
38: } finally {
39: if (tracer != null) {
40: tracer.stop();
41: }
42: }
43:
44: // Notify about cache refresh before updating the instance remote status
45: onCacheRefreshed();
46:
47: // Update remote status based on refreshed data held in the cache
48: updateInstanceRemoteStatus();
49:
50: // registry was fetched successfully, so return true
51: return true;
52: }
複製代碼
第 5 至 8 行 :獲取本地緩存的註冊的應用實例集合,實現代碼以下:
public Applications getApplications() {
return localRegionApps.get();
}
複製代碼
第 10 至 26 行 :全量獲取註冊信息。
eureka.disableDelta = true
,禁用增量獲取註冊信息。默認值:false
。vipAddress
對應的應用實例們的註冊信息。forceFullRegistryFetch
強制全量獲取註冊信息。#getAndStoreFullRegistry()
方法,全量獲取註冊信息,並設置到本地緩存。下文詳細解析。第 27 至 30 行 :增量獲取註冊信息,並刷新本地緩存,在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。
第 31 至 32 行 :計算應用集合 hashcode
。該變量用於校驗增量獲取的註冊信息和 Eureka-Server 全量的註冊信息是否一致( 完整 ),在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。
第 33 至 34 行 :打印調試日誌,輸出本地緩存的註冊的應用實例數量。實現代碼以下:
private void logTotalInstances() {
if (logger.isDebugEnabled()) {
int totInstances = 0;
for (Application application : getApplications().getRegisteredApplications()) {
totInstances += application.getInstancesAsIsFromEureka().size();
}
logger.debug("The total number of all instances in the client now is {}", totInstances);
}
}
複製代碼
第 44 至 45 行 :觸發 CacheRefreshedEvent 事件,事件監聽器執行。目前 Eureka 未提供默認的該事件監聽器。
#onCacheRefreshed()
方法,實現代碼以下:
/** * Eureka 事件監聽器 */
private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener : eventListeners) {
listener.onEvent(event);
}
}
複製代碼
筆者的YY :你能夠實現自定義的事件監聽器監聽 CacheRefreshedEvent 事件,以達到持久化最新的註冊信息到存儲器( 例如,本地文件 ),經過這樣的方式,配合實現 BackupRegistry 接口讀取存儲器。BackupRegistry 接口調用以下:
// 【3.2.12】從 Eureka-Server 拉取註冊信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
複製代碼
第47 至 48 行 :更新本地緩存的當前應用實例在 Eureka-Server 的狀態。
1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
2:
3: private synchronized void updateInstanceRemoteStatus() {
4: // Determine this instance's status for this app and set to UNKNOWN if not found
5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
6: if (instanceInfo.getAppName() != null) {
7: Application app = getApplication(instanceInfo.getAppName());
8: if (app != null) {
9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
10: if (remoteInstanceInfo != null) {
11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
12: }
13: }
14: }
15: if (currentRemoteInstanceStatus == null) {
16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
17: }
18:
19: // Notify if status changed
20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
22: lastRemoteInstanceStatus = currentRemoteInstanceStatus;
23: }
24: }
複製代碼
第 4 至 14 行 :從註冊信息中獲取當前應用在 Eureka-Server 的狀態。
第 19 至 23 行 :對比本地緩存和最新的的當前應用實例在 Eureka-Server 的狀態,若不一樣,更新本地緩存( 注意,只更新該緩存變量,不更新本地當前應用實例的狀態( instanceInfo.status
) ),觸發 StatusChangeEvent 事件,事件監聽器執行。目前 Eureka 未提供默認的該事件監聽器。#onRemoteStatusChanged(...)
實現代碼以下:
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
fireEvent(new StatusChangeEvent(oldStatus, newStatus));
}
複製代碼
調用 #getAndStoreFullRegistry()
方法,全量獲取註冊信息,並設置到本地緩存。下實現代碼以下:
1: private void getAndStoreFullRegistry() throws Throwable {
2: long currentUpdateGeneration = fetchRegistryGeneration.get();
3:
4: logger.info("Getting all instance registry info from the eureka server");
5:
6: // 全量獲取註冊信息
7: Applications apps = null;
8: EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12: apps = httpResponse.getEntity();
13: }
14: logger.info("The response status is {}", httpResponse.getStatusCode());
15:
16: // 設置到本地緩存
17: if (apps == null) {
18: logger.error("The application is null for some reason. Not storing this information");
19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
20: localRegionApps.set(this.filterAndShuffle(apps));
21: logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22: } else {
23: logger.warn("Not updating applications as another thread is updating it already");
24: }
25: }
複製代碼
第 6 至 14 行 :全量獲取註冊信息,實現代碼以下:
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
複製代碼
AbstractJerseyEurekaHttpClient#getApplications(...)
方法,GET 請求 Eureka-Server 的 apps/
接口,參數爲 regions
,返回格式爲 JSON ,實現全量獲取註冊信息。第 16 至 24 行 :設置到本地註冊信息緩存。
#filterAndShuffle(...)
方法,根據配置 eureka.shouldFilterOnlyUpInstances = true
( 默認值 :true
) 過濾只保留狀態爲開啓( UP )的應用實例,並隨機打亂應用實例順序。打亂後,實現調用應用服務的隨機性。代碼比較易懂,點擊連接查看方法實現。com.netflix.eureka.resources.ApplicationsResource
,處理全部應用的請求操做的 Resource ( Controller )。
接收全量獲取請求,映射 ApplicationsResource#getContainers()
方法,實現代碼以下:
1: @GET
2: public Response getContainers(@PathParam("version") String version, 3: @HeaderParam(HEADER_ACCEPT) String acceptHeader, 4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, 5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, 6: @Context UriInfo uriInfo, 7: @Nullable @QueryParam("regions") String regionsStr) {
8: // TODO[0009]:RemoteRegionRegistry
9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
10: String[] regions = null;
11: if (!isRemoteRegionRequested) {
12: EurekaMonitors.GET_ALL.increment();
13: } else {
14: regions = regionsStr.toLowerCase().split(",");
15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
17: }
18:
19: // 判斷是否能夠訪問
20: // Check if the server allows the access to the registry. The server can
21: // restrict access if it is not
22: // ready to serve traffic depending on various reasons.
23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
24: return Response.status(Status.FORBIDDEN).build();
25: }
26:
27: // API 版本
28: CurrentRequestVersion.set(Version.toEnum(version));
29:
30: // 返回數據格式
31: KeyType keyType = Key.KeyType.JSON;
32: String returnMediaType = MediaType.APPLICATION_JSON;
33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
34: keyType = Key.KeyType.XML;
35: returnMediaType = MediaType.APPLICATION_XML;
36: }
37:
38: // 響應緩存鍵( KEY )
39: Key cacheKey = new Key(Key.EntityType.Application,
40: ResponseCacheImpl.ALL_APPS,
41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
42: );
43:
44: //
45: Response response;
46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
47: response = Response.ok(responseCache.getGZIP(cacheKey))
48: .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
49: .header(HEADER_CONTENT_TYPE, returnMediaType)
50: .build();
51: } else {
52: response = Response.ok(responseCache.get(cacheKey))
53: .build();
54: }
55: return response;
56: }
複製代碼
第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
第 19 至 25 行 :Eureka-Server 啓動完成,可是未處於就緒( Ready )狀態,不接受請求全量應用註冊信息的請求,例如,Eureka-Server 啓動時,未能從其餘 Eureka-Server 集羣的節點獲取到應用註冊信息。
第 27 至 28 行 :設置 API 版本號。默認最新 API 版本爲 V2。實現代碼以下:
public enum Version {
V1, V2;
public static Version toEnum(String v) {
for (Version version : Version.values()) {
if (version.name().equalsIgnoreCase(v)) {
return version;
}
}
//Defaults to v2
return V2;
}
}
複製代碼
第 30 至 36 行 :設置返回數據格式,默認 JSON 。
第 38 至 42 行 :建立響應緩存( ResponseCache ) 的鍵( KEY ),在 「3.2.1 緩存鍵」詳細解析。
第 44 至 55 行 :從響應緩存讀取全量註冊信息,在 「3.3 緩存讀取」詳細解析。
com.netflix.eureka.registry.ResponseCache
,響應緩存接口,接口代碼以下:
public interface ResponseCache {
String get(Key key);
byte[] getGZIP(Key key);
void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
AtomicLong getVersionDelta();
AtomicLong getVersionDeltaWithRegions();
}
複製代碼
其中,#getVersionDelta()
和 #getVersionDeltaWithRegions()
已經廢棄。這裏保留的緣由主要是考慮兼容性。判斷依據來自以下代碼:
// Applications.java
@Deprecated
public void setVersion(Long version) {
this.versionDelta = version;
}
// AbstractInstanceRegistry.java
public Applications getApplicationDeltas() {
// ... 省略其它無關代碼
apps.setVersion(responseCache.getVersionDelta().get()); // 惟一調用到 ResponseCache#getVersionDelta() 方法的地方
// ... 省略其它無關代碼
}
複製代碼
#get()
:得到緩存。
#getGZIP()
:得到緩存,並 GZIP 。
#invalidate()
:過時緩存。
com.netflix.eureka.registry.Key
,緩存鍵。實現代碼以下:
public class Key {
public enum KeyType {
JSON, XML
}
/** * An enum to define the entity that is stored in this cache for this key. */
public enum EntityType {
Application, VIP, SVIP
}
/** * 實體名 */
private final String entityName;
/** * TODO[0009]:RemoteRegionRegistry */
private final String[] regions;
/** * 請求參數類型 */
private final KeyType requestType;
/** * 請求 API 版本號 */
private final Version requestVersion;
/** * hashKey */
private final String hashKey;
/** * 實體類型 * * {@link EntityType} */
private final EntityType entityType;
/** * {@link EurekaAccept} */
private final EurekaAccept eurekaAccept;
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
@Override
public int hashCode() {
String hashKey = getHashKey();
return hashKey.hashCode();
}
@Override
public boolean equals(Object other) {
if (other instanceof Key) {
return getHashKey().equals(((Key) other).getHashKey());
} else {
return false;
}
}
}
複製代碼
com.netflix.eureka.registry.ResponseCacheImpl
,響應緩存實現類。
在 ResponseCacheImpl 裏,將緩存拆分紅兩層 :
readOnlyCacheMap
)readWriteCacheMap
)默認配置下,緩存讀取策略以下:
緩存過時策略以下:
readWriteCacheMap
。readWriteCacheMap
寫入一段時間( 可配置 )後自動過時。readWriteCacheMap
和 readOnlyCacheMap
的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap
的定時過時。注意:應用實例註冊、下線、過時時,不會很快刷新到 readWriteCacheMap
緩存裏。默認配置下,最大延遲在 30 秒。
爲何可使用緩存?
在 CAP 的選擇上,Eureka 選擇了 AP ,不一樣於 Zookeeper 選擇了 CP 。
推薦閱讀:
調用 ResponseCacheImpl#get(...)
方法( #getGzip(...)
相似 ),讀取緩存,實現代碼以下:
1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
2:
3: private final LoadingCache<Key, Value> readWriteCacheMap;
4:
5: public String get(final Key key) {
6: return get(key, shouldUseReadOnlyResponseCache);
7: }
8:
9: String get(final Key key, boolean useReadOnlyCache) {
10: Value payload = getValue(key, useReadOnlyCache);
11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
12: return null;
13: } else {
14: return payload.getPayload();
15: }
16: }
17:
18: Value getValue(final Key key, boolean useReadOnlyCache) {
19: Value payload = null;
20: try {
21: if (useReadOnlyCache) {
22: final Value currentPayload = readOnlyCacheMap.get(key);
23: if (currentPayload != null) {
24: payload = currentPayload;
25: } else {
26: payload = readWriteCacheMap.get(key);
27: readOnlyCacheMap.put(key, payload);
28: }
29: } else {
30: payload = readWriteCacheMap.get(key);
31: }
32: } catch (Throwable t) {
33: logger.error("Cannot get value for key :" + key, t);
34: }
35: return payload;
36: }
複製代碼
第 5 至 7 行 :調用 #get(key, useReadOnlyCache)
方法,讀取緩存。其中 shouldUseReadOnlyResponseCache
經過配置 eureka.shouldUseReadOnlyResponseCache = true
(默認值 :true
) 開啓只讀緩存。若是你對數據的一致性有相對高的要求,能夠關閉這個開關,固然由於少了 readOnlyCacheMap
,性能會有必定的降低。
第 9 至 16 行 :調用 getValue(key, useReadOnlyCache)
方法,讀取緩存。從 readOnlyCacheMap
和 readWriteCacheMap
變量能夠看到緩存值的類爲 com.netflix.eureka.registry.ResponseCacheImpl.Value
,實現代碼以下:
public class Value {
/** * 原始值 */
private final String payload;
/** * GZIP 壓縮後的值 */
private byte[] gzipped;
public Value(String payload) {
this.payload = payload;
if (!EMPTY_PAYLOAD.equals(payload)) {
// ... 省略 GZIP 壓縮代碼
gzipped = bos.toByteArray();
} else {
gzipped = null;
}
}
public String getPayload() {
return payload;
}
public byte[] getGzipped() {
return gzipped;
}
}
複製代碼
第 21 至 31 行 :讀取緩存。
第 21 至 28 行 :先讀取 readOnlyCacheMap
。讀取不到,讀取 readWriteCacheMap
,並設置到 readOnlyCacheMap
。
第 29 至 31 行 :讀取 readWriteCacheMap
。
readWriteCacheMap
實現代碼以下:
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
// TODO[0009]:RemoteRegionRegistry
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
// // TODO[0009]:RemoteRegionRegistry
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
複製代碼
readWriteCacheMap
最大緩存數量爲 1000 。#generatePayload(key)
方法,生成緩存值。#generatePayload(key)
方法,實現代碼以下:
1: private Value generatePayload(Key key) {
2: Stopwatch tracer = null;
3: try {
4: String payload;
5: switch (key.getEntityType()) {
6: case Application:
7: boolean isRemoteRegionRequested = key.hasRegions();
8:
9: if (ALL_APPS.equals(key.getName())) {
10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
11: tracer = serializeAllAppsWithRemoteRegionTimer.start();
12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
13: } else {
14: tracer = serializeAllAppsTimer.start();
15: payload = getPayLoad(key, registry.getApplications());
16: }
17: } else if (ALL_APPS_DELTA.equals(key.getName())) {
18: // ... 省略增量獲取相關的代碼
19: } else {
20: tracer = serializeOneApptimer.start();
21: payload = getPayLoad(key, registry.getApplication(key.getName()));
22: }
23: break;
24: // ... 省略部分代碼
25: }
26: return new Value(payload);
27: } finally {
28: if (tracer != null) {
29: tracer.stop();
30: }
31: }
32: }
複製代碼
AbstractInstanceRegistry#getApplications()
方法,得到註冊的應用集合。後調用 #getPayLoad()
方法,將註冊的應用集合轉換成緩存值。🙂 這兩個方法代碼較多,下面詳細解析。調用 AbstractInstanceRegistry#getApplications()
方法,得到註冊的應用集合,實現代碼以下:
1: // AbstractInstanceRegistry.java
2:
3: private static final String[] EMPTY_STR_ARRAY = new String[0];
4:
5: public Applications getApplications() {
6: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
7: if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
8: return getApplicationsFromLocalRegionOnly();
9: } else {
10: return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
11: }
12: }
13:
14: public Applications getApplicationsFromLocalRegionOnly() {
15: return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
16: }
複製代碼
第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
第 9 至 16 行 :調用 #getApplicationsFromMultipleRegions(...)
方法,得到註冊的應用集合,實現代碼以下:
1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
2: // TODO[0009]:RemoteRegionRegistry
3: boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
4: logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
5: includeRemoteRegion, Arrays.toString(remoteRegions));
6: if (includeRemoteRegion) {
7: GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
8: } else {
9: GET_ALL_CACHE_MISS.increment();
10: }
11: // 得到得到註冊的應用集合
12: Applications apps = new Applications();
13: apps.setVersion(1L);
14: for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
15: Application app = null;
16:
17: if (entry.getValue() != null) {
18: for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
19: Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
20: if (app == null) {
21: app = new Application(lease.getHolder().getAppName());
22: }
23: app.addInstance(decorateInstanceInfo(lease));
24: }
25: }
26: if (app != null) {
27: apps.addApplication(app);
28: }
29: }
30: // TODO[0009]:RemoteRegionRegistry
31: if (includeRemoteRegion) {
32: for (String remoteRegion : remoteRegions) {
33: RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
34: if (null != remoteRegistry) {
35: Applications remoteApps = remoteRegistry.getApplications();
36: for (Application application : remoteApps.getRegisteredApplications()) {
37: if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
38: logger.info("Application {} fetched from the remote region {}",
39: application.getName(), remoteRegion);
40:
41: Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
42: if (appInstanceTillNow == null) {
43: appInstanceTillNow = new Application(application.getName());
44: apps.addApplication(appInstanceTillNow);
45: }
46: for (InstanceInfo instanceInfo : application.getInstances()) {
47: appInstanceTillNow.addInstance(instanceInfo);
48: }
49: } else {
50: logger.debug("Application {} not fetched from the remote region {} as there exists a "
51: + "whitelist and this app is not in the whitelist.",
52: application.getName(), remoteRegion);
53: }
54: }
55: } else {
56: logger.warn("No remote registry available for the remote region {}", remoteRegion);
57: }
58: }
59: }
60: // 設置 應用集合 hashcode
61: apps.setAppsHashCode(apps.getReconcileHashCode());
62: return apps;
63: }
複製代碼
hashcode
。該變量用於校驗增量獲取的註冊信息和 Eureka-Server 全量的註冊信息是否一致( 完整 ),在 《Eureka 源碼解析 —— 應用實例註冊發現 (七)之增量獲取》 詳細解析。調用 #getPayLoad()
方法,將註冊的應用集合轉換成緩存值,實現代碼以下:
/** * Generate pay load with both JSON and XML formats for all applications. */
private String getPayLoad(Key key, Applications apps) {
// 得到編碼器
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
// 編碼
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}
複製代碼
應用實例註冊、下線、過時時,調用 ResponseCacheImpl#invalidate()
方法,主動過時讀寫緩存( readWriteCacheMap
),實現代碼以下:
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
複製代碼
調用 #invalidate(keys)
方法,逐個過時每一個緩存鍵值,實現代碼以下:
public void invalidate(Key... keys) {
for (Key key : keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
// 過時讀寫緩存
readWriteCacheMap.invalidate(key);
// TODO[0009]:RemoteRegionRegistry
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
複製代碼
讀寫緩存( readWriteCacheMap
) 寫入後,一段時間自動過時,實現代碼以下:
expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
複製代碼
eureka.responseCacheAutoExpirationInSeconds
,設置寫入過時時長。默認值 :180 秒。定時任務對比 readWriteCacheMap
和 readOnlyCacheMap
的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap
的定時過時。實現代碼以下:
1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
2: // ... 省略無關代碼
3:
4: long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
5: // ... 省略無關代碼
6:
7: if (shouldUseReadOnlyResponseCache) {
8: timer.schedule(getCacheUpdateTask(),
9: new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
10: + responseCacheUpdateIntervalMs),
11: responseCacheUpdateIntervalMs);
12: }
13:
14: // ... 省略無關代碼
15: }
16:
17: private TimerTask getCacheUpdateTask() {
18: return new TimerTask() {
19: @Override
20: public void run() {
21: logger.debug("Updating the client cache from response cache");
22: for (Key key : readOnlyCacheMap.keySet()) { // 循環 readOnlyCacheMap 的緩存鍵
23: if (logger.isDebugEnabled()) {
24: Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
25: logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
26: }
27: try {
28: CurrentRequestVersion.set(key.getVersion());
29: Value cacheValue = readWriteCacheMap.get(key);
30: Value currentCacheValue = readOnlyCacheMap.get(key);
31: if (cacheValue != currentCacheValue) { // 不一致時,進行替換
32: readOnlyCacheMap.put(key, cacheValue);
33: }
34: } catch (Throwable th) {
35: logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
36: }
37: }
38: }
39: };
40: }
複製代碼
eureka.responseCacheUpdateIntervalMs
,設置任務執行頻率,默認值 :30 * 1000 毫秒。readOnlyCacheMap
的緩存鍵。爲何不循環 readWriteCacheMap
呢? readOnlyCacheMap
的緩存過時依賴 readWriteCacheMap
,所以緩存鍵會更多。readWriteCacheMap
和 readOnlyCacheMap
的緩存值,若不一致,之前者爲主。經過這樣的方式,實現了 readOnlyCacheMap
的定時過時。比預期,比想一想,長老多老多的一篇文章。細思極恐。
估計下一篇增量獲取會簡潔不少。
胖友,分享個人公衆號( 芋道源碼 ) 給你的胖友可好?