Eureka - 服務發現

在《Eureka - Client服務啓動》中咱們看到,註冊表獲取的地方有兩個:一個是EurekaClient構造函數,一個是定時器每隔30秒去獲取。咱們先看看定時器TimedSupervisorTask的run方法。

定時器

這個方法有三個比較重要的參數:timeoutMillis、delay、maxDelay。比如頻率是30s,那這個maxDelay就是30*10=300s(這個10的來源參考上一篇);delay在每次超時後翻倍,但是不能比maxDelay大。
整個設計思路是:如果調用30s超時,那下次就用60秒;如果再超時,就繼續翻倍,但是不能超過300s。如果在後面的調用中正常了,那delay就恢復到30s,下次超時再從頭開始翻倍。

/**
 * Executes the supervised task once and then re-schedules this supervisor.
 *
 * <p>The task is submitted to {@code executor} and awaited for at most
 * {@code timeoutMillis}. On success the delay is reset to {@code timeoutMillis}.
 * On timeout the delay is doubled, capped at {@code maxDelay}. Whatever the
 * outcome, the pending future is cancelled and, unless the scheduler has been
 * shut down, this supervisor is scheduled again after the current delay.
 */
@Override
public void run() {
    Future<?> future = null;
    try {
        // Submit the task and record how busy the worker pool is.
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // Block until the task completes or the timeout elapses.
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        // Success: reset the back-off to the base interval.
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        successCounter.increment();
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
        // Back off: double the current delay, never exceeding maxDelay.
        long currentDelay = delay.get();
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // CAS so a concurrent update of the delay is not overwritten.
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        // Do not drop the failure silently; keep the metric as before.
        logger.warn("task supervisor rejected the task", e);
        rejectedCounter.increment();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the owning pool can observe it.
        // Counted under the same metric as other unexpected failures.
        Thread.currentThread().interrupt();
        logger.warn("task supervisor was interrupted", e);
        throwableCounter.increment();
    } catch (Throwable e) {
        // Do not drop the failure silently; keep the metric as before.
        logger.warn("task supervisor threw an exception", e);
        throwableCounter.increment();
    } finally {
        // Cancel the task if it is still running (no effect once it completed).
        if (future != null) {
            future.cancel(true);
        }
        // Re-arm the supervisor using whatever delay was decided above.
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

服務發現

判斷是增量獲取還是全量獲取。

/**
 * Refreshes the locally cached registry, choosing between a full fetch and a
 * delta fetch.
 *
 * <p>A full fetch is used when delta is disabled, a single-VIP refresh address
 * is configured, the caller forces it, or the local registry is missing, empty
 * or reports version {@code -1}. Otherwise a delta update is applied.
 *
 * @param forceFullRegistryFetch {@code true} to bypass the delta path
 * @return {@code true} if the refresh completed; {@code false} if anything was
 *         thrown or no registry is available after the fetch
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        Applications applications = getApplications();
        // Full fetch when delta is disabled, forced, or there is nothing local to patch.
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // (logging elided in the original excerpt)
            getAndStoreFullRegistry();
        } else {
            // Incremental update against the current registry.
            getAndUpdateDelta(applications);
        }
        // Re-read the registry instead of reusing the pre-fetch reference: that
        // reference may be null (see the condition above), and the fetch may have
        // installed a different instance -- NOTE(review): confirm against
        // getAndStoreFullRegistry, whose storing step is not visible here.
        Applications refreshed = getApplications();
        if (refreshed == null) {
            logger.warn("Registry is still unavailable after the fetch");
            return false;
        }
        // Store the locally computed reconcile hash on the registry.
        refreshed.setAppsHashCode(refreshed.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        // Report the failure rather than swallowing it; the contract (false) is unchanged.
        logger.warn("Unable to refresh the registry cache", e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // (remainder elided in the original excerpt)
    return true;
}

全量獲取

全量獲取,調用註冊中心地址+apps/獲取。

/**
 * Fetches the complete registry from the Eureka server.
 *
 * <p>When a single-VIP refresh address is configured (non-null and non-empty),
 * only that VIP is queried; otherwise all applications are requested. The
 * entity is taken only from an {@code OK} response. The steps that store the
 * result are elided in this excerpt.
 *
 * @throws Throwable if the remote call fails
 */
private void getAndStoreFullRegistry() throws Throwable {
    // (preceding steps elided in the original excerpt)
    Applications apps = null;
    // Treat an empty address like an absent one, matching the check in
    // fetchRegistry; otherwise an empty setting would query an empty VIP.
    String singleVipAddress = clientConfig.getRegistryRefreshSingleVipAddress();
    boolean fetchAllApplications = singleVipAddress == null || singleVipAddress.isEmpty();
    EurekaHttpResponse<Applications> httpResponse = fetchAllApplications
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(singleVipAddress, remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    // (remaining steps elided in the original excerpt)
}

增量獲取

增量更新:獲取後,要通過hash判斷是否和服務器一致,不一致就全量獲取。

/**
 * Fetches the registry delta from the server and merges it into the local
 * registry.
 *
 * <p>If no delta is returned, a full fetch is performed instead. Otherwise, and
 * only if the fetch generation has not moved since this call started, the delta
 * is merged under the update lock and the locally computed reconcile hash is
 * compared with the hash carried by the delta. On a mismatch, or when delta
 * diff logging is enabled, {@code reconcileAndLogDifference} is invoked.
 *
 * @param applications the local registry whose reconcile hash is computed
 * @throws Throwable if a remote call fails
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
    final long generationAtStart = fetchRegistryGeneration.get();

    // Ask the server for the delta; only an OK response yields an entity.
    EurekaHttpResponse<Applications> response = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    final Applications delta = response.getStatusCode() == Status.OK.getStatusCode()
            ? response.getEntity()
            : null;

    if (delta == null) {
        // Nothing usable came back: fall back to a full fetch.
        getAndStoreFullRegistry();
        return;
    }

    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        // The generation moved while this call was in flight.
        // (handling elided in the original excerpt)
        return;
    }

    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String localHash = "";
    if (!fetchRegistryUpdateLock.tryLock()) {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    } else {
        try {
            // Merge the delta into the local data, then hash the result.
            updateDelta(delta);
            localHash = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    }

    // There is a diff in number of instances for some reason
    boolean hashMatches = localHash.equals(delta.getAppsHashCode());
    if (!hashMatches || clientConfig.shouldLogDeltaDiff()) {
        // Local and server state disagree (or diff logging is on): reconcile.
        reconcileAndLogDifference(delta, localHash);  // this makes a remoteCall
    }
}

和本地數據的合併,包括新增、修改、刪除。

/**
 * Merges a delta payload into the locally cached registries.
 *
 * <p>Each instance in the delta is applied to the local-region registry, or to
 * the per-region registry from {@code remoteRegionVsApps} (created on demand)
 * when it belongs to another region. ADDED and MODIFIED instances go through
 * {@code addInstance}; DELETED instances are removed, and an application left
 * with no instances is removed too. Afterwards every touched registry gets the
 * delta's version and its instances are shuffled.
 *
 * @param delta the delta received from the server
 */
private void updateDelta(Applications delta) {
    int processed = 0;
    for (Application deltaApp : delta.getRegisteredApplications()) {
        for (InstanceInfo info : deltaApp.getInstances()) {
            // Start from the local registry; switch to a remote-region one if needed.
            Applications target = getApplications();
            final String region = instanceRegionChecker.getInstanceRegion(info);
            if (!instanceRegionChecker.isLocalRegion(region)) {
                Applications regional = remoteRegionVsApps.get(region);
                if (regional == null) {
                    regional = new Applications();
                    remoteRegionVsApps.put(region, regional);
                }
                target = regional;
            }

            processed++;
            if (ActionType.ADDED.equals(info.getActionType())) {
                addApplicationIfMissing(target, deltaApp, info);
                logger.debug("Added instance {} to the existing apps in region {}", info.getId(), region);
                target.getRegisteredApplications(info.getAppName()).addInstance(info);
            } else if (ActionType.MODIFIED.equals(info.getActionType())) {
                addApplicationIfMissing(target, deltaApp, info);
                logger.debug("Modified instance {} to the existing apps ", info.getId());
                // Same call as for ADDED: per the original author's note,
                // Application#addInstance removes the old entry before adding.
                target.getRegisteredApplications(info.getAppName()).addInstance(info);
            } else if (ActionType.DELETED.equals(info.getActionType())) {
                Application known = target.getRegisteredApplications(info.getAppName());
                if (known == null) {
                    continue;
                }
                logger.debug("Deleted instance {} to the existing apps ", info.getId());
                known.removeInstance(info);
                // Drop the application once its last instance is gone.
                if (known.getInstancesAsIsFromEureka().isEmpty()) {
                    target.removeApplication(known);
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", processed);

    // Stamp the new version on the local registry and reshuffle it.
    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    // Same for every remote-region registry.
    for (Applications regional : remoteRegionVsApps.values()) {
        regional.setVersion(delta.getVersion());
        regional.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

/** Registers the delta's application in {@code target} when the instance's application is not there yet. */
private void addApplicationIfMissing(Applications target, Application deltaApp, InstanceInfo info) {
    if (target.getRegisteredApplications(info.getAppName()) == null) {
        target.addApplication(deltaApp);
    }
}
相關文章
相關標籤/搜索