本文基於SpringCloud-Dalston.SR5spring
咱們從Ribbon提及:EurekaClient也存在緩存,應用服務實例列表信息在每一個EurekaClient服務消費端都有緩存。通常的,Ribbon的LoadBalancer會讀取這個緩存,來知道當前有哪些實例能夠調用,從而進行負載均衡。這個loadbalancer一樣也有緩存。緩存
首先看這個LoadBalancer的緩存更新機制,相關類是PollingServerListUpdater:網絡
final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //從EurekaClient緩存中獲取服務實例列表,保存在本地緩存 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //定時調度 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS );
這個updateAction.doUpdate();就是從EurekaClient緩存中獲取服務實例列表,保存在BaseLoadBalancer的本地緩存:多線程
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) { //寫入allServerList的代碼,這裏略 } @Override public List<Server> getAllServers() { return Collections.unmodifiableList(allServerList); }
這裏的getAllServers會在每一個負載均衡規則中被調用,例如RoundRobinRule:併發
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); //獲取服務實例列表,調用的就是剛剛提到的getAllServers List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; }
這個緩存須要注意下,有時候咱們只修改了EurekaClient緩存的更新時間,可是沒有修改這個LoadBalancer的刷新本地緩存時間,就是ribbon.ServerListRefreshInterval
,這個參數能夠設置的很小,由於沒有從網絡讀取,就是從一個本地緩存刷到另外一個本地緩存(如何配置緩存配置來實現服務實例快速下線快速感知快速刷新,能夠參考個人另外一篇文章)。app
而後咱們來看一下EurekaClient自己的緩存,直接看關鍵類DiscoveryClient的相關源碼,咱們這裏只關心本地Region的,多Region配置咱們先忽略:負載均衡
//本地緩存,能夠理解爲是一個軟連接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); private void initScheduledTasks() { //若是配置爲須要拉取服務列表,則設置定時拉取任務,這個配置默認是須要拉取服務列表 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } //其餘定時任務初始化的代碼,忽略 } //定時從EurekaServer拉取服務列表的任務 class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { //多Region配置處理代碼,忽略 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } //日誌代碼,忽略 } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } //定時從EurekaServer拉取服務列表的核心方法 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); //判斷,若是是第一次拉取,或者app列表爲空,就進行全量拉取,不然就會進行增量拉取 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //緩存更新完成,發送個event給觀察者,目前沒啥用 onCacheRefreshed(); // 檢查下遠端的服務實例列表裏面包括本身,而且狀態是否對,這裏咱們不關心 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } //全量拉取代碼 private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; //訪問/eureka/apps接口,拉取全部服務實例信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } //增量拉取代碼 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; //訪問/eureka/delta接口,拉取全部服務實例增量信息 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { //若是delta爲空,拉取增量失敗,就全量拉取 logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //這裏設置原子鎖的緣由是怕某次調度網絡請求時間過長,致使同一時間有多線程拉取到增量信息併發修改 //拉取增量成功,檢查hashcode是否同樣,不同的話也會全量拉取 logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
以上就是對於EurekaClient拉取服務實例信息的源代碼分析,總結EurekaClient 重要緩存以下:ide
在EurekaServer端,全部的讀取請求都是讀的ReadOnlyMap(這個能夠配置) 有定時任務會定時從ReadWriteMap同步到ReadOnlyMap這個時間配置是:fetch
#eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上 #默認30s eureka.server.responseCacheUpdateInvervalMs=3000
相關代碼:ui
if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()}; logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache", th); } } } }; }
ReadWriteMap是一個LoadingCache,將Registry中的服務實例信息封裝成要返回的http響應(分別是通過gzip壓縮和非壓縮的),同時還有兩個特殊key,ALL_APPS和ALL_APPS_DELTA ALL_APPS就是全部服務實例信息 ALL_APPS_DELTA就是以前講註冊說的RecentlyChangedQueue裏面的實例列表封裝的http響應信息