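The current project uses a springcloud-gateway + eureka + springboot architecture. Requests first pass through the gateway, which looks up the address of the business service in the registry and then forwards the request to the business service's endpoint. With this architecture, several problems arise when a project is restarted.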
To solve these problems, we first need to understand how gateway, eureka, ribbon, and Tomcat work, and why the problems occur.
Let's start at the gateway entry point. The following diagram is from the official springcloud-gateway site:
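A key class is RoutePredicateHandlerMapping. It extends AbstractHandlerMapping and is a WebFlux HandlerMapping, playing the same role as a HandlerMapping in Web MVC: mapping a request to the handler that processes it. RoutePredicateHandlerMapping iterates over all Route objects, takes the first one whose predicate matches, and stores that route in the attributes of the current request's exchange.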
    public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

        @Override
        protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
            // don't handle requests on the management port if set
            if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
                return Mono.empty();
            }
            exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

            return lookupRoute(exchange)
                    // .log("route-predicate-handler-mapping", Level.FINER) //name this
                    .flatMap((Function<Route, Mono<?>>) r -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                        }

                        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                        return Mono.just(webHandler);
                    }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isTraceEnabled()) {
                            logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                        }
                    })));
        }

        protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
            return this.routeLocator
                    .getRoutes()
                    // individually filter routes so that filterWhen error delaying is not a problem
                    .concatMap(route -> Mono
                            .just(route)
                            .filterWhen(r -> {
                                // add the current route we are testing
                                exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                                return r.getPredicate().apply(exchange);
                            })
                            // instead of immediately stopping main flux due to error, log and swallow it
                            .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                            .onErrorResume(e -> Mono.empty())
                    )
                    // .defaultIfEmpty() put a static Route not found
                    // or .switchIfEmpty()
                    // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                    .next()
                    // TODO: error handling
                    .map(route -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Route matched: " + route.getId());
                        }
                        validateRoute(route, exchange);
                        return route;
                    });

            /* TODO: trace logging
                if (logger.isTraceEnabled()) {
                    logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
                }*/
        }
    }
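The "first matching route wins, and a failing predicate is skipped" behavior of lookupRoute can be illustrated without Reactor or Spring. The following is only a minimal sketch under that simplification: `RouteLookupSketch` and `SimpleRoute` are hypothetical names, and a plain `Predicate<String>` on the request path stands in for the gateway's asynchronous predicates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class RouteLookupSketch {

    /** Hypothetical stand-in for a gateway route: an id plus a predicate on the request path. */
    static final class SimpleRoute {
        final String id;
        final Predicate<String> predicate;

        SimpleRoute(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    /** Returns the id of the first route, in list order, whose predicate accepts the path. */
    static Optional<String> lookupRoute(List<SimpleRoute> routes, String path) {
        for (SimpleRoute route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route.id);
                }
            } catch (RuntimeException e) {
                // A predicate that throws is skipped instead of aborting the whole lookup.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<SimpleRoute> routes = Arrays.asList(
                new SimpleRoute("orders", p -> p.startsWith("/orders/")),
                new SimpleRoute("catch-all", p -> true));
        System.out.println(lookupRoute(routes, "/orders/42"));
        System.out.println(lookupRoute(routes, "/users/7"));
    }
}
```

Because the first match is returned, the order of the route list matters; in the gateway that order comes from sorting the routes.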
The call routeLocator.getRoutes() shows that the route list comes from routeLocator, so let's look at how the routes are produced. One implementation of RouteLocator is RouteDefinitionRouteLocator.
    public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

        @Override
        public Flux<Route> getRoutes() {
            return this.routeDefinitionLocator.getRouteDefinitions()
                    .map(this::convertToRoute)
                    // TODO: error handling
                    .map(route -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("RouteDefinition matched: " + route.getId());
                        }
                        return route;
                    });

            /* TODO: trace logging
                if (logger.isTraceEnabled()) {
                    logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
                }*/
        }
    }
RouteDefinitionRouteLocator obtains its route list from a RouteDefinitionLocator. When the project is configured for service-discovery-based routing (spring.cloud.gateway.discovery.locator.enabled: true), the route definitions come from DiscoveryClientRouteDefinitionLocator by default.
    public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {

        @Override
        public Flux<RouteDefinition> getRouteDefinitions() {
            SpelExpressionParser parser = new SpelExpressionParser();
            Expression includeExpr = parser.parseExpression(properties.getIncludeExpression());
            Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

            Predicate<ServiceInstance> includePredicate;
            if (properties.getIncludeExpression() == null || "true".equalsIgnoreCase(properties.getIncludeExpression())) {
                includePredicate = instance -> true;
            } else {
                includePredicate = instance -> {
                    Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
                    if (include == null) {
                        return false;
                    }
                    return include;
                };
            }

            return Flux.fromIterable(discoveryClient.getServices())
                    .map(discoveryClient::getInstances)
                    .filter(instances -> !instances.isEmpty())
                    .map(instances -> instances.get(0))
                    .filter(includePredicate)
                    .map(instance -> {
                        String serviceId = instance.getServiceId();
                        RouteDefinition routeDefinition = new RouteDefinition();
                        routeDefinition.setId(this.routeIdPrefix + serviceId);
                        String uri = urlExpr.getValue(evalCtxt, instance, String.class);
                        routeDefinition.setUri(URI.create(uri));

                        final ServiceInstance instanceForEval = new DelegatingServiceInstance(instance, properties);

                        for (PredicateDefinition original : this.properties.getPredicates()) {
                            PredicateDefinition predicate = new PredicateDefinition();
                            predicate.setName(original.getName());
                            for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                                String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                                predicate.addArg(entry.getKey(), value);
                            }
                            routeDefinition.getPredicates().add(predicate);
                        }

                        for (FilterDefinition original : this.properties.getFilters()) {
                            FilterDefinition filter = new FilterDefinition();
                            filter.setName(original.getName());
                            for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                                String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                                filter.addArg(entry.getKey(), value);
                            }
                            routeDefinition.getFilters().add(filter);
                        }

                        return routeDefinition;
                    });
        }
    }
DiscoveryClientRouteDefinitionLocator obtains the service list from Eureka (discoveryClient.getServices). For performance, springcloud-gateway wraps RouteDefinitionRouteLocator in a cache, CachingRouteLocator.
    public class CachingRouteLocator implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

        private final RouteLocator delegate;
        private final Flux<Route> routes;
        private final Map<String, List> cache = new HashMap<>();

        public CachingRouteLocator(RouteLocator delegate) {
            this.delegate = delegate;
            routes = CacheFlux.lookup(cache, "routes", Route.class)
                    .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
        }

        @Override
        public Flux<Route> getRoutes() {
            return this.routes;
        }

        /**
         * Clears the routes cache
         * @return routes flux
         */
        public Flux<Route> refresh() {
            this.cache.clear();
            return this.routes;
        }

        @Override
        public void onApplicationEvent(RefreshRoutesEvent event) {
            refresh();
        }

        @Deprecated
        /* for testing */ void handleRefresh() {
            refresh();
        }
    }
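CachingRouteLocator refreshes its cache when it receives a RefreshRoutesEvent (GatewayWebfluxEndpoint exposes an HTTP API that publishes a RefreshRoutesEvent through ApplicationEventPublisher). RouteRefreshListener publishes a RefreshRoutesEvent when it receives the service registration event InstanceRegisteredEvent, so the cache is refreshed when a new service registers. As its onApplicationEvent method shows, it does the same for a HeartbeatEvent whose value has changed.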
    public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {

        private HeartbeatMonitor monitor = new HeartbeatMonitor();
        private final ApplicationEventPublisher publisher;

        public RouteRefreshListener(ApplicationEventPublisher publisher) {
            Assert.notNull(publisher, "publisher may not be null");
            this.publisher = publisher;
        }

        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            if (event instanceof ContextRefreshedEvent
                    || event instanceof RefreshScopeRefreshedEvent
                    || event instanceof InstanceRegisteredEvent) {
                reset();
            } else if (event instanceof ParentHeartbeatEvent) {
                ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
                resetIfNeeded(e.getValue());
            } else if (event instanceof HeartbeatEvent) {
                HeartbeatEvent e = (HeartbeatEvent) event;
                resetIfNeeded(e.getValue());
            }
        }

        private void resetIfNeeded(Object value) {
            if (this.monitor.update(value)) {
                reset();
            }
        }

        private void reset() {
            this.publisher.publishEvent(new RefreshRoutesEvent(this));
        }
    }
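The cache-then-refresh behavior described above (serve the cached route list until a refresh event clears it) can be sketched in plain Java. This is a simplified illustration, not the gateway's implementation: `CachingLocatorSketch` and `CachingLocator` are hypothetical names, a `Supplier` stands in for the delegate RouteLocator, and a direct call to `refresh()` stands in for receiving a RefreshRoutesEvent.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CachingLocatorSketch {

    /** Serves the delegate's result from a cache until refresh() is called. */
    static final class CachingLocator<T> {
        private final Supplier<List<T>> delegate;
        private volatile List<T> cached; // null means "nothing cached yet"

        CachingLocator(Supplier<List<T>> delegate) {
            this.delegate = delegate;
        }

        List<T> getRoutes() {
            List<T> local = cached;
            if (local == null) {
                // Cache miss: ask the (possibly expensive) delegate and remember the answer.
                local = delegate.get();
                cached = local;
            }
            return local;
        }

        /** Stand-in for handling a refresh event: drop the cached list. */
        void refresh() {
            cached = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        CachingLocator<String> locator = new CachingLocator<>(
                () -> Collections.singletonList("route-v" + lookups.incrementAndGet()));
        System.out.println(locator.getRoutes()); // loads from the delegate
        System.out.println(locator.getRoutes()); // served from the cache
        locator.refresh();
        System.out.println(locator.getRoutes()); // loads again after the refresh
    }
}
```

The consequence for the gateway is that a change in the registry is not visible in the route list until some event triggers a refresh.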
Next comes FilteringWebHandler. It takes the route's filter list, combines it with the global filters, sorts the result into a filter chain, and starts executing the chain.
    public class FilteringWebHandler implements WebHandler {

        @Override
        public Mono<Void> handle(ServerWebExchange exchange) {
            Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
            List<GatewayFilter> gatewayFilters = route.getFilters();

            List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
            combined.addAll(gatewayFilters);
            // TODO: needed or cached?
            AnnotationAwareOrderComparator.sort(combined);

            if (logger.isDebugEnabled()) {
                logger.debug("Sorted gatewayFilterFactories: " + combined);
            }

            return new DefaultGatewayFilterChain(combined).filter(exchange);
        }
    }
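Next is RouteToRequestUrlFilter, which builds the full load-balanced address. For example, if the route's target is lb://MY-SERVICE and the request path is /hello/world, the resulting address is lb://MY-SERVICE/hello/world. The filter stores this address in the exchange attributes and passes control to the next filter.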
    public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
            if (route == null) {
                return chain.filter(exchange);
            }
            log.trace("RouteToRequestUrlFilter start");
            URI uri = exchange.getRequest().getURI();
            boolean encoded = containsEncodedParts(uri);
            URI routeUri = route.getUri();

            if (hasAnotherScheme(routeUri)) {
                // this is a special url, save scheme to special attribute
                // replace routeUri with schemeSpecificPart
                exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
                routeUri = URI.create(routeUri.getSchemeSpecificPart());
            }

            if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
                // Load balanced URIs should always have a host. If the host is null it is most
                // likely because the host name was invalid (for example included an underscore)
                throw new IllegalStateException("Invalid host: " + routeUri.toString());
            }

            URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                    // .uri(routeUri)
                    .scheme(routeUri.getScheme())
                    .host(routeUri.getHost())
                    .port(routeUri.getPort())
                    .build(encoded)
                    .toUri();
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
            return chain.filter(exchange);
        }
    }
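The lb://MY-SERVICE/hello/world example can be reproduced with only `java.net.URI`. The sketch below is an approximation of the merge step, not the filter itself: `RequestUrlSketch` and `merge` are hypothetical names, and it uses the JDK's multi-argument `URI` constructor where the filter uses Spring's `UriComponentsBuilder`.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class RequestUrlSketch {

    /**
     * Keeps the path and query of the incoming request, but takes scheme,
     * host and port from the route's target URI.
     */
    static URI merge(URI requestUri, URI routeUri) {
        try {
            return new URI(
                    routeUri.getScheme(),
                    null,                     // no user info
                    routeUri.getHost(),
                    routeUri.getPort(),       // -1 when the route URI has no port
                    requestUri.getPath(),
                    requestUri.getQuery(),
                    null);                    // no fragment
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot merge " + requestUri + " with " + routeUri, e);
        }
    }

    public static void main(String[] args) {
        URI request = URI.create("http://localhost:8080/hello/world");
        URI route = URI.create("lb://MY-SERVICE");
        System.out.println(merge(request, route));
    }
}
```

At this point the host is still the logical service name; resolving it to a concrete node is left to the load-balancing filter that runs later.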
Next is LoadBalancerClientFilter. It first checks the scheme: if it is not lb, the request goes straight to the next filter; if it is lb, the filter chooses a service node and builds the final forwarding address.
    public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
            if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
                ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
                log.trace("LoadBalancerClientFilter url before: " + url);
                ServiceInstance instance = this.choose(exchange);
                if (instance == null) {
                    String msg = "Unable to find instance for " + url.getHost();
                    if (this.properties.isUse404()) {
                        throw new LoadBalancerClientFilter.FourOFourNotFoundException(msg);
                    } else {
                        throw new NotFoundException(msg);
                    }
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = instance.isSecure() ? "https" : "http";
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }

                    URI requestUrl = this.loadBalancer.reconstructURI(
                            new LoadBalancerClientFilter.DelegatingServiceInstance(instance, overrideScheme), uri);
                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                    return chain.filter(exchange);
                }
            } else {
                return chain.filter(exchange);
            }
        }

        protected ServiceInstance choose(ServerWebExchange exchange) {
            return this.loadBalancer.choose(
                    ((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());
        }
    }
Finally, the request is forwarded to the real address in NettyRoutingFilter.
    public class NettyRoutingFilter implements GlobalFilter, Ordered {

        private final HttpClient httpClient;
        private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
        private final HttpClientProperties properties;

        public NettyRoutingFilter(HttpClient httpClient,
                ObjectProvider<List<HttpHeadersFilter>> headersFilters,
                HttpClientProperties properties) {
            this.httpClient = httpClient;
            this.headersFilters = headersFilters;
            this.properties = properties;
        }

        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

            String scheme = requestUrl.getScheme();
            if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
                return chain.filter(exchange);
            }
            setAlreadyRouted(exchange);

            ServerHttpRequest request = exchange.getRequest();

            final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
            final String url = requestUrl.toString();

            HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange);

            final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            filtered.forEach(httpHeaders::set);

            String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
            boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

            boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

            // The actual forwarding happens here
            Flux<HttpClientResponse> responseFlux = this.httpClient
                    .chunkedTransfer(chunkedTransfer)
                    .request(method)
                    .uri(url)
                    .send((req, nettyOutbound) -> {
                        req.headers(httpHeaders);

                        if (preserveHost) {
                            String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                            req.header(HttpHeaders.HOST, host);
                        }
                        return nettyOutbound
                                .options(NettyPipeline.SendOptions::flushOnEach)
                                .send(request.getBody().map(dataBuffer ->
                                        ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
                    }).responseConnection((res, connection) -> {
                        ServerHttpResponse response = exchange.getResponse();
                        // put headers and status so filters can modify the response
                        HttpHeaders headers = new HttpHeaders();

                        res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

                        String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                        if (StringUtils.hasLength(contentTypeValue)) {
                            exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                        }

                        HttpStatus status = HttpStatus.resolve(res.status().code());
                        if (status != null) {
                            response.setStatusCode(status);
                        } else if (response instanceof AbstractServerHttpResponse) {
                            // https://jira.spring.io/browse/SPR-16748
                            ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
                        } else {
                            throw new IllegalStateException("Unable to set status code on response: "
                                    + res.status().code() + ", " + response.getClass());
                        }

                        // make sure headers filters run after setting status so it is available in response
                        HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                                this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);

                        if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                                && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                            // It is not valid to have both the transfer-encoding header and the content-length header
                            // remove the transfer-encoding header in the response if the content-length header is present
                            response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                        }

                        exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

                        response.getHeaders().putAll(filteredResponseHeaders);

                        // Defer committing the response until all route filters have run
                        // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
                        exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                        exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                        return Mono.just(res);
                    });

            if (properties.getResponseTimeout() != null) {
                responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
                        Mono.error(new TimeoutException("Response took longer than timeout: "
                                + properties.getResponseTimeout())))
                        .onErrorMap(TimeoutException.class,
                                th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));
            }

            return responseFlux.then(chain.filter(exchange));
        }
    }
The crux of the whole forwarding flow is choosing a service node. Before a node can be chosen, the load balancer has to be obtained.
    // RibbonLoadBalancerClient.java
    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

    // SpringClientFactory.java
    public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

        public ILoadBalancer getLoadBalancer(String name) {
            return (ILoadBalancer)this.getInstance(name, ILoadBalancer.class);
        }

        public <C> C getInstance(String name, Class<C> type) {
            C instance = super.getInstance(name, type);
            if (instance != null) {
                return instance;
            } else {
                IClientConfig config = (IClientConfig)this.getInstance(name, IClientConfig.class);
                return instantiateWithConfig(this.getContext(name), type, config);
            }
        }

        static <C> C instantiateWithConfig(AnnotationConfigApplicationContext context, Class<C> clazz, IClientConfig config) {
            Object result = null;

            try {
                Constructor<C> constructor = clazz.getConstructor(IClientConfig.class);
                result = constructor.newInstance(config);
            } catch (Throwable var5) {
            }

            if (result == null) {
                result = BeanUtils.instantiate(clazz);
                if (result instanceof IClientConfigAware) {
                    ((IClientConfigAware)result).initWithNiwsConfig(config);
                }

                if (context != null) {
                    context.getAutowireCapableBeanFactory().autowireBean(result);
                }
            }

            return result;
        }
    }

    // NamedContextFactory.java
    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = this.getContext(name);
        return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
    }
The source shows that the load balancer is first looked up in the Spring context by serviceId; if none is found, one is instantiated on the spot. The default load balancer is ZoneAwareLoadBalancer.
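The "look it up by serviceId, create it if it is missing" pattern can be sketched with a map. This is a deliberately simplified illustration: the real factory keeps a child Spring context per name, whereas the hypothetical `NamedFactory` below keeps one object per name in a `ConcurrentHashMap`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class PerServiceFactorySketch {

    /** Holds one lazily created instance per service id (hypothetical simplification). */
    static final class NamedFactory<T> {
        private final Map<String, T> instances = new ConcurrentHashMap<>();
        private final Function<String, T> creator;

        NamedFactory(Function<String, T> creator) {
            this.creator = creator;
        }

        T getInstance(String serviceId) {
            // Reuse the existing instance, or create and remember one on first use.
            return instances.computeIfAbsent(serviceId, creator);
        }
    }

    public static void main(String[] args) {
        NamedFactory<StringBuilder> factory =
                new NamedFactory<>(id -> new StringBuilder("load balancer for " + id));
        System.out.println(factory.getInstance("MY-SERVICE"));
        // Same object on the second call, a different one for another service.
        System.out.println(factory.getInstance("MY-SERVICE") == factory.getInstance("MY-SERVICE"));
        System.out.println(factory.getInstance("MY-SERVICE") == factory.getInstance("OTHER-SERVICE"));
    }
}
```

One instance per service id means each service gets its own load balancer state, including its own server list.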
Once the load balancer is available, it has to obtain the server list and select one node from it. The core of the selection is in BaseLoadBalancer.
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
Every ILoadBalancer holds an IRule object. ILoadBalancer.chooseServer ultimately delegates to IRule.choose; the default rule is ZoneAvoidanceRule.
Before the rule can choose a server, it has to obtain the full list of servers.
    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

        /**
         * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
         */
        public abstract AbstractServerPredicate getPredicate();

        /**
         * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
         * The performance for this method is O(n) where n is number of servers to be filtered.
         */
        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }
        }
    }

    public class BaseLoadBalancer extends AbstractLoadBalancer implements
            PrimeConnections.PrimeConnectionListener, IClientConfigAware {

        protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());

        @Override
        public List<Server> getAllServers() {
            return Collections.unmodifiableList(allServerList);
        }
    }

    public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

        private int incrementAndGetModulo(int modulo) {
            for (;;) {
                int current = nextIndex.get();
                int next = (current + 1) % modulo;
                if (nextIndex.compareAndSet(current, next) && current < modulo)
                    return current;
            }
        }

        public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
            List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
            if (eligible.size() == 0) {
                return Optional.absent();
            }
            return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
        }
    }
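The round-robin step in incrementAndGetModulo is easier to follow in isolation. The sketch below reuses the same compare-and-set loop on an `AtomicInteger`; the class name `RoundRobinSketch`, the `choose` helper, and the use of plain strings as servers are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final AtomicInteger nextIndex = new AtomicInteger();

    /** Returns the current index and advances it, wrapping at modulo, without locking. */
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // The "current < modulo" check retries when the list has shrunk since
            // the last call and the stored index would be out of range.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    /** Picks the next eligible server in rotation, or null when none is eligible. */
    String choose(List<String> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose(servers));
        }
    }
}
```

Note that the rotation only ever sees the list it is handed; a node that has stopped but is still in the list keeps receiving its share of requests.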
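As shown, the LoadBalancer holds the server list, and the IRule picks one node from it according to its own policy. As for how the list held by the LoadBalancer is populated, BaseLoadBalancer contains the method that sets allServerList.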
    public void setServersList(List lsrv) {
        Lock writeLock = allServerLock.writeLock();
        logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);

        ArrayList<Server> newServers = new ArrayList<Server>();
        writeLock.lock();
        try {
            ArrayList<Server> allServers = new ArrayList<Server>();
            for (Object server : lsrv) {
                if (server == null) {
                    continue;
                }

                if (server instanceof String) {
                    server = new Server((String) server);
                }

                if (server instanceof Server) {
                    logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                    allServers.add((Server) server);
                } else {
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:" + server.getClass());
                }
            }
            boolean listChanged = false;
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                if (changeListeners != null && changeListeners.size() > 0) {
                    List<Server> oldList = ImmutableList.copyOf(allServerList);
                    List<Server> newList = ImmutableList.copyOf(allServers);
                    for (ServerListChangeListener l: changeListeners) {
                        try {
                            l.serverListChanged(oldList, newList);
                        } catch (Exception e) {
                            logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                        }
                    }
                }
            }
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            // This will reset readyToServe flag to true on all servers
            // regardless whether
            // previous priming connections are success or not
            allServerList = allServers;
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                forceQuickPing();
            }
        } finally {
            writeLock.unlock();
        }
    }
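The entry point through which ZoneAwareLoadBalancer fetches the server list is in DynamicServerListLoadBalancer (a subclass of BaseLoadBalancer and the parent class of the default load balancer, ZoneAwareLoadBalancer). Its constructor, which the ZoneAwareLoadBalancer constructor invokes, calls restOfInit, and that starts the scheduled ServerListUpdater.UpdateAction task.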
    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

        protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                updateListOfServers();
            }
        };

        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
            }
            restOfInit(clientConfig);
        }

        void restOfInit(IClientConfig clientConfig) {
            boolean primeConnection = this.isEnablePrimingConnections();
            // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
            this.setEnablePrimingConnections(false);
            enableAndInitLearnNewServersFeature();

            updateListOfServers();
            if (primeConnection && this.getPrimeConnections() != null) {
                this.getPrimeConnections()
                        .primeConnections(getReachableServers());
            }
            this.setEnablePrimingConnections(primeConnection);
            LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}",
                    clientConfig.getClientName(), this.toString());
        }

        public void enableAndInitLearnNewServersFeature() {
            LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
            serverListUpdater.start(updateAction);
        }

        @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);

                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    }
The serverListUpdater defaults to PollingServerListUpdater; its code starts a scheduled task that calls updateAction periodically.
    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
By default the scheduled task runs once every 30 seconds; the refresh interval can be changed with the ribbon.ServerListRefreshInterval parameter.
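The effect of polling on a fixed delay is that callers always read a local copy that can be up to one interval old. Below is a minimal sketch of that idea with a `ScheduledExecutorService`; `PollingServerListSketch`, its `Supplier` source, and the method names are hypothetical and only mirror the shape of the updater shown above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class PollingServerListSketch {

    private final Supplier<List<String>> source; // stand-in for the registry lookup
    private final AtomicReference<List<String>> cached =
            new AtomicReference<>(Collections.<String>emptyList());
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "server-list-refresh");
                t.setDaemon(true); // do not keep the JVM alive just for polling
                return t;
            });

    PollingServerListSketch(Supplier<List<String>> source) {
        this.source = source;
    }

    /** One refresh cycle: copy the current registry view into the local cache. */
    void doUpdate() {
        cached.set(Collections.unmodifiableList(new ArrayList<>(source.get())));
    }

    /** Schedules doUpdate() with a fixed delay between the end of one run and the next. */
    void start(long initialDelayMs, long refreshIntervalMs) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                doUpdate();
            } catch (RuntimeException e) {
                // A failed cycle keeps the previous list; the next cycle tries again.
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }

    /** Callers only ever see the cached copy, never the registry itself. */
    List<String> getAllServers() {
        return cached.get();
    }
}
```

For example, with `start(1000, 30000)` a node removed from the source right after a refresh would still be returned by `getAllServers()` until the next cycle runs.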
In updateListOfServers, the call serverListImpl.getUpdatedListOfServers() fetches the server list. The ServerList serverListImpl is DomainExtractingServerList by default.
    public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

        public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
                IClientConfig clientConfig, boolean approximateZoneFromHostname) {
            this.list = list;
            this.ribbon = RibbonProperties.from(clientConfig);
            this.approximateZoneFromHostname = approximateZoneFromHostname;
        }

        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(this.list
                    .getUpdatedListOfServers());
            return servers;
        }
    }
DomainExtractingServerList wraps another ServerList, DiscoveryEnabledNIWSServerList, and that is where the server list is ultimately fetched.
    public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {

        private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
            List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

            if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
                logger.warn("EurekaClient has not been initialized yet, returning an empty list");
                return new ArrayList<DiscoveryEnabledServer>();
            }

            EurekaClient eurekaClient = eurekaClientProvider.get();
            if (vipAddresses != null) {
                for (String vipAddress : vipAddresses.split(",")) {
                    // if targetRegion is null, it will be interpreted as the same region of client
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                    for (InstanceInfo ii : listOfInstanceInfo) {
                        if (ii.getStatus().equals(InstanceStatus.UP)) {

                            if (shouldUseOverridePort) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                                }

                                // copy is necessary since the InstanceInfo builder just uses the original reference,
                                // and we don't want to corrupt the global eureka copy of the object which may be
                                // used by other clients in our system
                                InstanceInfo copy = new InstanceInfo(ii);

                                if (isSecure) {
                                    ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                                } else {
                                    ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                                }
                            }

                            DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }
                    if (serverList.size() > 0 && prioritizeVipAddressBasedServers) {
                        break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                    }
                }
            }
            return serverList;
        }
    }
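In the end, the node information comes from Eureka: Ribbon obtains the server list through Eureka, caches it locally, and refreshes it at a fixed interval (30 seconds by default). Note that each service has its own LoadBalancer, so allServerList holds all the nodes of a single service. springcloud-gateway's GatewayControllerEndpoint provides an API for operating on routes and filters.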
The suggested procedure is as follows: