Spring Cloud Eureka 與 Ribbon 是怎麼作服務發現的?

做者:fredalxin

地址:https://fredal.xin/how-eureka...java

本文基於 spring cloud dalston,同時文章較長,請選擇舒服姿式進行閱讀。面試

Eureka 與 Ribbon 是什麼?和服務發現什麼關係?

Eureka 與 Ribbon 都是 Netflix 提供的微服務組件,分別用於服務註冊與發現、負載均衡。同時,這二者均屬於 spring cloud netflix 體系,和 spring cloud 無縫集成,也正因爲此被你們所熟知。spring

Eureka 自己是服務註冊發現組件,實現了完整的 Service Registry 和 Service Discovery。緩存

Ribbon 則是一款負載均衡組件,那它和服務發現又有什麼關係呢?負載均衡在整個微服務的調用模型中是緊挨着服務發現的,而 Ribbon 這個框架它實際上是起到了開發者服務消費行爲與底層服務發現組件 Eureka 之間橋樑的做用。架構

從嚴格概念上說 Ribbon 並非作服務發現的,可是因爲 Netflix 組件的鬆耦合,Ribbon 須要對 Eureka 的緩存服務列表進行相似"服務發現"的行爲,從而構建本身的負載均衡列表並及時更新,也就是說 Ribbon 中的"服務發現"的賓語變成了 Eureka(或其餘服務發現組件)。intellij-idea

Eureka 的服務註冊與發現

咱們會先對 Eureka 的服務發現進行描述,重點是 Eureka-client 是如何進行服務的註冊與發現的,同時不會過多停留於 Eureka 的架構、Eureka-server 的實現、Zone/Region 等範疇。app

Eureka-client 的服務發現都是由 DiscoveryClient 類實現的,它主要包括的功能有:負載均衡

  • 向 Eureka-server 註冊服務實例
  • 更新在 Eureka-server 的租期
  • 取消在 Eureka-server 的租約(服務下線)
  • 發現服務實例並按期更新

服務註冊

DiscoveryClient 全部的定時任務都是在 initScheduledTasks()方法裏,咱們能夠看到如下關鍵代碼:框架

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
         // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    }
}

咱們能夠看到在 if 判斷分支裏建立了一個 instanceInfoReplicator 實例,它會經過 start 執行一個定時任務:async

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

咱們能夠在 InstanceInfoReplicator 類的 run()方法中找到這一段,同時能夠一眼發現其註冊關鍵點在於discoveryClient.register()這段,咱們點進去看看:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

這邊能夠發現是經過 HTTP REST (jersey 客戶端)請求的方式將 instanceInfo 實例信息註冊到 Eureka-server 上。咱們簡單看一下 InstanceInfo 對象,屬性基本上都能見名知義:

@JsonCreator
public InstanceInfo(
    @JsonProperty("instanceId") String instanceId,
    @JsonProperty("app") String appName,
    @JsonProperty("appGroupName") String appGroupName,
    @JsonProperty("ipAddr") String ipAddr,
    @JsonProperty("sid") String sid,
    @JsonProperty("port") PortWrapper port,
    @JsonProperty("securePort") PortWrapper securePort,
    @JsonProperty("homePageUrl") String homePageUrl,
    @JsonProperty("statusPageUrl") String statusPageUrl,
    @JsonProperty("healthCheckUrl") String healthCheckUrl,
    @JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl,
    @JsonProperty("vipAddress") String vipAddress,
    @JsonProperty("secureVipAddress") String secureVipAddress,
    @JsonProperty("countryId") int countryId,
    @JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo,
    @JsonProperty("hostName") String hostName,
    @JsonProperty("status") InstanceStatus status,
    @JsonProperty("overriddenstatus") InstanceStatus overriddenstatus,
    @JsonProperty("leaseInfo") LeaseInfo leaseInfo,
    @JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer,
    @JsonProperty("metadata") HashMap<String, String> metadata,
    @JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp,
    @JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp,
    @JsonProperty("actionType") ActionType actionType,
    @JsonProperty("asgName") String asgName) {
    this.instanceId = instanceId;
    this.sid = sid;
    this.appName = StringCache.intern(appName);
    this.appGroupName = StringCache.intern(appGroupName);
    this.ipAddr = ipAddr;
    this.port = port == null ? 0 : port.getPort();
    this.isUnsecurePortEnabled = port != null && port.isEnabled();
    this.securePort = securePort == null ? 0 : securePort.getPort();
    this.isSecurePortEnabled = securePort != null && securePort.isEnabled();
    this.homePageUrl = homePageUrl;
    this.statusPageUrl = statusPageUrl;
    this.healthCheckUrl = healthCheckUrl;
    this.secureHealthCheckUrl = secureHealthCheckUrl;
    this.vipAddress = StringCache.intern(vipAddress);
    this.secureVipAddress = StringCache.intern(secureVipAddress);
    this.countryId = countryId;
    this.dataCenterInfo = dataCenterInfo;
    this.hostName = hostName;
    this.status = status;
    this.overriddenstatus = overriddenstatus;
    this.leaseInfo = leaseInfo;
    this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer;
    this.lastUpdatedTimestamp = lastUpdatedTimestamp;
    this.lastDirtyTimestamp = lastDirtyTimestamp;
    this.actionType = actionType;
    this.asgName = StringCache.intern(asgName);

    // ---------------------------------------------------------------
    // for compatibility

    if (metadata == null) {
        this.metadata = Collections.emptyMap();
    } else if (metadata.size() == 1) {
        this.metadata = removeMetadataMapLegacyValues(metadata);
    } else {
        this.metadata = metadata;
    }

    if (sid == null) {
        this.sid = SID_DEFAULT;
    }
}

