客戶端負載均衡Ribbon之源碼解析

什麼是負載均衡器?

假設有一個分佈式系統,該系統由在不一樣計算機上運行的許多服務組成。可是,當用戶數量很大時,一般會爲服務建立多個副本。每一個副本都在另外一臺計算機上運行。此時,出現 「Load Balancer(負載均衡器)」。它有助於在服務器之間平均分配傳入流量。html

服務器端負載均衡器

傳統上,Load Balancers(例如Nginx、F5)是放置在服務器端的組件。當請求來自 客戶端 時,它們將轉到負載均衡器,負載均衡器將爲請求指定 服務器。負載均衡器使用的最簡單的算法是隨機指定。在這種狀況下,大多數負載平衡器是用於控制負載平衡的硬件集成軟件。java

重點:git

  • 對客戶端不透明,客戶端不知道服務器端的服務列表,甚至不知道本身發送請求的目標地址存在負載均衡器。
  • 服務器端維護負載均衡服務器,控制負載均衡策略和算法。

客戶端負載均衡器

當負載均衡器位於 客戶端 時,客戶端獲得可用的服務器列表而後按照特定的負載均衡策略,分發請求到不一樣的 服務器程序員

重點:github

  • 對客戶端透明,客戶端須要知道服務器端的服務列表,須要自行決定請求要發送的目標地址。
  • 客戶端維護負載均衡服務器,控制負載均衡策略和算法。
  • 目前單獨提供的客戶端實現比較少( 我用過的只有Ribbon),大部分都是在框架內部自行實現。

Ribbon

簡介

Ribbon是Netflix公司開源的一個客戶單負載均衡的項目,能夠自動與 Eureka 進行交互。它提供下列特性:算法

  • 負載均衡
  • 容錯
  • 以異步和反應式模型執行多協議 (HTTP, TCP, UDP)
  • 緩存和批量

Ribbon中的關鍵組件

  • ServerList:能夠響應客戶端的特定服務的服務器列表。
  • ServerListFilter:能夠動態得到的具備所需特徵的候選服務器列表的過濾器。
  • ServerListUpdater:用於執行動態服務器列表更新。
  • Rule:負載均衡策略,用於肯定從服務器列表返回哪一個服務器。
  • Ping:客戶端用於快速檢查服務器當時是否處於活動狀態。
  • LoadBalancer:負載均衡器,負責負載均衡調度的管理。

源碼分析

LoadBalancerClient

實際應用中,一般將 RestTemplate 和 Ribbon 結合使用,例如:spring

@Configuration
public class RibbonConfig {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

消費者調用服務接口:緩存

@Service
public class RibbonService {
    @Autowired
    private RestTemplate restTemplate;
    public String hi(String name) {
        return restTemplate.getForObject("http://service-hi/hi?name="+name,String.class);
    }
}

@LoadBalanced,經過源碼能夠發現這是一個標記註解:服務器

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

經過註釋能夠知道@LoadBalanced註解是用來給RestTemplate作標記,方便咱們對RestTemplate添加一個LoadBalancerClient,以實現客戶端負載均衡。微信

根據spring boot的自動配置原理,能夠知道同包下的LoadBalancerAutoConfiguration,應該是實現客戶端負載均衡器的自動化配置類。代碼以下:

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
    
    @Configuration
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
            return new LoadBalancedRetryFactory() {};
        }
    }
    
    @Configuration
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryInterceptorAutoConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
                LoadBalancerRequestFactory requestFactory,
                LoadBalancedRetryFactory loadBalancedRetryFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
                    requestFactory, loadBalancedRetryFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
}

從代碼能夠看出,這個類做用主要是使用RestTemplateCustomizer對全部標註了@LoadBalanced的RestTemplate Bean添加了一個LoadBalancerInterceptor攔截器,而這個攔截器的做用就是對請求的URI進行轉換獲取到具體應該請求哪一個服務實例。

那再看看添加的攔截器LoadBalancerInterceptor的代碼,以下:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}

從代碼能夠看出 LoadBalancerInterceptor 攔截了請求後,經過LoadBalancerClient執行具體的請求發送。

