Spring Cloud 升級之路-2020.0.x -7.使用 Spring Cloud LoadBalancer (2)

本項目代碼地址: https://github.com/HashZhang/...

咱們使用 Spring Cloud 官方推薦的 Spring Cloud LoadBalancer 做爲咱們的客戶端負載均衡器。上一節咱們瞭解了 Spring Cloud LoadBalancer 的結構,接下來咱們來講一下咱們在使用 Spring Cloud LoadBalancer 要實現的功能:java

  1. 咱們要實現不一樣集羣之間不互相調用,經過實例的metamap中的zone配置,來區分不一樣集羣的實例。只有實例的metamap中的zone配置同樣的實例才能互相調用。這個經過實現自定義的 ServiceInstanceListSupplier 便可實現
  2. 負載均衡的輪詢算法,須要請求與請求之間隔離,不能共用同一個 position 致使某個請求失敗以後的重試仍是原來失敗的實例。上一節看到的默認的 RoundRobinLoadBalancer 是全部線程共用同一個原子變量 position 每次請求原子加 1。在這種狀況下會有問題:假設有微服務 A 有兩個實例:實例 1 和實例 2。請求 A 到達時,RoundRobinLoadBalancer 返回實例 1,這時有請求 B 到達,RoundRobinLoadBalancer 返回實例 2。而後若是請求 A 失敗重試,RoundRobinLoadBalancer 又返回了實例 1。這不是咱們指望看到的。

針對這兩個功能,咱們分別編寫本身的實現。react

實現不一樣集羣不互相調用

Spring Cloud LoadBalancer 中的 zone 配置

Spring Cloud LoadBalancer 定義了 LoadBalancerZoneConfiggit

public class LoadBalancerZoneConfig {
    //標識當前負載均衡器處於哪個 zone
    private String zone;
    public LoadBalancerZoneConfig(String zone) {
        this.zone = zone;
    }
    public String getZone() {
        return zone;
    }
    public void setZone(String zone) {
        this.zone = zone;
    }
}

若是沒有引入 Eureka 相關依賴,則這個 zone 經過 spring.cloud.loadbalancer.zone 配置:
LoadBalancerAutoConfigurationgithub

@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
    return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}

若是引入了 Eureka 相關依賴,則若是在 Eureka 元數據配置了 zone,則這個 zone 會覆蓋 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig算法

EurekaLoadBalancerClientConfigurationspring

@PostConstruct
public void postprocess() {
    if (!StringUtils.isEmpty(zoneConfig.getZone())) {
        return;
    }
    String zone = getZoneFromEureka();
    if (!StringUtils.isEmpty(zone)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
        }
        //設置 `LoadBalancerZoneConfig`
        zoneConfig.setZone(zone);
    }
}

private String getZoneFromEureka() {
    String zone;
    //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 爲 true
    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
    //若是配置了,則嘗試從 Eureka 配置的 host 名稱中提取
    //實際就是以 . 分割 host,而後第二個就是 zone
    //例如 www.zone1.com 就是 zone1
    if (approximateZoneFromHostname && eurekaConfig != null) {
        return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
    }
    else {
        //不然,從 metadata map 中取 zone 這個 key
        zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
        //若是這個 key 不存在,則從配置中以 region 從 zone 列表取第一個 zone 做爲當前 zone
        if (StringUtils.isEmpty(zone) && clientConfig != null) {
            String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            zone = zones != null && zones.length > 0 ? zones[0] : null;
        }
        return zone;
    }
}

實現 SameZoneOnlyServiceInstanceListSupplier

爲了實現經過 zone 來過濾同一 zone 下的實例,而且絕對不會返回非同一 zone 下的實例,咱們來編寫代碼:編程

SameZoneOnlyServiceInstanceListSupplier緩存

/**
 * 只返回與當前實例同一個 Zone 的服務實例,不一樣 zone 之間的服務不互相調用
 */
public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    /**
     * 實例元數據 map 中表示 zone 配置的 key
     */
    private final String ZONE = "zone";
    /**
     * 當前 spring cloud loadbalancer 的 zone 配置
     */
    private final LoadBalancerZoneConfig zoneConfig;
    private String zone;

    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
        super(delegate);
        this.zoneConfig = zoneConfig;
    }

    @Override
    public Flux<List<ServiceInstance>> get() {
        return getDelegate().get().map(this::filteredByZone);
    }

    //經過 zoneConfig 過濾
    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
        if (zone == null) {
            zone = zoneConfig.getZone();
        }
        if (zone != null) {
            List<ServiceInstance> filteredInstances = new ArrayList<>();
            for (ServiceInstance serviceInstance : serviceInstances) {
                String instanceZone = getZone(serviceInstance);
                if (zone.equalsIgnoreCase(instanceZone)) {
                    filteredInstances.add(serviceInstance);
                }
            }
            if (filteredInstances.size() > 0) {
                return filteredInstances;
            }
        }
        /**
         * @see ZonePreferenceServiceInstanceListSupplier 在沒有相同zone實例的時候返回的是全部實例
         * 咱們這裏爲了實現不一樣 zone 之間不互相調用須要返回空列表
         */
        return List.of();
    }

    //讀取實例的 zone,沒有配置則爲 null
    private String getZone(ServiceInstance serviceInstance) {
        Map<String, String> metadata = serviceInstance.getMetadata();
        if (metadata != null) {
            return metadata.get(ZONE);
        }
        return null;
    }
}

實現請求與請求之間隔離的負載均衡算法

在以前章節的講述中,咱們提到了咱們使用 spring-cloud-sleuth 做爲鏈路追蹤庫。咱們想能夠經過其中的 traceId,來區分到底是否是同一個請求。微信

RoundRobinWithRequestSeparatedPositionLoadBalancer多線程