總結一下整個過程以下:

服務續期

服務續期提及來可能比較晦澀,其實就是在 client 端定時發起調用,讓 Eureka-server 知道本身還活着,在 eureka 代碼中的註釋解釋爲心跳(heart-beat)。

這裏有兩個比較重要的配置須要注意:

  • instance.leaseRenewalIntervalInSeconds
    表示客戶端的更新頻率,默認 30s,也就是每 30s 就會向 Eureka-server 發起 renew 更新操做。
  • instance.leaseExpirationDurationInSeconds
    這是服務端視角的失效時間,默認是 90s,也就是 Eureka-server 在 90s 內沒有接收到來自 client 的 renew 操做就會將其剔除。

咱們直接從代碼角度看一下,一樣呢相關定時任務在 initScheduledTasks()方法中:

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
         // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
        ...
    }
}

能夠看到這裏建立了一個 HeartbeatThread()線程執行操做:

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

咱們直接看 renew()方法:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

這裏比較簡單,能夠發現和服務註冊是相似的,一樣使用 HTTP REST 發起一個 hearbeat 請求,底層使用 jersey 客戶端。

總結一下整個過程以下:

服務註銷

服務註銷邏輯比較簡單,自己並不在定時任務中觸發,而是經過對方法標記@PreDestroy,從而調用 shutdown 方法觸發,最終會調用 unRegister()方法進行註銷,一樣的這也是一個 HTTP REST 請求,能夠簡單看下代碼:

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

/**
     * unregister w/ the eureka service.
     */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
        }
    }
}

服務發現及更新

咱們來看做爲服務消費者的關鍵邏輯,即發現服務以及更新服務。

首先 consumer 會在啓動時從 Eureka-server 獲取全部的服務列表,並在本地緩存。同時呢,因爲本地有一份緩存,因此須要按期更新,更新頻率能夠配置。

啓動時候在 consumer 在 discoveryClient 中會調用 fetchRegistry() 方法:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            ...
    if (clientConfig.shouldDisableDelta()
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
        ...
        getAndStoreFullRegistry();
    } else {
        getAndUpdateDelta(applications);
    }
    ...
}

這裏能夠看到 fetchRegistry 裏有 2 個判斷分支,對應首次更新以及後續更新。首次更新會調用 getAndStoreFullRegistry()方法,咱們看一下:

private void getAndStoreFullRegistry() throws Throwable {
     long currentUpdateGeneration = fetchRegistryGeneration.get();

     logger.info("Getting all instance registry info from the eureka server");

     Applications apps = null;
     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
         ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
         : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
         apps = httpResponse.getEntity();
     }
     logger.info("The response status is {}", httpResponse.getStatusCode());

     if (apps == null) {
         logger.error("The application is null for some reason. Not storing this information");
     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
         localRegionApps.set(this.filterAndShuffle(apps));
         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
     } else {
         logger.warn("Not updating applications as another thread is updating it already");
     }
 }

能夠看到和以前相似,若是在沒有特殊指定的狀況下,咱們會發起一個 HTTP REST 請求拉取全部應用的信息並進行緩存,緩存對象爲 Applications,有興趣的能夠進一步查看。

接下來,在咱們熟悉的 initScheduledTasks()方法中,咱們還會啓動一個更新應用信息緩存的 task:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ...
}

在 CacheRefreshThread()這個 task 的 run 方法中,仍然會調用到咱們以前的 fetchRegistry()方法,同時在判斷時會走到另外一個分支中,即調用到 getAndUpdateDelta()方法:

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

能夠看到,這邊是使用 HTTP REST 發起一個 getDelta 請求,同時在 updateDelta()方法中會更新本地的 Applications 緩存對象。

總結一下,整個服務發現與更新的過程以下:

Ribbon 的"服務發現"

接下來咱們來看看 Ribbon 是怎麼基於 Eureka 進行"服務發現"的,咱們以前說過這裏的"服務發現"並非嚴格意義上的服務發現,而是 Ribbon 如何基於 Eureka 構建本身的負載均衡列表並及時更新,同時咱們也不關注 Ribbon 其餘負載均衡的具體邏輯(包括 IRule 路由,IPing 判斷可用性)。