打開LoadBalancerClient,發現它是一個接口:

public interface LoadBalancerClient {

    ServiceInstance choose(String serviceId);

    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    URI reconstructURI(ServiceInstance instance, URI original);
}

接口說明:

  • ServiceInstance choose(String serviceId):根據傳入的服務id,從負載均衡器中爲指定的服務選擇一個服務實例。
  • T execute(String serviceId, LoadBalancerRequest request):根據傳入的服務id,指定的負載均衡器中的服務實例執行請求。
  • T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request):根據傳入的服務實例,執行請求。

LoadBalancerClient 有一個惟一的實現類 RibbonLoadBalancerClient,關鍵代碼以下:

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    public ServiceInstance choose(String serviceId) {
        Server server = this.getServer(serviceId);
        return server == null ? null : new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
    }
    
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        Server server = this.getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
            return this.execute(serviceId, ribbonServer, request);
        }
    }
    
    protected Server getServer(String serviceId) {
        return this.getServer(this.getLoadBalancer(serviceId));
    }
    
    protected Server getServer(ILoadBalancer loadBalancer) {
        return loadBalancer == null ? null : loadBalancer.chooseServer("default");
    }
    
    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }
    
    //省略...

}

負載均衡器

從 RibbonLoadBalancerClient 代碼能夠看出,實際負載均衡的是經過 ILoadBalancer 來實現的。

ILoadBalancer 接口代碼以下:

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);

    public Server chooseServer(Object key);

    public void markServerDown(Server server);

    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

接口說明:

  • addServers:向負載均衡器中添加一個服務實例集合。
  • chooseServer:跟據key,從負載均衡器獲取服務實例。
  • markServerDown:用來標記某個服務實例下線。
  • getReachableServers:獲取可用的服務實例集合。
  • getAllServers():獲取全部服務實例集合,包括下線的服務實例。

ILoadBalancer 的實現 依賴關係示意圖以下:

  • NoOpLoadBalancer:啥都不作
  • BaseLoadBalancer:
  • 一個負載均衡器的基本實現,其中有一個任意列表,能夠將服務器設置爲服務器池。
  • 能夠設置一個ping來肯定服務器的活力。
  • 在內部,該類維護一個「all」服務器列表,以及一個「up」服務器列表,並根據調用者的要求使用它們。
  • DynamicServerListLoadBalancer:
  • 經過動態的獲取服務器的候選列表的負載平衡器。
  • 能夠經過篩選標準來傳遞服務器列表,以過濾不符合所需條件的服務器。
  • ZoneAwareLoadBalancer:
  • 用於測量區域條件的關鍵指標是平均活動請求,它根據每一個rest客戶機和每一個區域聚合。這是區域內未完成的請求總數除以可用目標實例的數量(不包括斷路器跳閘實例)。當在壞區上緩慢發生超時時,此度量很是有效。
  • 該負載均衡器將計算並檢查全部可用區域的區域狀態。若是任何區域的平均活動請求已達到配置的閾值,則該區域將從活動服務器列表中刪除。若是超過一個區域達到閾值,則將刪除每一個服務器上活動請求最多的區域。一旦去掉最壞的區域,將在其他區域中選擇一個區域,其機率與其實例數成正比。服務器將使用給定的規則從所選區域返回。對於每一個請求,將重複上述步驟。也就是說,每一個與區域相關的負載平衡決策都是實時作出的,最新的統計數據能夠幫助進行選擇。

那麼在整合Ribbon的時候Spring Cloud默認採用了哪一個具體實現呢?咱們經過RibbonClientConfiguration配置類,能夠知道在整合時默認採用了ZoneAwareLoadBalancer來實現負載均衡器。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    return (ILoadBalancer)(this.propertiesFactory
    .isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory
    .get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}

從這段代碼 ,也能夠看出,負載均衡器所需的主要配置項是IClientConfig, ServerList, ServerListFilter, IRule, IPing, ServerListUpdater。下面逐一分析他們。

IClientConfig

IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類爲 DefaultClientConfigImpl。

