Spring Cloud升級之路 - Hoxton - 3. 負載均衡從ribbon替換成spring-cloud-loadbalancer

本系列示例與膠水代碼地址: https://github.com/HashZhang/spring-cloud-scaffoldjava

負載均衡Ribbon替換成Spring Cloud Load Balancer

Spring Cloud Load Balancer並非一個獨立的項目,而是spring-cloud-commons其中的一個模塊。 項目中用了Eureka以及相關的 starter,想徹底剔除Ribbon的相關依賴基本是不可能的,Spring 社區的人也是看到了這一點,經過配置去關閉Ribbon啓用Spring-Cloud-LoadBalancerreact

spring.cloud.loadbalancer.ribbon.enabled=false

關閉ribbon以後,Spring Cloud LoadBalancer就會加載成爲默認的負載均衡器。git

Spring Cloud LoadBalancer 結構以下所示:github

image

其中:算法

  1. 全局只有一個 BlockingLoadBalancerClient,負責執行全部的負載均衡請求。
  2. BlockingLoadBalancerClientLoadBalancerClientFactory裏面加載對應微服務的負載均衡配置。
  3. 每一個微服務下有獨自的LoadBalancerLoadBalancer裏面包含負載均衡的算法,例如RoundRobin.根據算法,從ServiceInstanceListSupplier返回的實例列表中選擇一個實例返回。

1. 實現zone隔離

要想實現zone隔離,應該從ServiceInstanceListSupplier裏面作手腳。默認的實現裏面有關於zone隔離的ServiceInstanceListSupplier -> org.springframework.cloud.loadbalancer.core.ZonePreferenceServiceInstanceListSupplier:spring

private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
	if (zone == null) {
		zone = zoneConfig.getZone();
	}
	//若是zone不爲null,而且該zone下有存活實例,則返回這個實例列表
	//不然,返回全部的實例
	if (zone != null) {
		List<ServiceInstance> filteredInstances = new ArrayList<>();
		for (ServiceInstance serviceInstance : serviceInstances) {
			String instanceZone = getZone(serviceInstance);
			if (zone.equalsIgnoreCase(instanceZone)) {
				filteredInstances.add(serviceInstance);
			}
		}
		if (filteredInstances.size() > 0) {
			return filteredInstances;
		}
	}
	// If the zone is not set or there are no zone-specific instances available,
	// we return all instances retrieved for given service id.
	return serviceInstances;
}

這裏對於沒指定zone或者該zone下沒有存活實例的狀況下,會返回全部查到的實例,不區分zone。這個不符合咱們的要求,因此咱們修改並實現下咱們本身的com.github.hashjang.hoxton.service.consumer.config.SameZoneOnlyServiceInstanceListSupplier:緩存

private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
    if (zone == null) {
        zone = zoneConfig.getZone();
    }
    if (zone != null) {
        List<ServiceInstance> filteredInstances = new ArrayList<>();
        for (ServiceInstance serviceInstance : serviceInstances) {
            String instanceZone = getZone(serviceInstance);
            if (zone.equalsIgnoreCase(instanceZone)) {
                filteredInstances.add(serviceInstance);
            }
        }
        if (filteredInstances.size() > 0) {
            return filteredInstances;
        }
    }
    //若是沒找到就返回空列表,毫不返回其餘集羣的實例
    return List.of();
}

而後咱們來看一下默認的 Spring Cloud LoadBalancer 提供的 LoadBalancer ,它是帶緩存的:負載均衡

org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration框架

@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
		ReactiveDiscoveryClient discoveryClient, Environment env,
		ApplicationContext context) {
	DiscoveryClientServiceInstanceListSupplier delegate = new DiscoveryClientServiceInstanceListSupplier(
			discoveryClient, env);
	ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
			.getBeanProvider(LoadBalancerCacheManager.class);
	if (cacheManagerProvider.getIfAvailable() != null) {
		return new CachingServiceInstanceListSupplier(delegate,
				cacheManagerProvider.getIfAvailable());
	}
	return delegate;
}

DiscoveryClientServiceInstanceListSupplier每次從Eureka上面拉取實例列表,CachingServiceInstanceListSupplier提供了緩存,這樣沒必要每次從Eureka上面拉取。能夠看出CachingServiceInstanceListSupplier是一種代理模式的實現,和SameZoneOnlyServiceInstanceListSupplier的模式是同樣的。dom

咱們來組裝咱們的ServiceInstanceListSupplier,因爲咱們是同步的環境,只用實現同步的ServiceInstanceListSupplier就好了。

