本系列示例與膠水代碼地址: https://github.com/HashZhang/spring-cloud-scaffoldjava
負載均衡Ribbon替換成Spring Cloud Load Balancer
Spring Cloud Load Balancer
並非一個獨立的項目,而是spring-cloud-commons
其中的一個模塊。 項目中用了Eureka
以及相關的 starter,想徹底剔除Ribbon
的相關依賴基本是不可能的,Spring 社區的人也是看到了這一點,經過配置去關閉Ribbon
啓用Spring-Cloud-LoadBalancer
。react
spring.cloud.loadbalancer.ribbon.enabled=false
關閉ribbon以後,Spring Cloud LoadBalancer就會加載成爲默認的負載均衡器。git
Spring Cloud LoadBalancer 結構以下所示:github
其中:算法
- 全局只有一個
BlockingLoadBalancerClient
,負責執行全部的負載均衡請求。 BlockingLoadBalancerClient
從LoadBalancerClientFactory
裏面加載對應微服務的負載均衡配置。- 每一個微服務下有獨自的
LoadBalancer
,LoadBalancer
裏面包含負載均衡的算法,例如RoundRobin
.根據算法,從ServiceInstanceListSupplier
返回的實例列表中選擇一個實例返回。
1. 實現zone
隔離
要想實現zone
隔離,應該從ServiceInstanceListSupplier
裏面作手腳。默認的實現裏面有關於zone
隔離的ServiceInstanceListSupplier
-> org.springframework.cloud.loadbalancer.core.ZonePreferenceServiceInstanceListSupplier
:spring
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } //若是zone不爲null,而且該zone下有存活實例,則返回這個實例列表 //不然,返回全部的實例 if (zone != null) { List<ServiceInstance> filteredInstances = new ArrayList<>(); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } // If the zone is not set or there are no zone-specific instances available, // we return all instances retrieved for given service id. return serviceInstances; }
這裏對於沒指定zone
或者該zone
下沒有存活實例的狀況下,會返回全部查到的實例,不區分zone
。這個不符合咱們的要求,因此咱們修改並實現下咱們本身的com.github.hashjang.hoxton.service.consumer.config.SameZoneOnlyServiceInstanceListSupplier:緩存
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } if (zone != null) { List<ServiceInstance> filteredInstances = new ArrayList<>(); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } //若是沒找到就返回空列表,毫不返回其餘集羣的實例 return List.of(); }
而後咱們來看一下默認的 Spring Cloud LoadBalancer
提供的 LoadBalancer
,它是帶緩存的:負載均衡
org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
框架
@Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( ReactiveDiscoveryClient discoveryClient, Environment env, ApplicationContext context) { DiscoveryClientServiceInstanceListSupplier delegate = new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable()); } return delegate; }
DiscoveryClientServiceInstanceListSupplier
每次從Eureka
上面拉取實例列表,CachingServiceInstanceListSupplier
提供了緩存,這樣沒必要每次從Eureka
上面拉取。能夠看出CachingServiceInstanceListSupplier
是一種代理模式的實現,和SameZoneOnlyServiceInstanceListSupplier
的模式是同樣的。dom
咱們來組裝咱們的ServiceInstanceListSupplier
,因爲咱們是同步的環境,只用實現同步的ServiceInstanceListSupplier
就好了。
public class CommonLoadBalancerConfig { /** * 同步環境下的ServiceInstanceListSupplier * SameZoneOnlyServiceInstanceListSupplier限制僅返回同一個zone下的實例(注意) * CachingServiceInstanceListSupplier啓用緩存,不每次訪問eureka請求實例列表 * * @param discoveryClient * @param env * @param zoneConfig * @param context * @return */ @Bean @Order(Integer.MIN_VALUE) public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, LoadBalancerZoneConfig zoneConfig, ApplicationContext context) { ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier( new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env), zoneConfig ); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier( delegate, cacheManagerProvider.getIfAvailable() ); } return delegate; } }
2. 實現下一次重試的時候,若是存在其餘實例,則必定會重試與本次不一樣的其餘實例
默認的RoundRobinLoadBalancer
,其中的輪詢position
,是一個Atomic
類型的,在某個微服務的調用請求下,全部線程,全部請求共用(調用其餘的每一個微服務會建立一個RoundRobinLoadBalancer
)。在使用的時候,會有這樣的一個問題:
- 假設某個微服務有兩個實例,實例 A 和實例 B
- 某次請求 X 發往實例 A,position = position + 1
- 在請求沒有返回時,請求 Y 到達,發往實例 B,position = position + 1
- 請求 A 失敗,重試,重試的實例仍是實例 A
這樣在重試的狀況下,某個請求的重試可能會發送到上一次的實例進行重試,這不是咱們想要的。針對這個,我提了個Issue:Enhance RoundRoubinLoadBalancer position。我修改的思路是,咱們須要一個單次請求隔離的position
,這個position
對於實例個數取餘得出請求要發往的實例。那麼如何進行請求隔離呢?
首先想到的是線程隔離,可是這個是不行的。Spring Cloud LoadBalancer 底層運用了 reactor 框架,致使實際承載選擇實例的線程,不是業務線程,而是 reactor 裏面的線程池,如圖所示: 因此,不能用
ThreadLocal
的方式實現position
。
因爲咱們用到了sleuth
,通常請求的context
會傳遞其中的traceId
,咱們根據這個traceId
區分不一樣的請求,實現咱們的 LoadBalancer
:
RoundRobinBaseOnTraceIdLoadBalancer
//這個超時時間,須要設置的比你的請求的 connectTimeout + readTimeout 長 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //若是沒有 traceId,就生成一個新的,可是最好檢查下爲啥會沒有 //是否是 MQ 消費這種沒有主動生成 traceId 的狀況,最好主動生成下。 Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); int seed = positionCache.get(l).getAndIncrement(); return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size())); }
3. 替換默認的負載均衡相關 Bean 實現
咱們要用上面的兩個類替換默認的實現,先編寫一個配置類:
public class CommonLoadBalancerConfig { private volatile boolean isValid = false; /** * 同步環境下的ServiceInstanceListSupplier * SameZoneOnlyServiceInstanceListSupplier限制僅返回同一個zone下的實例(注意) * CachingServiceInstanceListSupplier啓用緩存,不每次訪問eureka請求實例列表 * * @param discoveryClient * @param env * @param zoneConfig * @param context * @return */ @Bean @Order(Integer.MIN_VALUE) public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, LoadBalancerZoneConfig zoneConfig, ApplicationContext context) { isValid = true; ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier( new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env), zoneConfig ); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier( delegate, cacheManagerProvider.getIfAvailable() ); } return delegate; } @Bean public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer) { if (!isValid) { throw new IllegalStateException("should use the ServiceInstanceListSupplier in this configuration, please check config"); } String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinBaseOnTraceIdLoadBalancer( name, serviceInstanceListSupplier, tracer ); } }
而後,指定默認的負載均衡配置採起這個配置, 經過註解:
@LoadBalancerClients(defaultConfiguration = {CommonLoadBalancerConfig.class})