//必定必須是實現ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//由於註冊的時候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次請求算上重試不會超過1分鐘
    //對於超過1分鐘的,這種請求確定比較重,不該該重試
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //隨機初始值,防止每次都是從第一個開始調用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //爲了解決原始算法不一樣調用併發可能致使一個請求重試相同的實例
        Span currentSpan = tracer.currentSpan();
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //實例返回列表順序可能不一樣,爲了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

將上述兩個元素加入咱們自定義的 LoadBalancerClient 並啓用

在上一節,咱們提到了能夠經過 @LoadBalancerClients 註解配置默認的負載均衡器配置,咱們這裏就是經過這種方式進行配置。首先在 spring.factories 中添加自動配置類:

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

而後編寫這個自動配置類,其實很簡單,就是添加一個 @LoadBalancerClients 註解,設置默認配置類:

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {
}

編寫這個默認配置類,將上面咱們實現的兩個類,組裝進去:

DefaultLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)
public class DefaultLoadBalancerConfiguration {

    @Bean
    public ServiceInstanceListSupplier serviceInstanceListSupplier(
            DiscoveryClient discoveryClient,
            Environment env,
            ConfigurableApplicationContext context,
            LoadBalancerZoneConfig zoneConfig
    ) {
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        return  //開啓服務實例緩存
                new CachingServiceInstanceListSupplier(
                        //只能返回同一個 zone 的服務實例
                        new SameZoneOnlyServiceInstanceListSupplier(
                                //啓用經過 discoveryClient 的服務發現
                                new DiscoveryClientServiceInstanceListSupplier(
                                        discoveryClient, env
                                ),
                                zoneConfig
                        )
                        , cacheManagerProvider.getIfAvailable()
                );
    }

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
            Environment environment,
            ServiceInstanceListSupplier serviceInstanceListSupplier,
            Tracer tracer
    ) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
                serviceInstanceListSupplier,
                name,
                tracer
        );
    }
}

這樣,咱們就實現了自定義的負載均衡器。也理解了 Spring Cloud LoadBalancer 的使用。接下來,咱們來單元測試下這些功能。集成測試後面會有單獨的章節,不用着急。

單元測試上述功能

經過這屆單元測試,咱們也能夠了解下通常咱們實現 spring cloud 自定義的基礎組件,怎麼去單元測試。

這裏的單元測試主要測試三個場景:

  1. 只返回同一個 zone 下的實例,其餘 zone 的不會返回
  2. 對於多個請求,每一個請求返回的與上次的實例不一樣。
  3. 對於多線程的每一個請求,若是重試,返回的都是不一樣的實例

編寫代碼:
LoadBalancerTest

//SpringRunner也包含了MockitoJUnitRunner,因此 @Mock 等註解也生效了
@RunWith(SpringRunner.class)
@SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"})
public class LoadBalancerTest {

    @EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class)
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            Map<String, String> zone2 = Map.ofEntries(
                    Map.entry("zone", "zone2")
            );
            when(zone1Instance1.getMetadata()).thenReturn(zone1);
            when(zone1Instance1.getInstanceId()).thenReturn("instance1");
            when(zone1Instance2.getMetadata()).thenReturn(zone1);
            when(zone1Instance2.getInstanceId()).thenReturn("instance2");
            when(zone2Instance3.getMetadata()).thenReturn(zone2);
            when(zone2Instance3.getInstanceId()).thenReturn("instance3");
            DiscoveryClient mock = Mockito.mock(DiscoveryClient.class);
            Mockito.when(mock.getInstances("testService"))
                    .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3));
            return mock;
        }
    }

    @Autowired
    private LoadBalancerClientFactory loadBalancerClientFactory;
    @Autowired
    private Tracer tracer;

    /**
     * 只返回同一個 zone 下的實例
     */
    @Test
    public void testFilteredByZone() {
        ReactiveLoadBalancer<ServiceInstance> testService =
                loadBalancerClientFactory.getInstance("testService");
        for (int i = 0; i < 100; i++) {
            ServiceInstance server = Mono.from(testService.choose()).block().getServer();
            //必須處於和當前實例同一個zone下
            Assert.assertEquals(server.getMetadata().get("zone"), "zone1");
        }
    }

    /**
     * 返回不一樣的實例
     */
    @Test
    public void testReturnNext() {
        ReactiveLoadBalancer<ServiceInstance> testService =
                loadBalancerClientFactory.getInstance("testService");
        //獲取服務實例
        ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
        ServiceInstance server2 = Mono.from(testService.choose()).block().getServer();
        //每次選擇的是不一樣實例
        Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId());
    }

    /**
     * 跨線程,默認狀況下是可能返回同一實例的,在咱們的實現下,保持
     * span 則會返回下一個實例,這樣保證多線程環境同一個 request 重試會返回下一實例
     * @throws Exception
     */
    @Test
    public void testSameSpanReturnNext() throws Exception {
        Span span = tracer.nextSpan();
        //測試 100 次
        for (int i = 0; i < 100; i++) {
            try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                ReactiveLoadBalancer<ServiceInstance> testService =
                        loadBalancerClientFactory.getInstance("testService");
                //獲取實例
                ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
                AtomicReference<ServiceInstance> server2 = new AtomicReference<>();
                Thread thread = new Thread(() -> {
                    //保持 trace,這樣就會認爲仍然是同一個請求上下文,這樣模擬重試
                    try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) {
                        server2.set(Mono.from(testService.choose()).block().getServer());
                    }
                });
                thread.start();
                thread.join();
                System.out.println(i);
                Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId());
            }
        }
    }
}

運行測試,測試經過。

微信搜索「個人編程喵」關注公衆號,加做者微信,每日一刷,輕鬆提高技術,斬獲各類offer

image

相關文章
相關標籤/搜索