咱們能夠先作一些猜測,首先 Ribbon 確定是基於 Eureka 的服務發現的。咱們上邊描述了 Eureka 會拉取全部服務信息到本地緩存 Applications 中,那麼 Ribbon 確定是基於這個 Applications 緩存來構建負載均衡列表的了,同時呢,負載均衡列表一樣須要一個定時更新的機制來保證一致性。

服務調用

首先咱們從開發者的最初使用上看,在開發者在 RestTemplate 上開啓@LoadBalanced 註解就可開啓 Ribbon 的邏輯了,顯然這是用了相似攔截的方法。在 LoadBalancerAutoConfiguration 類中,咱們能夠看到相關代碼:

...
@Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
    final List<RestTemplateCustomizer> customizers) {
    return new SmartInitializingSingleton() {
        @Override
        public void afterSingletonsInstantiated() {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        }
    };
}


@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
    @Bean
    public LoadBalancerInterceptor ribbonInterceptor(
        LoadBalancerClient loadBalancerClient,
        LoadBalancerRequestFactory requestFactory) {
        return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(
        final LoadBalancerInterceptor loadBalancerInterceptor) {
        return new RestTemplateCustomizer() {
            @Override
            public void customize(RestTemplate restTemplate) {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                    restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            }
        };
    }
}
...

能夠看到,在初始化的過程當中經過調用 customize()方法來給 RestTemplate 增長了攔截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 則是在攔截方法中使用了 loadBalancer(RibbonLoadBalancerClient 類) 完成請求調用:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                    final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}

服務發現

到如今爲止呢,咱們的請求調用已經被 RibbonLoadBalancerClient 所封裝,而其"服務發現"也是發生在 RibbonLoadBalancerClient 中的。

咱們點到其 execute()方法中:

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                                                                             serviceId), serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

這裏根據 serviceId 構建了一個 ILoadBalancer,同時從 loadBalancer 中獲取到了最終的實例 server 信息。ILoadBalancer 是定義了負載均衡的一個接口,它的關鍵方法 chooseServer()便是從負載均衡列表根據路由規則中選取一個 server。固然咱們主要關心的點在於,負載均衡列表是怎麼構建出來的。

經過源碼跟蹤咱們發現,在經過 getLoadBalancer()方法構建好 ILoadBalancer 對象後,對象中就已經包含了服務列表。因此咱們來看看 ILoadBalancer 對象是怎麼建立的:

protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}

那麼這裏實際上是 springcloud 封裝的 clientFactory,它會在 applicationContext 容器中尋找對應的 bean 。

經過源碼追蹤,咱們能夠在自動配置類 RibbonClientConfiguration 中找到對應代碼:

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

咱們看到這裏最終構建了 ILoadBalancer,其實現類是 ZoneAwareLoadBalancer,咱們觀察其超類的初始化:

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}

這邊最終執行了 restOfInit()方法,進一步跟蹤:

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers()方法是獲取全部的 ServerList 的,最終由 serverListImpl.getUpdatedListOfServers()獲取全部的服務列表,在此 serverListImpl 即實現類爲 DiscoveryEnabledNIWSServerList。

其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()方法,具體代碼以下

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
    return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

此時咱們查看 obtainServersViaDiscovery()方法,已經基本接近於事物本質了,它建立了一個 EurekaClient 對象,在此就是 Eureka 的 DiscoveryClient 實現類,調用了其 getInstancesByVipAddress()方法,它最終從 DiscoveryClient 的 Applications 緩存中根據 serviceId 選取了對應的服務信息:

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}

服務更新

咱們已經知道初次啓動時,Ribbon 是怎麼結合 Eureka 完成負載均衡列表的構建了,那麼與 Eureka 相似,咱們還須要及時對服務列表進行更新以保證一致性。

在 RibbonClientConfiguration 自動配置類中構建 ILoadBalancer 時咱們能夠看到其構造器中有 ServerListUpdater 對象,而此對象也是在當前類中構建的:

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
    return new PollingServerListUpdater(config);
}

咱們觀察此對象中的 start()方法看是如何完成更新的:

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

這裏有 2 個配置,即 initialDelayMs 首次檢測默認 1s,refreshIntervalMs 檢測間隔默認 30s(和 Eureka 一致),建立了一個定時任務,執行 updateAction.doUpdate()方法。

咱們回到以前的 restOfInit()方法,查看其中的 enableAndInitLearnNewServersFeature()方法,能夠看到是在此處觸發了 ServerListUpdater 的 start 方法,同時傳入了 updateAction 對象:

public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

其實 updateAction 一開始就已經建立好了,它仍然是調用 以前的 updateListOfServers 方法來進行後續的更新:

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

總結一下 Ribbon 三部分服務發現的總體流程以下:

參考資料:

近期熱文推薦:

1.600+ 道 Java面試題及答案整理(2021最新版)

2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!

3.阿里 Mock 工具正式開源,幹掉市面上全部 Mock 工具!

4.Spring Cloud 2020.0.0 正式發佈,全新顛覆性版本!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

以爲不錯,別忘了隨手點贊+轉發哦!

相關文章
相關標籤/搜索