在《Eureka - Client 服務啓動》一文中咱們看到,註冊表獲取的地方有兩個:一個是 EurekaClient 構造函數,一個是定時器每隔 30 秒去獲取。咱們先看看定時器 TimedSupervisorTask 的 run 方法。
這個方法有三個比較重要的參數:timeoutMillis、delay、maxDelay。比如頻率是 30s,那這個 maxDelay 就是 30*10=300s(這個 10 的來源參考上一篇);delay 在這裏是每次超時都翻倍,但是不能比 maxDelay 大。
整個設計思路是:如果調用 30s 超時,那下次就用 60s;如果再超時,就繼續翻倍,但是不能超過 300s。如果在後面的調用中恢復正常,那 delay 就恢復到 30s,下次超時再從頭開始翻倍。
@Override public void run() { Future<?> future = null; try { // 執行任務 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 指定超時時間 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout //設置delay delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); // 獲取delay long currentDelay = delay.get(); // maxDelay和currentDelay的2倍中取最小值 long newDelay = Math.min(maxDelay, currentDelay * 2); // 設置爲上面最小值 delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 其餘的略 rejectedCounter.increment(); } catch (Throwable e) { // 其餘的略 throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } // 把任務放入定時器,時間是delay if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
判斷是增量獲取還是全量獲取。
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); // 禁用增量、強制全量更新、沒有註冊信息的時候,進行全量更新 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { // 其餘日誌打印略 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } // 計算hash applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { // 其餘日誌打印略 return false; } finally { if (tracer != null) { tracer.stop(); } } // 其餘略 return true; }
全量獲取:調用「註冊中心地址 + apps/」獲取。
private void getAndStoreFullRegistry() throws Throwable { // 其餘略 Applications apps = null; // 調用註冊中心地址+apps/獲取 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } // 其餘略 }
增量更新:獲取後,要通過 hash 判斷本地數據是否和服務器一致,不一致就進行全量更新。
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 調用註冊中心地址+apps/delta獲取 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { //獲取不到數據,全力更新 getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; //加鎖 if (fetchRegistryUpdateLock.tryLock()) { try { // 和本地的合併 updateDelta(delta); // 計算hash reconcileHashCode = getReconcileHashCode(applications); } finally { //解鎖 fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 和服務器的hash對比 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { // 不一致說明和服務器的不一致,直接全量 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { // 其餘略 } }
和本地數據的合併,包括新增、修改、刪除。
private void updateDelta(Applications delta) { int deltaCount = 0; // 遍歷全部增量 for (Application app : delta.getRegisteredApplications()) { // 遍歷全部實例 for (InstanceInfo instance : app.getInstances()) { // 本地緩存 Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); // 是否爲同一個region,若是不是,則取同一個region的applications if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; if (ActionType.ADDED.equals(instance.getActionType())) { // 新增處理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改處理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); // 和新增都是調用addInstance方法,由於Application#addInstance裏是先移除再新增,全部修改也能夠調用 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { // 刪除處理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the 
delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }