本文主要研究一下eureka client的serviceUrljava
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.javagit
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args) { Collection<?> additionalFilters = args == null ? Collections.emptyList() : args.additionalFilters; EurekaJerseyClient providedJerseyClient = args == null ? null : args.eurekaJerseyClient; TransportClientFactories argsTransportClientFactories = null; if (args != null && args.getTransportClientFactories() != null) { argsTransportClientFactories = args.getTransportClientFactories(); } // Ignore the raw types warnings since the client filter interface changed between jersey 1/2 @SuppressWarnings("rawtypes") TransportClientFactories transportClientFactories = argsTransportClientFactories == null ? new Jersey1TransportClientFactories() : argsTransportClientFactories; Optional<SSLContext> sslContext = args == null ? Optional.empty() : args.getSSLContext(); Optional<HostnameVerifier> hostnameVerifier = args == null ? Optional.empty() : args.getHostnameVerifier(); // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity eurekaTransport.transportClientFactory = providedJerseyClient == null ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier) : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient); ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() { @Override public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) { long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit); long delay = getLastSuccessfulRegistryFetchTimePeriod(); if (delay > thresholdInMs) { logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}", thresholdInMs, delay); return null; } else { return localRegionApps.get(); } } }; eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver( clientConfig, transportConfig, eurekaTransport.transportClientFactory, applicationInfoManager.getInfo(), applicationsSource ); if (clientConfig.shouldRegisterWithEureka()) { EurekaHttpClientFactory newRegistrationClientFactory = null; EurekaHttpClient newRegistrationClient = null; try { newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, transportConfig ); newRegistrationClient = newRegistrationClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.registrationClientFactory = newRegistrationClientFactory; eurekaTransport.registrationClient = newRegistrationClient; } // new method (resolve from primary servers for read) // Configure new transport layer (candidate for injecting in the future) if (clientConfig.shouldFetchRegistry()) { EurekaHttpClientFactory newQueryClientFactory = null; EurekaHttpClient newQueryClient = null; try { newQueryClientFactory = EurekaHttpClients.queryClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, clientConfig, transportConfig, applicationInfoManager.getInfo(), applicationsSource ); newQueryClient = newQueryClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.queryClientFactory = newQueryClientFactory; eurekaTransport.queryClient = newQueryClient; } }
構造器裏頭直接初始化scheduleServerEndpointTask,而後newBootstrapResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/EurekaHttpClients.javagithub
public static ClosableResolver<AwsEndpoint> newBootstrapResolver( final EurekaClientConfig clientConfig, final EurekaTransportConfig transportConfig, final TransportClientFactory transportClientFactory, final InstanceInfo myInstanceInfo, final ApplicationsResolver.ApplicationsSource applicationsSource) { if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) { if (clientConfig.shouldFetchRegistry()) { return compositeBootstrapResolver( clientConfig, transportConfig, transportClientFactory, myInstanceInfo, applicationsSource ); } else { logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled." + " Falling back to using a default bootstrap resolver."); } } // if all else fails, return the default return defaultBootstrapResolver(clientConfig, myInstanceInfo); }
這裏執行defaultBootstrapResolver
/** * @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config, * depending on configuration for one or the other. This resolver will warm up at the start. */ static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig, final InstanceInfo myInstanceInfo) { String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); String myZone = InstanceInfo.getZone(availZones, myInstanceInfo); ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver( new ConfigClusterResolver(clientConfig, myInstanceInfo), myZone, true ); List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints(); if (initialValue.isEmpty()) { String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info"; logger.error(msg); failFastOnInitCheck(clientConfig, msg); } return new AsyncResolver<>( EurekaClientNames.BOOTSTRAP, delegateResolver, initialValue, 1, clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000 ); }
從註釋能夠看到,這裏從DNS或者配置文件來解析eureka server的配置,這裏的delegateResolver爲ZoneAffinityClusterResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/AsyncResolver.javaspring
this.backgroundTask = new TimedSupervisorTask( this.getClass().getSimpleName(), executorService, threadPoolExecutor, refreshIntervalMs, TimeUnit.MILLISECONDS, 5, updateTask ); private final Runnable updateTask = new Runnable() { @Override public void run() { try { List<T> newList = delegate.getClusterEndpoints(); if (newList != null) { resultsRef.getAndSet(newList); lastLoadTimestamp = System.currentTimeMillis(); } else { logger.warn("Delegate returned null list of cluster endpoints"); } logger.debug("Resolved to {}", newList); } catch (Exception e) { logger.warn("Failed to retrieve cluster endpoints from the delegate", e); } } };
這裏有個backgroundTask去更新ClusterEndpoints,間隔值爲clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000毫秒,默認爲5分鐘
配置項爲eureka.client.eureka-service-url-poll-interval-seconds
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ZoneAffinityClusterResolver.javabootstrap
public List<AwsEndpoint> getClusterEndpoints() { List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone); List<AwsEndpoint> myZoneEndpoints = parts[0]; List<AwsEndpoint> remainingEndpoints = parts[1]; List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints); if (!zoneAffinity) { Collections.reverse(randomizedList); } logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList); return randomizedList; }
這裏調用了delegate.getClusterEndpoints(),這裏的delegate爲ConfigClusterResolver
這裏對獲得的List<AwsEndpoint>進行randomizeAndMerge
private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) { if (myZoneEndpoints.isEmpty()) { return ResolverUtils.randomize(remainingEndpoints); } if (remainingEndpoints.isEmpty()) { return ResolverUtils.randomize(myZoneEndpoints); } List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints); mergedList.addAll(ResolverUtils.randomize(remainingEndpoints)); return mergedList; }
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ConfigClusterResolver.javaapp
public List<AwsEndpoint> getClusterEndpoints() { if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { if (logger.isInfoEnabled()) { logger.info("Resolving eureka endpoints via DNS: {}", getDNSName()); } return getClusterEndpointsFromDns(); } else { logger.info("Resolving eureka endpoints via configuration"); return getClusterEndpointsFromConfig(); } } private List<AwsEndpoint> getClusterEndpointsFromConfig() { String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); String myZone = InstanceInfo.getZone(availZones, myInstanceInfo); Map<String, List<String>> serviceUrls = EndpointUtils .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka()); List<AwsEndpoint> endpoints = new ArrayList<>(); for (String zone : serviceUrls.keySet()) { for (String url : serviceUrls.get(zone)) { try { endpoints.add(new AwsEndpoint(url, getRegion(), zone)); } catch (Exception ignore) { logger.warn("Invalid eureka server URI: {}; removing from the server pool", url); } } } logger.debug("Config resolved to {}", endpoints); if (endpoints.isEmpty()) { logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls); } return endpoints; }
這裏經過EndpointUtils.getServiceUrlsMapFromConfig解析serviceUrl
假設serviceUrl是
eureka: client: serviceUrl: defaultZone: http://127.0.0.1:8761/eureka/,http://127.0.0.1:8762/eureka/
則最後獲得兩個AwsEndpointdom
{ serviceUrl='http://127.0.0.1:8761/eureka/', region='us-east-1', zone='defaultZone'} { serviceUrl='http://127.0.0.1:8762/eureka/', region='us-east-1', zone='defaultZone'}
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/endpoint/EndpointUtils.javaide
/** * Get the list of all eureka service urls from properties file for the eureka client to talk to. * * @param clientConfig the clientConfig to use * @param instanceZone The zone in which the client resides * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order */ public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { Map<String, List<String>> orderedUrls = new LinkedHashMap<>(); String region = getRegion(clientConfig); String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { availZones = new String[1]; availZones[0] = DEFAULT_ZONE; } logger.debug("The availability zone for the given region {} are {}", region, availZones); int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); String zone = availZones[myZoneOffset]; List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); if (serviceUrls != null) { orderedUrls.put(zone, serviceUrls); } int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); while (currentOffset != myZoneOffset) { zone = availZones[currentOffset]; serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); if (serviceUrls != null) { orderedUrls.put(zone, serviceUrls); } if (currentOffset == (availZones.length - 1)) { currentOffset = 0; } else { currentOffset++; } } if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } return orderedUrls; }
這裏調用clientConfig.getEurekaServerServiceUrls(zone)解析serviceUrl
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientConfigBean.javafetch
public List<String> getEurekaServerServiceUrls(String myZone) { String serviceUrls = this.serviceUrl.get(myZone); if (serviceUrls == null || serviceUrls.isEmpty()) { serviceUrls = this.serviceUrl.get(DEFAULT_ZONE); } if (!StringUtils.isEmpty(serviceUrls)) { final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls); List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length); for (String eurekaServiceUrl : serviceUrlsSplit) { if (!endsWithSlash(eurekaServiceUrl)) { eurekaServiceUrl += "/"; } eurekaServiceUrls.add(eurekaServiceUrl); } return eurekaServiceUrls; } return new ArrayList<>(); }
能夠看到這裏對多個eureka server的地址採用逗號分割
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/decorator/RetryableEurekaHttpClient.javaui
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { List<EurekaEndpoint> candidateHosts = null; int endpointIdx = 0; for (int retry = 0; retry < numberOfRetries; retry++) { EurekaHttpClient currentHttpClient = delegate.get(); EurekaEndpoint currentEndpoint = null; if (currentHttpClient == null) { if (candidateHosts == null) { candidateHosts = getHostCandidates(); if (candidateHosts.isEmpty()) { throw new TransportException("There is no known eureka server; cluster server list is empty"); } } if (endpointIdx >= candidateHosts.size()) { throw new TransportException("Cannot execute request on any known server"); } currentEndpoint = candidateHosts.get(endpointIdx++); currentHttpClient = clientFactory.newClient(currentEndpoint); } try { EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) { delegate.set(currentHttpClient); if (retry > 0) { logger.info("Request execution succeeded on retry #{}", retry); } return response; } logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode()); } catch (Exception e) { logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace } // Connection error or 5xx from the server that must be retried on another server delegate.compareAndSet(currentHttpClient, null); if (currentEndpoint != null) { quarantineSet.add(currentEndpoint); } } throw new TransportException("Retry limit reached; giving up on completing the request"); }
能夠看到這裏先獲取candidateHosts,而後支持失敗重試,重試默認是3次(int DEFAULT_NUMBER_OF_RETRIES = 3
)
每重試一次失敗,就換下一個eureka server的地址,若是endpointIdx >= candidateHosts.size(),則拋出TransportException("Cannot execute request on any known server")
private List<EurekaEndpoint> getHostCandidates() { List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); quarantineSet.retainAll(candidateHosts); // If enough hosts are bad, we have no choice but start over again int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage()); //Prevent threshold is too large if (threshold > candidateHosts.size()) { threshold = candidateHosts.size(); } if (quarantineSet.isEmpty()) { // no-op } else if (quarantineSet.size() >= threshold) { logger.debug("Clearing quarantined list of size {}", quarantineSet.size()); quarantineSet.clear(); } else { List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size()); for (EurekaEndpoint endpoint : candidateHosts) { if (!quarantineSet.contains(endpoint)) { remainingHosts.add(endpoint); } } candidateHosts = remainingHosts; } return candidateHosts; }
這裏quarantineSet = new ConcurrentSkipListSet<>(),它維護的是不可用的eureka server列表(Connection error or 5xx
)
這裏有個threshold,是依據eureka.client.transport.retryableClientQuarantineRefreshPercentage來計算的,默認是0.66*candidateHosts.size(),大小最大爲candidateHosts.size(),防止quarantineSet.size() < threshold致使不會清空quarantineSet,最後致使remainingHosts爲空。
默認5分鐘執行一次,取決於eureka.client.eureka-service-url-poll-interval-seconds配置
)來刷新eureka server的list,默認仍是走ZoneAffinityClusterResolver,而後走ConfigClusterResolver去獲取,會從新隨機list