public class CommonLoadBalancerConfig {

    /**
     * 同步環境下的ServiceInstanceListSupplier
     * SameZoneOnlyServiceInstanceListSupplier限制僅返回同一個zone下的實例(注意)
     * CachingServiceInstanceListSupplier啓用緩存,不每次訪問eureka請求實例列表
     *
     * @param discoveryClient
     * @param env
     * @param zoneConfig
     * @param context
     * @return
     */
    @Bean
    @Order(Integer.MIN_VALUE)
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
            DiscoveryClient discoveryClient, Environment env,
            LoadBalancerZoneConfig zoneConfig,
            ApplicationContext context) {
        ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier(
                new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env),
                zoneConfig
        );
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        if (cacheManagerProvider.getIfAvailable() != null) {
            return new CachingServiceInstanceListSupplier(
                    delegate,
                    cacheManagerProvider.getIfAvailable()
            );
        }
        return delegate;
    }
}

2. 實現下一次重試的時候,若是存在其餘實例,則必定會重試與本次不一樣的其餘實例

默認的RoundRobinLoadBalancer,其中的輪詢position,是一個Atomic類型的,在某個微服務的調用請求下,全部線程,全部請求共用(調用其餘的每一個微服務會建立一個RoundRobinLoadBalancer)。在使用的時候,會有這樣的一個問題:

  • 假設某個微服務有兩個實例,實例 A 和實例 B
  • 某次請求 X 發往實例 A,position = position + 1
  • 在請求沒有返回時,請求 Y 到達,發往實例 B,position = position + 1
  • 請求 A 失敗,重試,重試的實例仍是實例 A

這樣在重試的狀況下,某個請求的重試可能會發送到上一次的實例進行重試,這不是咱們想要的。針對這個,我提了個Issue:Enhance RoundRoubinLoadBalancer position。我修改的思路是,咱們須要一個單次請求隔離的position,這個position對於實例個數取餘得出請求要發往的實例。那麼如何進行請求隔離呢?

首先想到的是線程隔離,可是這個是不行的。Spring Cloud LoadBalancer 底層運用了 reactor 框架,致使實際承載選擇實例的線程,不是業務線程,而是 reactor 裏面的線程池,如圖所示: image 因此,不能用ThreadLocal的方式實現position

因爲咱們用到了sleuth,通常請求的context會傳遞其中的traceId,咱們根據這個traceId區分不一樣的請求,實現咱們的 LoadBalancer

RoundRobinBaseOnTraceIdLoadBalancer

//這個超時時間,須要設置的比你的請求的 connectTimeout + readTimeout 長
private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //若是沒有 traceId,就生成一個新的,可是最好檢查下爲啥會沒有
    //是否是 MQ 消費這種沒有主動生成 traceId 的狀況,最好主動生成下。
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    int seed = positionCache.get(l).getAndIncrement();
    return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
}

3. 替換默認的負載均衡相關 Bean 實現

咱們要用上面的兩個類替換默認的實現,先編寫一個配置類:

public class CommonLoadBalancerConfig {

    private volatile boolean isValid = false;

    /**
     * 同步環境下的ServiceInstanceListSupplier
     * SameZoneOnlyServiceInstanceListSupplier限制僅返回同一個zone下的實例(注意)
     * CachingServiceInstanceListSupplier啓用緩存,不每次訪問eureka請求實例列表
     *
     * @param discoveryClient
     * @param env
     * @param zoneConfig
     * @param context
     * @return
     */
    @Bean
    @Order(Integer.MIN_VALUE)
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
            DiscoveryClient discoveryClient, Environment env,
            LoadBalancerZoneConfig zoneConfig,
            ApplicationContext context) {
        isValid = true;
        ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier(
                new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env),
                zoneConfig
        );
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        if (cacheManagerProvider.getIfAvailable() != null) {
            return new CachingServiceInstanceListSupplier(
                    delegate,
                    cacheManagerProvider.getIfAvailable()
            );
        }
        return delegate;
    }

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
            Environment environment,
            ServiceInstanceListSupplier serviceInstanceListSupplier,
            Tracer tracer) {
        if (!isValid) {
            throw new IllegalStateException("should use the ServiceInstanceListSupplier in this configuration, please check config");
        }
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinBaseOnTraceIdLoadBalancer(
                name,
                serviceInstanceListSupplier,
                tracer
        );
    }
}

而後,指定默認的負載均衡配置採起這個配置, 經過註解:

@LoadBalancerClients(defaultConfiguration = {CommonLoadBalancerConfig.class})
相關文章
相關標籤/搜索