本項目代碼地址: https://github.com/HashZhang/...
咱們使用 Spring Cloud 官方推薦的 Spring Cloud LoadBalancer 做爲咱們的客戶端負載均衡器。上一節咱們瞭解了 Spring Cloud LoadBalancer 的結構,接下來咱們來講一下咱們在使用 Spring Cloud LoadBalancer 要實現的功能:java
metamap
中的zone
配置,來區分不一樣集羣的實例。只有實例的metamap
中的zone
配置同樣的實例才能互相調用。這個經過實現自定義的 ServiceInstanceListSupplier
便可實現RoundRobinLoadBalancer
是全部線程共用同一個原子變量 position
每次請求原子加 1。在這種狀況下會有問題:假設有微服務 A 有兩個實例:實例 1 和實例 2。請求 A 到達時,RoundRobinLoadBalancer
返回實例 1,這時有請求 B 到達,RoundRobinLoadBalancer
返回實例 2。而後若是請求 A 失敗重試,RoundRobinLoadBalancer
又返回了實例 1。這不是咱們指望看到的。針對這兩個功能,咱們分別編寫本身的實現。react
Spring Cloud LoadBalancer 定義了 LoadBalancerZoneConfig
:git
public class LoadBalancerZoneConfig { //標識當前負載均衡器處於哪個 zone private String zone; public LoadBalancerZoneConfig(String zone) { this.zone = zone; } public String getZone() { return zone; } public void setZone(String zone) { this.zone = zone; } }
若是沒有引入 Eureka 相關依賴,則這個 zone 經過 spring.cloud.loadbalancer.zone
配置:LoadBalancerAutoConfiguration
github
@Bean @ConditionalOnMissingBean public LoadBalancerZoneConfig zoneConfig(Environment environment) { return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone")); }
若是引入了 Eureka 相關依賴,則若是在 Eureka 元數據配置了 zone,則這個 zone 會覆蓋 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig
:算法
EurekaLoadBalancerClientConfiguration
spring
@PostConstruct public void postprocess() { if (!StringUtils.isEmpty(zoneConfig.getZone())) { return; } String zone = getZoneFromEureka(); if (!StringUtils.isEmpty(zone)) { if (LOG.isDebugEnabled()) { LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone); } //設置 `LoadBalancerZoneConfig` zoneConfig.setZone(zone); } } private String getZoneFromEureka() { String zone; //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 爲 true boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname(); //若是配置了,則嘗試從 Eureka 配置的 host 名稱中提取 //實際就是以 . 分割 host,而後第二個就是 zone //例如 www.zone1.com 就是 zone1 if (approximateZoneFromHostname && eurekaConfig != null) { return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false)); } else { //不然,從 metadata map 中取 zone 這個 key zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone"); //若是這個 key 不存在,則從配置中以 region 從 zone 列表取第一個 zone 做爲當前 zone if (StringUtils.isEmpty(zone) && clientConfig != null) { String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); // Pick the first one from the regions we want to connect to zone = zones != null && zones.length > 0 ? zones[0] : null; } return zone; } }
爲了實現經過 zone 來過濾同一 zone 下的實例,而且絕對不會返回非同一 zone 下的實例,咱們來編寫代碼:編程
SameZoneOnlyServiceInstanceListSupplier
緩存
/** * 只返回與當前實例同一個 Zone 的服務實例,不一樣 zone 之間的服務不互相調用 */ public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier { /** * 實例元數據 map 中表示 zone 配置的 key */ private final String ZONE = "zone"; /** * 當前 spring cloud loadbalancer 的 zone 配置 */ private final LoadBalancerZoneConfig zoneConfig; private String zone; public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) { super(delegate); this.zoneConfig = zoneConfig; } @Override public Flux<List<ServiceInstance>> get() { return getDelegate().get().map(this::filteredByZone); } //經過 zoneConfig 過濾 private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } if (zone != null) { List<ServiceInstance> filteredInstances = new ArrayList<>(); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } /** * @see ZonePreferenceServiceInstanceListSupplier 在沒有相同zone實例的時候返回的是全部實例 * 咱們這裏爲了實現不一樣 zone 之間不互相調用須要返回空列表 */ return List.of(); } //讀取實例的 zone,沒有配置則爲 null private String getZone(ServiceInstance serviceInstance) { Map<String, String> metadata = serviceInstance.getMetadata(); if (metadata != null) { return metadata.get(ZONE); } return null; } }
在以前章節的講述中,咱們提到了咱們使用 spring-cloud-sleuth 做爲鏈路追蹤庫。咱們想能夠經過其中的 traceId,來區分到底是否是同一個請求。微信
RoundRobinWithRequestSeparatedPositionLoadBalancer
多線程
//必定必須是實現ReactorServiceInstanceLoadBalancer //而不是ReactorLoadBalancer<ServiceInstance> //由於註冊的時候是ReactorServiceInstanceLoadBalancer @Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; //每次請求算上重試不會超過1分鐘 //對於超過1分鐘的,這種請求確定比較重,不該該重試 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) //隨機初始值,防止每次都是從第一個開始調用 .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; } @Override public Mono<Response<ServiceInstance>> choose(Request request) { return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); } private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //爲了解決原始算法不一樣調用併發可能致使一個請求重試相同的實例 Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size()); return new DefaultResponse(serviceInstances.stream() //實例返回列表順序可能不一樣,爲了保持一致,先排序再取 .sorted(Comparator.comparing(ServiceInstance::getInstanceId)) .collect(Collectors.toList()).get(pos)); } }
在上一節,咱們提到了能夠經過 @LoadBalancerClients
註解配置默認的負載均衡器配置,咱們這裏就是經過這種方式進行配置。首先在 spring.factories 中添加自動配置類:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration
而後編寫這個自動配置類,其實很簡單,就是添加一個 @LoadBalancerClients
註解,設置默認配置類:
@Configuration(proxyBeanMethods = false) @LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class) public class LoadBalancerAutoConfiguration { }
編寫這個默認配置類,將上面咱們實現的兩個類,組裝進去:
DefaultLoadBalancerConfiguration
@Configuration(proxyBeanMethods = false) public class DefaultLoadBalancerConfiguration { @Bean public ServiceInstanceListSupplier serviceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, ConfigurableApplicationContext context, LoadBalancerZoneConfig zoneConfig ) { ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); return //開啓服務實例緩存 new CachingServiceInstanceListSupplier( //只能返回同一個 zone 的服務實例 new SameZoneOnlyServiceInstanceListSupplier( //啓用經過 discoveryClient 的服務發現 new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env ), zoneConfig ) , cacheManagerProvider.getIfAvailable() ); } @Bean public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer ) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinWithRequestSeparatedPositionLoadBalancer( serviceInstanceListSupplier, name, tracer ); } }
這樣,咱們就實現了自定義的負載均衡器。也理解了 Spring Cloud LoadBalancer 的使用。接下來,咱們來單元測試下這些功能。集成測試後面會有單獨的章節,不用着急。
經過這屆單元測試,咱們也能夠了解下通常咱們實現 spring cloud 自定義的基礎組件,怎麼去單元測試。
這裏的單元測試主要測試三個場景:
編寫代碼:LoadBalancerTest
//SpringRunner也包含了MockitoJUnitRunner,因此 @Mock 等註解也生效了 @RunWith(SpringRunner.class) @SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"}) public class LoadBalancerTest { @EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class) @Configuration public static class App { @Bean public DiscoveryClient discoveryClient() { ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class); ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class); ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class); Map<String, String> zone1 = Map.ofEntries( Map.entry("zone", "zone1") ); Map<String, String> zone2 = Map.ofEntries( Map.entry("zone", "zone2") ); when(zone1Instance1.getMetadata()).thenReturn(zone1); when(zone1Instance1.getInstanceId()).thenReturn("instance1"); when(zone1Instance2.getMetadata()).thenReturn(zone1); when(zone1Instance2.getInstanceId()).thenReturn("instance2"); when(zone2Instance3.getMetadata()).thenReturn(zone2); when(zone2Instance3.getInstanceId()).thenReturn("instance3"); DiscoveryClient mock = Mockito.mock(DiscoveryClient.class); Mockito.when(mock.getInstances("testService")) .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3)); return mock; } } @Autowired private LoadBalancerClientFactory loadBalancerClientFactory; @Autowired private Tracer tracer; /** * 只返回同一個 zone 下的實例 */ @Test public void testFilteredByZone() { ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); for (int i = 0; i < 100; i++) { ServiceInstance server = Mono.from(testService.choose()).block().getServer(); //必須處於和當前實例同一個zone下 Assert.assertEquals(server.getMetadata().get("zone"), "zone1"); } } /** * 返回不一樣的實例 */ @Test public void testReturnNext() { ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); //獲取服務實例 ServiceInstance server1 = Mono.from(testService.choose()).block().getServer(); ServiceInstance server2 = Mono.from(testService.choose()).block().getServer(); //每次選擇的是不一樣實例 Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId()); } /** * 跨線程,默認狀況下是可能返回同一實例的,在咱們的實現下,保持 * span 則會返回下一個實例,這樣保證多線程環境同一個 request 重試會返回下一實例 * @throws Exception */ @Test public void testSameSpanReturnNext() throws Exception { Span span = tracer.nextSpan(); //測試 100 次 for (int i = 0; i < 100; i++) { try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); //獲取實例 ServiceInstance server1 = Mono.from(testService.choose()).block().getServer(); AtomicReference<ServiceInstance> server2 = new AtomicReference<>(); Thread thread = new Thread(() -> { //保持 trace,這樣就會認爲仍然是同一個請求上下文,這樣模擬重試 try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) { server2.set(Mono.from(testService.choose()).block().getServer()); } }); thread.start(); thread.join(); System.out.println(i); Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId()); } } } }
運行測試,測試經過。
微信搜索「個人編程喵」關注公衆號,加做者微信,每日一刷,輕鬆提高技術,斬獲各類offer: