Graceful restart of a Spring Cloud project (1): the problem and the gateway-ribbon flow

The problem

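The current project uses a springcloud-gateway + eureka + springboot architecture. Requests first pass through the gateway; the gateway obtains the business service's server addresses from the registry and then forwards the request to the business service's endpoint. With this architecture, restarting the project raises several problems: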

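  1. When a business service instance shuts down, it aborts the requests that are still in flight.
  2. A business service instance has already stopped, but the gateway still forwards requests to it, so those requests fail.
  3. A business service instance has already restarted, but the gateway does not forward requests to it right away. If the service has only two instances and the second one is restarted as soon as the first has finished starting, the service becomes unavailable.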

To solve these problems, we first need to understand how gateway, eureka, ribbon and Tomcat work, and why the problems above occur.

Main flow

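Let's start from the gateway entry point. The following diagram comes from the springcloud-gateway official site:
[Figure: diagram from the springcloud-gateway official site]
A key class is RoutePredicateHandlerMapping. It extends AbstractHandlerMapping and is a WebFlux HandlerMapping, playing the same role as a HandlerMapping in WebMVC: it maps a request to the handler that will process it. RoutePredicateHandlerMapping iterates over all routes (Route), takes the first one whose predicate matches, and stores that route in the attributes of the current request's exchange.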

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {


    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on the management port if set
        if (managmentPort != null && exchange.getRequest().getURI().getPort() == managmentPort.intValue()) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator
                .getRoutes()
                //individually filter routes so that filterWhen error delaying is not a problem
                .concatMap(route -> Mono
                        .just(route)
                        .filterWhen(r -> {
                            // add the current route we are testing
                            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                            return r.getPredicate().apply(exchange);
                        })
                        //instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
                        .onErrorResume(e -> Mono.empty())
                )
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }

}
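
The lookup logic above can be restated without Reactor. The following is a minimal sketch in plain Java, not the gateway's real types: SimpleRoute, ROUTE_ATTR and sampleRoutes are hypothetical stand-ins. It shows the two behaviours visible in the source: the first matching route wins, and a predicate that throws is logged and skipped instead of failing the whole lookup.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins; not the gateway's real Route or ServerWebExchange.
class RouteLookupSketch {

    static final String ROUTE_ATTR = "gatewayRoute"; // illustrative attribute key

    static final class SimpleRoute {
        final String id;
        final Predicate<String> predicate;

        SimpleRoute(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    // Returns the first route whose predicate accepts the path, or null if none does.
    // A predicate that throws is skipped, similar to onErrorResume(e -> Mono.empty()).
    static SimpleRoute lookupRoute(List<SimpleRoute> routes, String path) {
        for (SimpleRoute route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return route;
                }
            } catch (RuntimeException e) {
                System.err.println("Error applying predicate for route: " + route.id);
            }
        }
        return null;
    }

    // Stores the matched route in a per-request attribute map.
    static Map<String, Object> handle(List<SimpleRoute> routes, String path) {
        Map<String, Object> attributes = new HashMap<>();
        SimpleRoute matched = lookupRoute(routes, path);
        if (matched != null) {
            attributes.put(ROUTE_ATTR, matched);
        }
        return attributes;
    }

    static List<SimpleRoute> sampleRoutes() {
        return Arrays.asList(
                new SimpleRoute("broken", p -> { throw new IllegalStateException("bad predicate"); }),
                new SimpleRoute("user-route", p -> p.startsWith("/user/")),
                new SimpleRoute("catch-all", p -> true));
    }

    public static void main(String[] args) {
        SimpleRoute matched = (SimpleRoute) handle(sampleRoutes(), "/user/1").get(ROUTE_ATTR);
        System.out.println(matched.id);
    }
}
```

Because the order of the route list decides which route wins, a broad catch-all route placed first would shadow every route after it.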

routeLocator.getRoutes() shows that the route list comes from routeLocator, so let's look at how the routes are generated. One implementation of RouteLocator is RouteDefinitionRouteLocator:

public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

    @Override
    public Flux<Route> getRoutes() {
        return this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute)
                //TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("RouteDefinition matched: " + route.getId());
                    }
                    return route;
                });
        /* TODO: trace logging
            if (logger.isTraceEnabled()) {
                logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
            }*/
    }

}

RouteDefinitionRouteLocator gets its route list from a RouteDefinitionLocator. When the project is configured for discovery-based routing (spring.cloud.gateway.discovery.locator.enabled: true), the RouteDefinitionLocator that supplies the routes is by default DiscoveryClientRouteDefinitionLocator.

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {

        SpelExpressionParser parser = new SpelExpressionParser();
        Expression includeExpr = parser.parseExpression(properties.getIncludeExpression());
        Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

        Predicate<ServiceInstance> includePredicate;
        if (properties.getIncludeExpression() == null || "true".equalsIgnoreCase(properties.getIncludeExpression())) {
            includePredicate = instance -> true;
        } else {
            includePredicate = instance -> {
                Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
                if (include == null) {
                    return false;
                }
                return include;
            };
        }

        return Flux.fromIterable(discoveryClient.getServices())
                .map(discoveryClient::getInstances)
                .filter(instances -> !instances.isEmpty())
                .map(instances -> instances.get(0))
                .filter(includePredicate)
                .map(instance -> {
                    String serviceId = instance.getServiceId();
                    RouteDefinition routeDefinition = new RouteDefinition();
                    routeDefinition.setId(this.routeIdPrefix + serviceId);
                    String uri = urlExpr.getValue(evalCtxt, instance, String.class);
                    routeDefinition.setUri(URI.create(uri));
                    final ServiceInstance instanceForEval = new DelegatingServiceInstance(instance, properties);
                    for (PredicateDefinition original : this.properties.getPredicates()) {
                        PredicateDefinition predicate = new PredicateDefinition();
                        predicate.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            predicate.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getPredicates().add(predicate);
                    }

                    for (FilterDefinition original : this.properties.getFilters()) {
                        FilterDefinition filter = new FilterDefinition();
                        filter.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                            String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                            filter.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getFilters().add(filter);
                    }
                    return routeDefinition;
                });
    }
}
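
To make the result of getRouteDefinitions() more concrete, here is a minimal sketch in plain Java. RouteDef, fromServices and stripServicePrefix are hypothetical names. The URI form lb://serviceId, the path pattern /serviceId/** and the prefix-stripping rewrite reflect my understanding of the discovery locator's default expressions, so treat them as assumptions and check them against the properties of the version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical simplification of discovery-based route generation.
class DiscoveryRouteSketch {

    static final class RouteDef {
        final String id;
        final String uri;
        final String pathPattern;

        RouteDef(String id, String uri, String pathPattern) {
            this.id = id;
            this.uri = uri;
            this.pathPattern = pathPattern;
        }
    }

    // One route per service that has at least one instance (services with no
    // instances are skipped, as in the filter(instances -> !instances.isEmpty()) step).
    static List<RouteDef> fromServices(Map<String, List<String>> instancesByService, String idPrefix) {
        List<RouteDef> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : instancesByService.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue;
            }
            String serviceId = entry.getKey();
            // Assumed defaults: uri "lb://<serviceId>", predicate Path=/<serviceId>/**
            result.add(new RouteDef(idPrefix + serviceId, "lb://" + serviceId, "/" + serviceId + "/**"));
        }
        return result;
    }

    // Assumed default rewrite: drop the leading "/<serviceId>" segment before forwarding.
    static String stripServicePrefix(String serviceId, String path) {
        return path.replaceFirst("^/" + Pattern.quote(serviceId) + "/(?<remaining>.*)", "/${remaining}");
    }

    static List<RouteDef> sample() {
        Map<String, List<String>> services = new LinkedHashMap<>();
        services.put("ORDER-SERVICE", Arrays.asList("10.0.0.5:8081", "10.0.0.6:8081"));
        services.put("EMPTY-SERVICE", Collections.<String>emptyList());
        return fromServices(services, "Sketch_");
    }

    public static void main(String[] args) {
        for (RouteDef def : sample()) {
            System.out.println(def.id + " -> " + def.uri + " " + def.pathPattern);
        }
        System.out.println(stripServicePrefix("ORDER-SERVICE", "/ORDER-SERVICE/hello/world"));
    }
}
```

Note that the route carries only the service name; no instance address is fixed at this stage, which is why the choice of a concrete node is left to the load balancer later in the chain.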

DiscoveryClientRouteDefinitionLocator fetches the service list from Eureka (discoveryClient.getServices). For performance, springcloud-gateway wraps RouteDefinitionRouteLocator in a cache, CachingRouteLocator:

public class CachingRouteLocator implements RouteLocator, ApplicationListener<RefreshRoutesEvent> {

   private final RouteLocator delegate;
   private final Flux<Route> routes;
   private final Map<String, List> cache = new HashMap<>();

   public CachingRouteLocator(RouteLocator delegate) {
      this.delegate = delegate;
      routes = CacheFlux.lookup(cache, "routes", Route.class)
            .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
   }

   @Override
   public Flux<Route> getRoutes() {
      return this.routes;
   }

   /**
    * Clears the routes cache
    * @return routes flux
    */
   public Flux<Route> refresh() {
      this.cache.clear();
      return this.routes;
   }

   @Override
   public void onApplicationEvent(RefreshRoutesEvent event) {
      refresh();
   }

   @Deprecated
   /* for testing */ void handleRefresh() {
      refresh();
   }
}
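
The caching behaviour of the class above can be reduced to a few lines. This is a minimal sketch assuming a synchronous delegate; CachingLocatorSketch and demoLoadCount are hypothetical names, and the real class caches a Flux instead of a List. It shows that the delegate is consulted only on a cache miss and that refresh() merely clears the cache, so the reload happens on the next read.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical synchronous analogue of a caching route locator.
class CachingLocatorSketch {

    private final Supplier<List<String>> delegate;
    private List<String> cached; // null means "cache miss"

    CachingLocatorSketch(Supplier<List<String>> delegate) {
        this.delegate = delegate;
    }

    // Loads from the delegate only when nothing is cached.
    synchronized List<String> getRoutes() {
        if (cached == null) {
            cached = delegate.get();
        }
        return cached;
    }

    // Clears the cache; the next getRoutes() call reloads from the delegate.
    synchronized void refresh() {
        cached = null;
    }

    // Counts delegate loads for: read, read, refresh, read.
    static int demoLoadCount() {
        AtomicInteger loads = new AtomicInteger();
        CachingLocatorSketch locator = new CachingLocatorSketch(() -> {
            loads.incrementAndGet();
            return Arrays.asList("route-a", "route-b");
        });
        locator.getRoutes();
        locator.getRoutes();
        locator.refresh();
        locator.getRoutes();
        return loads.get();
    }

    public static void main(String[] args) {
        System.out.println("delegate loads: " + demoLoadCount());
    }
}
```

The consequence for restarts is that routes computed before a refresh keep being served until something triggers the refresh.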

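CachingRouteLocator refreshes its cache when it receives a RefreshRoutesEvent (GatewayWebfluxEndpoint exposes an HTTP API that calls ApplicationEventPublisher to publish a RefreshRoutesEvent). RouteRefreshListener, in turn, publishes a RefreshRoutesEvent when it receives a service-registration InstanceRegisteredEvent, so the cache is refreshed when a service registers. Its onApplicationEvent also does so on ContextRefreshedEvent and RefreshScopeRefreshedEvent, and on heartbeat events whose value has changed.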

public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {

   private HeartbeatMonitor monitor = new HeartbeatMonitor();
   private final ApplicationEventPublisher publisher;

   public RouteRefreshListener(ApplicationEventPublisher publisher) {
      Assert.notNull(publisher, "publisher may not be null");
      this.publisher = publisher;
   }

   @Override
   public void onApplicationEvent(ApplicationEvent event) {
      if (event instanceof ContextRefreshedEvent
            || event instanceof RefreshScopeRefreshedEvent
            || event instanceof InstanceRegisteredEvent) {
         reset();
      }
      else if (event instanceof ParentHeartbeatEvent) {
         ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
         resetIfNeeded(e.getValue());
      }
      else if (event instanceof HeartbeatEvent) {
         HeartbeatEvent e = (HeartbeatEvent) event;
         resetIfNeeded(e.getValue());
      }
   }

   private void resetIfNeeded(Object value) {
      if (this.monitor.update(value)) {
         reset();
      }
   }

   private void reset() {
      this.publisher.publishEvent(new RefreshRoutesEvent(this));
   }

}
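
The listener above resets unconditionally for some events and only "if needed" for heartbeats. The sketch below illustrates that distinction in plain Java. RouteRefreshSketch is a hypothetical class, and its update method is my assumption of how a heartbeat monitor behaves (report a change only when the value differs from the last one seen); it is not copied from the library.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the listener plus its heartbeat monitor.
class RouteRefreshSketch {

    private final AtomicReference<Object> latestHeartbeat = new AtomicReference<>();
    private final AtomicInteger refreshEvents = new AtomicInteger();

    // Returns true only when the value is non-null and differs from the last one seen.
    boolean update(Object value) {
        Object last = latestHeartbeat.get();
        if (value != null && !value.equals(last)) {
            return latestHeartbeat.compareAndSet(last, value);
        }
        return false;
    }

    // Heartbeats trigger a refresh only when their value changed.
    void onHeartbeat(Object value) {
        if (update(value)) {
            refreshEvents.incrementAndGet();
        }
    }

    // Registration events always trigger a refresh.
    void onInstanceRegistered() {
        refreshEvents.incrementAndGet();
    }

    int refreshEvents() {
        return refreshEvents.get();
    }

    public static void main(String[] args) {
        RouteRefreshSketch listener = new RouteRefreshSketch();
        listener.onHeartbeat(1L);
        listener.onHeartbeat(1L); // unchanged value, no refresh
        listener.onHeartbeat(2L);
        listener.onInstanceRegistered();
        System.out.println("refresh events: " + listener.refreshEvents());
    }
}
```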

Next comes FilteringWebHandler. It takes the route's filter list, combines it with the global filters, sorts the result into a filter chain, and starts executing the chain.

public class FilteringWebHandler implements WebHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        //TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);
        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: "+ combined);
        }
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
}
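
The combine, sort and chain steps can be sketched synchronously in plain Java. Filter, Chain and named below are hypothetical types. The order values 10000 and 10100 are assumptions chosen for illustration; only the use of the lowest precedence (Integer.MAX_VALUE) for the routing filter is taken from the NettyRoutingFilter source quoted in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical synchronous analogue of the gateway filter chain.
class FilterChainSketch {

    interface Filter {
        int order();
        void filter(List<String> trace, Chain chain);
    }

    // Each Chain instance points at the next filter to run.
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void filter(List<String> trace) {
            if (index < filters.size()) {
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            }
        }
    }

    // A filter that records its name and passes control on.
    static Filter named(final String name, final int order) {
        return new Filter() {
            @Override
            public int order() {
                return order;
            }

            @Override
            public void filter(List<String> trace, Chain chain) {
                trace.add(name);
                chain.filter(trace);
            }
        };
    }

    // Global filters first, then route filters, then one sort by order.
    static List<String> run(List<Filter> globalFilters, List<Filter> routeFilters) {
        List<Filter> combined = new ArrayList<>(globalFilters);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt(Filter::order));
        List<String> trace = new ArrayList<>();
        new Chain(combined, 0).filter(trace);
        return trace;
    }

    static List<String> demo() {
        List<Filter> global = Arrays.asList(
                named("NettyRouting", Integer.MAX_VALUE), // lowest precedence, runs last
                named("LoadBalancerClient", 10100),       // assumed order value
                named("RouteToRequestUrl", 10000));       // assumed order value
        List<Filter> route = Arrays.asList(named("RouteSpecific", 1));
        return run(global, route);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Sorting after merging is what lets a route-level filter run before or after a global filter purely on the basis of its order value.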

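Next is RouteToRequestUrlFilter, which builds the full load-balanced URL. For example, if the route's target is lb://MY-SERVICE and the request path is /hello/world, the resulting URL is lb://MY-SERVICE/hello/world. The filter stores this URL in the attributes of the current exchange and passes control to the next filter.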

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            //Load balanced URIs should always have a host. If the host is null it is most
            //likely because the host name was invalid (for example included an underscore)
            throw new IllegalStateException("Invalid host: " + routeUri.toString());
        }

        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // .uri(routeUri)
                .scheme(routeUri.getScheme())
                .host(routeUri.getHost())
                .port(routeUri.getPort())
                .build(encoded)
                .toUri();

        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

}
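
The URL merge can be reproduced with java.net.URI alone. This is a minimal sketch: RequestUrlMergeSketch and the host gateway.example are hypothetical, and it skips the encoding and scheme-prefix handling of the real filter. It also shows why the filter rejects an lb URI with a null host: java.net.URI does not report a host for a name that contains an underscore.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical simplification of merging the request URI with the route URI.
class RequestUrlMergeSketch {

    // Takes scheme, host and port from the route and path and query from the request.
    static URI merge(URI requestUri, URI routeUri) {
        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            // e.g. lb://my_service: the underscore makes the host name invalid
            throw new IllegalStateException("Invalid host: " + routeUri);
        }
        try {
            return new URI(routeUri.getScheme(), null, routeUri.getHost(), routeUri.getPort(),
                    requestUri.getPath(), requestUri.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI request = URI.create("http://gateway.example:8080/hello/world?x=1");
        System.out.println(merge(request, URI.create("lb://MY-SERVICE")));
        System.out.println(URI.create("lb://my_service").getHost());
    }
}
```

A practical consequence is that service names used with the lb scheme should avoid underscores.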

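Then comes LoadBalancerClientFilter. It first reads the scheme: if it is not lb, the request is passed straight on to the next filter; if it is lb, the filter chooses a service instance and builds the final forwarding URL from it.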

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            log.trace("LoadBalancerClientFilter url before: " + url);
            ServiceInstance instance = this.choose(exchange);
            if (instance == null) {
                String msg = "Unable to find instance for " + url.getHost();
                if (this.properties.isUse404()) {
                    throw new LoadBalancerClientFilter.FourOFourNotFoundException(msg);
                } else {
                    throw new NotFoundException(msg);
                }
            } else {
                URI uri = exchange.getRequest().getURI();
                String overrideScheme = instance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }
    
                URI requestUrl = this.loadBalancer.reconstructURI(new LoadBalancerClientFilter.DelegatingServiceInstance(instance, overrideScheme), uri);
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                return chain.filter(exchange);
            }
        } else {
            return chain.filter(exchange);
        }
    }
    
    protected ServiceInstance choose(ServerWebExchange exchange) {
        return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());
    }
}
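
The two decisions made by this filter, "is this an lb URL?" and "what does the URL look like once an instance is chosen?", can be sketched with java.net.URI. ReconstructUriSketch and the address 10.0.0.5:8081 are hypothetical; in the real filter the rewrite is delegated to loadBalancer.reconstructURI and the instance comes from loadBalancer.choose.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical simplification of the lb check and the URI rewrite.
class ReconstructUriSketch {

    // Mirrors the condition in the filter: scheme "lb" or a saved "lb" scheme prefix.
    static boolean isLoadBalanced(URI url, String schemePrefix) {
        return url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix));
    }

    // Replaces scheme, host and port with those of the chosen instance.
    static URI reconstruct(String host, int port, boolean secure, URI original) {
        String scheme = secure ? "https" : "http";
        try {
            return new URI(scheme, null, host, port, original.getPath(), original.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI lbUrl = URI.create("lb://MY-SERVICE/hello/world?x=1");
        if (isLoadBalanced(lbUrl, null)) {
            // Assume the load balancer picked the instance 10.0.0.5:8081.
            System.out.println(reconstruct("10.0.0.5", 8081, false, lbUrl));
        }
    }
}
```

If the chosen instance has already stopped, the rewritten URL points at a dead address, which is exactly the second problem listed at the start of this article.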

Finally, the request is actually forwarded to the real address in NettyRoutingFilter.

public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
    private final HttpClientProperties properties;

    public NettyRoutingFilter(HttpClient httpClient,
                              ObjectProvider<List<HttpHeadersFilter>> headersFilters,
                              HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFilters = headersFilters;
        this.properties = properties;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toString();

        HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
                exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
        boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        // the forwarding request is sent here
        Flux<HttpClientResponse> responseFlux = this.httpClient
                .chunkedTransfer(chunkedTransfer)
                .request(method)
                .uri(url)
                .send((req, nettyOutbound) -> {
                    req.headers(httpHeaders);

                    if (preserveHost) {
                        String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                        req.header(HttpHeaders.HOST, host);
                    }
                    return nettyOutbound
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .send(request.getBody().map(dataBuffer ->
                                    ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
                }).responseConnection((res, connection) -> {
                    ServerHttpResponse response = exchange.getResponse();
                    // put headers and status so filters can modify the response
                    HttpHeaders headers = new HttpHeaders();

                    res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                    }

                    HttpStatus status = HttpStatus.resolve(res.status().code());
                    if (status != null) {
                        response.setStatusCode(status);
                    } else if (response instanceof AbstractServerHttpResponse) {
                        // https://jira.spring.io/browse/SPR-16748
                        ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
                    } else {
                        throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
                    }

                    // make sure headers filters run after setting status so it is available in response
                    HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                            this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);

                    if(!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) &&
                            filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                        //It is not valid to have both the transfer-encoding header and the content-length header
                        //remove the transfer-encoding header in the response if the content-length header is present
                        response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                    }

                    exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

                    response.getHeaders().putAll(filteredResponseHeaders);

                    // Defer committing the response until all route filters have run
                    // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                    exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                    return Mono.just(res);
                });

        if (properties.getResponseTimeout() != null) {
            responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
                    Mono.error(new TimeoutException("Response took longer than timeout: " +
                            properties.getResponseTimeout()))).onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, null, th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

}

Load balancing

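The key step in the whole forwarding flow is choosing a service node. Before a node can be chosen, the load balancer itself has to be obtained.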

// RibbonLoadBalancerClient.java
protected ILoadBalancer getLoadBalancer(String serviceId) {
   return this.clientFactory.getLoadBalancer(serviceId);
}
// SpringClientFactory.java
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
    public ILoadBalancer getLoadBalancer(String name) {
        return (ILoadBalancer)this.getInstance(name, ILoadBalancer.class);
    }
    
    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        } else {
            IClientConfig config = (IClientConfig)this.getInstance(name, IClientConfig.class);
            return instantiateWithConfig(this.getContext(name), type, config);
        }
    }
    
    static <C> C instantiateWithConfig(AnnotationConfigApplicationContext context, Class<C> clazz, IClientConfig config) {
        Object result = null;

        try {
            Constructor<C> constructor = clazz.getConstructor(IClientConfig.class);
            result = constructor.newInstance(config);
        } catch (Throwable var5) {
        }

        if (result == null) {
            result = BeanUtils.instantiate(clazz);
            if (result instanceof IClientConfigAware) {
                ((IClientConfigAware)result).initWithNiwsConfig(config);
            }

            if (context != null) {
                context.getAutowireCapableBeanFactory().autowireBean(result);
            }
        }

        return result;
    }
}
// NamedContextFactory.java
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = this.getContext(name);
    return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
}

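As the source shows, the load balancer is first looked up by serviceId in the Spring context; if none is found, one is instantiated on the spot. The default load balancer is ZoneAwareLoadBalancer.
Once the load balancer has been obtained, the next step is to get the server list and choose one of its nodes. The core of the selection is in BaseLoadBalancer: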

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

Every ILoadBalancer holds an IRule object, and ILoadBalancer.chooseServer ultimately calls IRule.choose; the default rule is ZoneAvoidanceRule.
Before the rule can choose a server, it first has to get all the servers.

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
   
    /**
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     * 
     */
    public abstract AbstractServerPredicate getPredicate();
        
    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
}

public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware {
        
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    
    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
}

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
}
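
The round-robin selection above is worth isolating. The sketch below copies the compare-and-set loop from the source into a hypothetical RoundRobinSketch class that works on plain strings instead of Server objects. It also exercises the `current < modulo` guard, which matters when the eligible list shrinks (for example when a node is removed) while the shared index still points past the end of the new, shorter list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the round-robin index logic.
class RoundRobinSketch {

    private final AtomicInteger nextIndex = new AtomicInteger();

    // Lock-free "return current, advance by one, wrap at modulo".
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread won the race or if the index is stale
            // because the list became shorter since the last call.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    // Returns the next eligible server, or null when none is eligible.
    String choose(List<String> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose(servers));
        }
        // The list shrinks to one node; the stale index is discarded and retried.
        System.out.println(rule.choose(Arrays.asList("a")));
    }
}
```

The rule only rotates over whatever list it is handed, so a node that has stopped but is still in the list keeps receiving its share of requests.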

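As we can see, the LoadBalancer holds the server list, and the IRule picks one node from it according to its own rule. As for how the list held by the LoadBalancer is obtained, BaseLoadBalancer contains the method that sets allServerList.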

public void setServersList(List lsrv) {
    Lock writeLock = allServerLock.writeLock();
    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
    
    ArrayList<Server> newServers = new ArrayList<Server>();
    writeLock.lock();
    try {
        ArrayList<Server> allServers = new ArrayList<Server>();
        for (Object server : lsrv) {
            if (server == null) {
                continue;
            }

            if (server instanceof String) {
                server = new Server((String) server);
            }

            if (server instanceof Server) {
                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                allServers.add((Server) server);
            } else {
                throw new IllegalArgumentException(
                        "Type String or Server expected, instead found:"
                                + server.getClass());
            }

        }
        boolean listChanged = false;
        if (!allServerList.equals(allServers)) {
            listChanged = true;
            if (changeListeners != null && changeListeners.size() > 0) {
               List<Server> oldList = ImmutableList.copyOf(allServerList);
               List<Server> newList = ImmutableList.copyOf(allServers);                   
               for (ServerListChangeListener l: changeListeners) {
                   try {
                       l.serverListChanged(oldList, newList);
                   } catch (Exception e) {
                       logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                   }
               }
            }
        }
        if (isEnablePrimingConnections()) {
            for (Server server : allServers) {
                if (!allServerList.contains(server)) {
                    server.setReadyToServe(false);
                    newServers.add((Server) server);
                }
            }
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
        }
        // This will reset readyToServe flag to true on all servers
        // regardless whether
        // previous priming connections are success or not
        allServerList = allServers;
        if (canSkipPing()) {
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
        } else if (listChanged) {
            forceQuickPing();
        }
    } finally {
        writeLock.unlock();
    }
}

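The entry point where ZoneAwareLoadBalancer obtains its server list is DynamicServerListLoadBalancer (a subclass of BaseLoadBalancer and the parent class of the default load balancer ZoneAwareLoadBalancer). The DynamicServerListLoadBalancer constructor, which runs whenever a ZoneAwareLoadBalancer is constructed, starts the ServerListUpdater.UpdateAction scheduled task.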

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }
    
    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
    
    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}

The default serverListUpdater is PollingServerListUpdater. Its code shows that it starts a scheduled task that calls updateAction periodically.

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

By default the scheduled task runs every 30 seconds; the refresh interval can be changed with the ribbon.ServerListRefreshInterval parameter.
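
The scheduling pattern used by start() can be tried out with the JDK alone. The following is a minimal sketch: PollingUpdaterSketch and runsAtLeast are hypothetical, and the intervals are shortened to milliseconds so the demo finishes quickly. It keeps the properties visible in the source: the task starts only once, runs with a fixed delay between cycles, and a failed cycle is caught so that it does not cancel later cycles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone polling updater.
class PollingUpdaterSketch {

    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> future;

    // Returns false when the updater is already active (the "no-op" branch).
    boolean start(Runnable updateAction, long initialDelayMs, long refreshIntervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
            } catch (RuntimeException e) {
                // An uncaught exception would stop all future runs, so swallow it here.
                System.err.println("Failed one update cycle: " + e);
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    // Waits up to five seconds for the action to run the given number of times.
    static boolean runsAtLeast(int times, long intervalMs) {
        CountDownLatch latch = new CountDownLatch(times);
        PollingUpdaterSketch updater = new PollingUpdaterSketch();
        updater.start(latch::countDown, 0, intervalMs);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            updater.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times: " + runsAtLeast(3, 10));
    }
}
```

With a fixed delay, a change in the registry can stay invisible to the load balancer for up to one full interval, which is where the restart problems described at the top of this article come from.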
In updateListOfServers, serverListImpl.getUpdatedListOfServers() fetches the server list; the ServerList serverListImpl here is DomainExtractingServerList by default.

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }
    
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }
}

DomainExtractingServerList wraps another ServerList, DiscoveryEnabledNIWSServerList, and that is where the server list is ultimately obtained.

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
}

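In the end the node information comes from Eureka: Ribbon gets the server list through Eureka, caches it locally, and refreshes it at a fixed interval (30 seconds by default). Note that every service has its own LoadBalancer, so allServerList holds the complete node list of a single service only.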
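
The effect of this local cache on a restart can be simulated in a few lines. The sketch below is a simplification under stated assumptions: StaleServerListSketch, setStatus and refresh are hypothetical, the registry is an in-memory map instead of Eureka, and refresh is called by hand instead of by a timer. It keeps one cached list per service, copies only instances whose status is UP, and shows that a node marked DOWN stays selectable until the next refresh.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "registry plus per-service cached server list".
class StaleServerListSketch {

    // service -> (instance -> status); stands in for the registry
    private final Map<String, Map<String, String>> registry = new HashMap<>();
    // service -> cached list of UP instances; stands in for each service's allServerList
    private final Map<String, List<String>> cachedByService = new HashMap<>();

    void setStatus(String service, String instance, String status) {
        registry.computeIfAbsent(service, s -> new LinkedHashMap<>()).put(instance, status);
    }

    // Copies the instances that are currently UP into the cache of one service.
    void refresh(String service) {
        List<String> up = new ArrayList<>();
        Map<String, String> instances = registry.getOrDefault(service, Collections.<String, String>emptyMap());
        for (Map.Entry<String, String> entry : instances.entrySet()) {
            if ("UP".equals(entry.getValue())) {
                up.add(entry.getKey());
            }
        }
        cachedByService.put(service, up);
    }

    List<String> cached(String service) {
        return cachedByService.getOrDefault(service, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        StaleServerListSketch lb = new StaleServerListSketch();
        lb.setStatus("order", "a:8080", "UP");
        lb.setStatus("order", "b:8080", "UP");
        lb.refresh("order");
        lb.setStatus("order", "b:8080", "DOWN"); // b stops, registry is updated
        System.out.println("before refresh: " + lb.cached("order")); // still lists b
        lb.refresh("order");
        System.out.println("after refresh:  " + lb.cached("order"));
    }
}
```

The same delay applies in the other direction: a node that has come back UP is not selected until a refresh copies it into the cache.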
springcloud-gateway's GatewayControllerEndpoint provides an API for operating on routes and filters.

Flow diagram

The flow is summarized below:
[Figure: flow diagram]
