Spring Cloud Eureka In Depth (4) - Core Flow: Fetching Services and Instance Lists in Detail

This article is based on Spring Cloud Dalston.SR5.

On fetching services and instance lists

EurekaClient side

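Let's start with Ribbon. EurekaClient has a cache too: every EurekaClient service consumer keeps a local cache of application service instance information. Typically, Ribbon's LoadBalancer reads this cache to learn which instances are currently available to call, and then load-balances across them. The LoadBalancer has a cache of its own as well.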

First, let's look at how the LoadBalancer's cache is refreshed. The relevant class is PollingServerListUpdater:

final Runnable wrapperRunnable = new Runnable() {
    @Override
    public void run() {
        if (!isActive.get()) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            return;
        }
        try {
            //Fetch the service instance list from the EurekaClient cache and save it in the local cache
            updateAction.doUpdate();
            lastUpdated = System.currentTimeMillis();
        } catch (Exception e) {
            logger.warn("Failed one update cycle", e);
        }
    }
};

//Schedule the refresh with a fixed delay
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
        wrapperRunnable,
        initialDelayMs,
        refreshIntervalMs,
        TimeUnit.MILLISECONDS
);
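Stripped of Ribbon's plumbing, the updater above is a fixed-delay job that copies one in-memory list into another. The sketch below illustrates that pattern under stated assumptions: `LocalServerListCache` is a hypothetical class, not a Ribbon type, and a `Supplier` stands in for the EurekaClient cache. Unlike the real updater, it stops by shutting down its executor rather than checking an `isActive` flag.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical mini version of the PollingServerListUpdater + BaseLoadBalancer pair.
class LocalServerListCache {
    // Stands in for the EurekaClient-side cache (an assumption, not a Ribbon type).
    private final Supplier<List<String>> upstream;
    // Immutable snapshot read by load-balancing rules; replaced as a whole on each refresh.
    private volatile List<String> servers = Collections.emptyList();
    private volatile long lastUpdated = -1L;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "server-list-refresh");
                t.setDaemon(true);
                return t;
            });

    LocalServerListCache(Supplier<List<String>> upstream) {
        this.upstream = upstream;
    }

    // One refresh cycle: a memory-to-memory copy, no network I/O.
    void refreshOnce() {
        try {
            servers = Collections.unmodifiableList(new ArrayList<>(upstream.get()));
            lastUpdated = System.currentTimeMillis();
        } catch (RuntimeException e) {
            // Like "Failed one update cycle": keep the previous snapshot and retry next cycle.
        }
    }

    // Fixed delay between the end of one run and the start of the next, as in the updater above.
    void start(long initialDelayMs, long refreshIntervalMs) {
        executor.scheduleWithFixedDelay(this::refreshOnce, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }

    List<String> getAllServers() {
        return servers;
    }

    long getLastUpdated() {
        return lastUpdated;
    }
}
```

Because `refreshOnce` only copies memory, running it frequently is cheap; a caller would start it with something like `cache.start(0, 30_000)` and read `getAllServers()` from request threads.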

updateAction.doUpdate() fetches the service instance list from the EurekaClient cache and stores it in BaseLoadBalancer's local cache:

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());

public void setServersList(List lsrv) {
    //Code that writes to allServerList, omitted here
}

@Override
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}

getAllServers is called by every load-balancing rule, for example RoundRobinRule:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        //Fetch the service instance list; this calls the getAllServers shown above
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}
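The rule above relies on `incrementAndGetModulo`, which is not shown. A minimal lock-free version can be sketched with an `AtomicInteger` and a compare-and-set retry loop; `RoundRobinCursor` and its `choose` helper are hypothetical names, and this illustrates the technique rather than reproducing Ribbon's exact code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock-free round-robin cursor in the spirit of incrementAndGetModulo.
class RoundRobinCursor {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Advances the shared counter by one, wrapping at 'modulo'. The CAS retry loop
    // ensures concurrent callers never lose an increment without taking a lock.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Picks the next server from a snapshot; returns null for an empty list,
    // similar to how the rule gives up when serverCount == 0.
    <T> T choose(List<T> allServers) {
        if (allServers.isEmpty()) {
            return null;
        }
        return allServers.get(incrementAndGetModulo(allServers.size()));
    }
}
```

Since the counter is advanced before it is used, a fresh cursor starts at index 1 rather than 0; the wrap-around keeps the index valid even if the list shrinks between calls.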

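This cache deserves attention. Sometimes we shorten the EurekaClient cache refresh interval but forget to change the interval at which the LoadBalancer refreshes its own local cache, namely ribbon.ServerListRefreshInterval. This parameter can safely be set quite small, because it does not read from the network; it only copies one local cache into another. (For how to configure these caches so that instance shutdowns are detected and propagated quickly, see my other article.)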

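Next, let's look at EurekaClient's own cache, going straight to the relevant source of the key class DiscoveryClient. We only care about the local region here and ignore multi-region configuration for now: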

//Local cache; think of it as a symbolic link to the current Applications object
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

private void initScheduledTasks() {
    //If the client is configured to fetch the registry (the default), schedule a periodic fetch task
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    //Initialization of other scheduled tasks, omitted
}

//Task that periodically fetches the service list from EurekaServer
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

void refreshRegistry() {
    try {
        //Multi-region handling code, omitted

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        //Logging code, omitted
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}

//Core method that fetches the service list from EurekaServer on each scheduled run
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        Applications applications = getApplications();

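        //Do a full fetch if delta fetching is disabled, a single VIP address is configured, a full fetch is forced, or the local app list is missing or empty (e.g. on the first fetch); otherwise do a delta fetch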
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    //Cache refresh finished: publish an event to observers (of little practical use at present)
    onCacheRefreshed();

    // Check whether the remote instance list includes this instance itself and whether its status is correct; not relevant here
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

//Full fetch
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications apps = null;
    //Call the /eureka/apps endpoint to fetch information about all service instances
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

//Delta fetch

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    //Call the /eureka/apps/delta endpoint to fetch the incremental changes (delta) for all service instances
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) {
        //The delta is null, i.e. the delta fetch failed, so fall back to a full fetch
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
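        //The CAS on fetchRegistryGeneration guards against a scheduled network request taking so long that several threads end up holding delta results and applying them concurrently
        //The delta was fetched successfully; check whether the hashcodes match, and if they do not, also fall back to a full fetch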
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
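Both `getAndStoreFullRegistry` and `getAndUpdateDelta` read `fetchRegistryGeneration` before the network call and store the result only if a compare-and-set on that counter still succeeds afterwards. The sketch below isolates that guard; `GenerationGuardedCache` is a hypothetical class that models the registry as a `List<String>` and the HTTP call as a `Supplier`, and it leaves out the extra `fetchRegistryUpdateLock` used when applying deltas.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of the "generation + CAS" guard around fetchRegistryGeneration.
class GenerationGuardedCache {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<List<String>> current = new AtomicReference<>(List.of());

    // Returns true if this fetch's result was stored, false if it was discarded
    // because another update bumped the generation while this fetch was in flight.
    boolean fetchAndStore(Supplier<List<String>> remoteCall) {
        long seen = generation.get();              // read before the (possibly slow) remote call
        List<String> fetched = remoteCall.get();
        if (fetched == null) {
            return false;                          // like "The application is null for some reason"
        }
        if (generation.compareAndSet(seen, seen + 1)) {
            current.set(List.copyOf(fetched));     // swap in the whole snapshot at once
            return true;
        }
        return false;                              // a newer result already won; do not overwrite it
    }

    List<String> get() {
        return current.get();
    }

    long generation() {
        return generation.get();
    }
}
```

The effect is that a slow fetch which returns after a newer one has already been stored is dropped instead of overwriting fresher data.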

That concludes the source analysis of how EurekaClient fetches service instance information. The important EurekaClient caches can be summarized as follows:

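  1. EurekaClient performs a full fetch the first time and periodic delta fetches afterwards, storing application service instance information in its cache.
  2. If a delta fetch fails, or the hashcode comparison after applying a delta shows a mismatch, EurekaClient performs a full fetch. This avoids inconsistencies caused by transient network partitions.
  3. For service calls that go through Ribbon load balancing, Ribbon keeps its own cache of the instance list, which is periodically refreshed from the EurekaClient cache.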
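Points 1 and 2 can be modeled in a few lines: apply the delta locally, compute a hash over the result, and fall back to a full fetch if it differs from the hash the server sent. In the sketch below, `DeltaSyncedRegistry`, `Change` and `Action` are hypothetical; instances are reduced to an id-to-status map, and the hash format (per-status counts in sorted order, such as `DOWN_1_UP_2_`) is modeled on Eureka's reconcile hash code but should be treated as an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical model of "apply delta, verify by hash, fall back to a full fetch".
class DeltaSyncedRegistry {
    enum Action { ADDED, MODIFIED, DELETED }

    // One entry of a delta response (simplified: instance id plus its new status).
    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    private final Map<String, String> instances = new HashMap<>(); // instance id -> status
    private int fullFetches = 0;

    // Per-status instance counts in sorted order, e.g. "DOWN_1_UP_2_" (format is an assumption).
    static String reconcileHash(Map<String, String> registry) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : registry.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        counts.forEach((status, n) -> sb.append(status).append('_').append(n).append('_'));
        return sb.toString();
    }

    // fullFetch stands in for a full GET of /eureka/apps.
    void applyDelta(List<Change> delta, String serverHash, Supplier<Map<String, String>> fullFetch) {
        for (Change c : delta) {
            if (c.action == Action.DELETED) {
                instances.remove(c.instanceId);
            } else {
                instances.put(c.instanceId, c.status); // ADDED and MODIFIED both upsert
            }
        }
        if (!reconcileHash(instances).equals(serverHash)) {
            // Local copy drifted (e.g. a missed delta): replace it with the full registry.
            instances.clear();
            instances.putAll(fullFetch.get());
            fullFetches++;
        }
    }

    Map<String, String> snapshot() {
        return Map.copyOf(instances);
    }

    int fullFetches() {
        return fullFetches;
    }
}
```

Because the hash only counts instances per status, it is a cheap consistency check rather than a full comparison; it is the full fetch that actually repairs the local copy.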

EurekaServer side

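On the EurekaServer side, all read requests are served from the ReadOnlyMap (whether the read-only cache is used is configurable, via eureka.server.useReadOnlyResponseCache). A scheduled task periodically syncs the ReadWriteMap into the ReadOnlyMap, at an interval configured by: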

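#How often the Eureka server refreshes readOnlyCacheMap. Note that clients read from readOnlyCacheMap, so this interval determines how long it takes for readWriteCacheMap changes to reach readOnlyCacheMap
#Default: 30s (30000 ms); lowered to 3000 ms (3s) here
eureka.server.responseCacheUpdateIntervalMs=3000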
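Taken together with the two client-side intervals discussed earlier, this setting gives a rough upper bound on how stale a consumer's view can be. The sketch assumes each layer is a periodic copy that may have run just before a change arrives, so the worst-case delays add up; it ignores network latency and the extra time needed to evict instances that disappear without deregistering. `PropagationDelay` is a hypothetical helper.

```java
import java.time.Duration;

// Back-of-the-envelope worst case for how long a consumer may keep routing to an
// instance after the server-side registry changed (e.g. after a clean deregistration).
final class PropagationDelay {
    private PropagationDelay() {}

    static Duration worstCase(Duration serverReadOnlyRefresh,  // eureka.server.responseCacheUpdateIntervalMs
                              Duration clientRegistryFetch,    // eureka.client.registryFetchIntervalSeconds
                              Duration ribbonListRefresh) {    // ribbon.ServerListRefreshInterval
        // Each layer may have just missed the change, so the waits are additive.
        return serverReadOnlyRefresh.plus(clientRegistryFetch).plus(ribbonListRefresh);
    }
}
```

With the defaults of 30 s per layer this gives about 90 s; with the 3000 ms server setting above and, for example, 5 s and 1 s on the client side, it drops to about 9 s.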

The relevant code:

if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}
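The server-side pair of maps can be sketched as follows. `TwoLevelResponseCache` is hypothetical: it uses two `ConcurrentHashMap`s instead of a Guava `LoadingCache` plus a map, and a `Function` as the loader. It shows the first run aligned to the next multiple of the interval, a read-only miss falling through to the read-write layer (an assumption about the read path, which is not shown above), and a sync task that only refreshes keys already present in the read-only map. The real task compares values by reference (`!=`), while the sketch uses `equals`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical two-level response cache modeled on readWriteCacheMap / readOnlyCacheMap.
class TwoLevelResponseCache {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // builds a response from the registry (assumed)

    TwoLevelResponseCache(Function<String, String> loader) {
        this.loader = loader;
    }

    // Same arithmetic as the Date passed to timer.schedule: the next multiple of the interval.
    static long firstRunAt(long nowMs, long intervalMs) {
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }

    // Read path: serve from readOnly; on a miss, load through readWrite and remember it.
    String get(String key) {
        String v = readOnly.get(key);
        if (v == null) {
            v = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, v);
        }
        return v;
    }

    // A registry change only invalidates the read-write layer.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Body of the periodic task: copy (possibly reloaded) read-write values over existing read-only keys.
    void syncOnce() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }
}
```

The consequence shows up in use: after a registry change invalidates the read-write entry, clients keep receiving the old payload until the next sync run.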

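The ReadWriteMap is a (Guava) LoadingCache that wraps the service instance information in the registry into ready-to-return HTTP responses (both gzip-compressed and uncompressed). It also has two special keys, ALL_APPS and ALL_APPS_DELTA: ALL_APPS holds information about all service instances, and ALL_APPS_DELTA holds the HTTP response built from the instance list in the RecentlyChangedQueue discussed earlier in the article on registration.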
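The two ideas in this paragraph, aggregate keys and storing each response both raw and gzip-compressed, can be illustrated with the JDK alone. In the sketch below `ResponseValue` is hypothetical, loosely modeled on the idea of a cached value holding a payload plus its gzipped form; the registry is a `Map` of application name to instance ids, a plain `List` stands in for the RecentlyChangedQueue, and `toString()` replaces real JSON/XML encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical cached response value: the serialized payload plus a pre-compressed copy.
final class ResponseValue {
    static final String ALL_APPS = "ALL_APPS";             // full registry
    static final String ALL_APPS_DELTA = "ALL_APPS_DELTA"; // recently changed instances only

    private final String payload;
    private final byte[] gzipped;

    private ResponseValue(String payload) {
        this.payload = payload;
        this.gzipped = gzip(payload.getBytes(StandardCharsets.UTF_8)); // compressed once, at load time
    }

    // Loader: builds the response for a key; any other key is treated as a single application name.
    static ResponseValue build(String key, Map<String, List<String>> registry, List<String> recentlyChanged) {
        if (ALL_APPS.equals(key)) {
            return new ResponseValue(registry.toString());
        }
        if (ALL_APPS_DELTA.equals(key)) {
            return new ResponseValue(recentlyChanged.toString());
        }
        return new ResponseValue(registry.getOrDefault(key, List.of()).toString());
    }

    String payload() {
        return payload;
    }

    byte[] gzipped() {
        return gzipped.clone();
    }

    private static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // complete only after the gzip stream has been closed
    }

    static String gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a design like this, precomputing the compressed bytes once per cache load means a request that accepts gzip can be answered without compressing on every read.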