IRule

爲LoadBalancer定義「負載均衡策略」的接口。

public interface IRule{

    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

IRule 的實現 依賴關係示意圖以下:

  • BestAvailableRule:選擇具備最低併發請求的服務器。
  • ClientConfigEnabledRoundRobinRule:輪詢。
  • RandomRule:隨機選擇一個服務器。
  • RoundRobinRule:輪詢選擇服務器。
  • RetryRule:具有重試機制的輪詢。
  • WeightedResponseTimeRule:根據使用平均響應時間去分配一個weight(權重) ,weight越低,被選擇的可能性就越低。
  • ZoneAvoidanceRule:根據區域和可用性篩選,再輪詢選擇服務器。

IPing

定義如何 「ping」 服務器以檢查其是否存活。

public interface IPing {
    public boolean isAlive(Server server);
}

IPing 的實現 依賴關係示意圖以下:

  • PingUrl:真實的去ping 某個url,判斷其是否alive。
  • PingConstant:固定返回某服務是否可用,默認返回true,便可用
  • NoOpPing:不去ping,直接返回true,便可用。
  • DummyPing:繼承抽象類AbstractLoadBalancerPing,認爲因此服務都是存活狀態,返回true,便可用。
  • NIWSDiscoveryPing:結合eureka使用時,若是Discovery Client在線,則認爲心跳檢測經過。

ServerList

定義獲取全部的服務實例清單。

public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();
    public List<T> getUpdatedListOfServers();   
}

ServerList 的實現 依賴關係示意圖以下:

  • DomainExtractingServerList:代理類,根據傳入的ServerList的值,實現具體的邏輯。
  • ConfigurationBasedServerList:從配置文件中加載服務器列表。
  • DiscoveryEnabledNIWSServerList:從Eureka註冊中心中獲取服務器列表。
  • StaticServerList:經過靜態配置來維護服務器列表。

ServerListFilter

容許根據過濾配置動態得到的具備所需特性的候選服務器列表。

public interface ServerListFilter<T extends Server> {
    public List<T> getFilteredListOfServers(List<T> servers);
}

ServerListFilter 的實現 依賴關係示意圖以下:

  • DefaultNIWSServerListFilter:徹底繼承自ZoneAffinityServerListFilter。
  • ZonePreferenceServerListFilter:EnableZoneAffinity 或 EnableZoneExclusivity 開啓狀態使用,默認關閉。處理基於區域感知的過濾服務器,過濾掉不和客戶端在相同zone的服務,若不存在相同zone,則不進行過濾。
  • ServerListSubsetFilter:服務器列表篩選器,它將負載平衡器使用的服務器數量限制爲全部服務器的子集。若是服務器機羣很大(例如數百個),而且不須要使用每個機羣並將鏈接保存在http客戶機的鏈接池中,那麼這是很是有用的。它還能夠經過比較總的網絡故障和併發鏈接來驅逐相對不健康的服務器。

ServerListUpdater

用於執行動態服務器列表更新。

public interface ServerListUpdater {

    public interface UpdateAction {
        void doUpdate();
    }

    void start(UpdateAction updateAction);

    void stop();

    String getLastUpdate();

    long getDurationSinceLastUpdateMs();

    int getNumberMissedCycles();

    int getCoreThreads();
}

ServerListUpdater 的實現 依賴關係示意圖以下:

  • PollingServerListUpdater:默認的實現策略,會啓動一個定時線程池,定時執行更新策略。
  • EurekaNotificationServerListUpdater:利用Eureka的事件監聽器來驅動服務列表的更新操做。

參考資料

https://github.com/Netflix/ribbon/wiki

http://tech.lede.com/2018/01/11/rd/server/NetflixRibbon/

http://blog.didispace.com/springcloud-sourcecode-ribbon/

https://www.fangzhipeng.com/springcloud/2017/08/11/Ribbon-resources.html

https://blog.csdn.net/Tincox/article/details/79210309




歡迎掃碼或微信搜索公衆號《程序員果果》關注我,關注有驚喜~

相關文章
相關標籤/搜索