Spring Cloud Ribbon

客戶端負載均衡Spring Cloud Ribbon

 Spring Cloud Ribbon是一個基於HTTP和TCP的客戶端負載均衡工具,基於Netflix Ribbon實現。java

目錄

  1. 客戶端負載均衡(本文重點)
  2. 源碼分析(本文重點)
  3. 負載均衡器
  4. 負載均衡策略
  5. 配置詳解
  6. 自動化配置

客戶端負載均衡

 負載均衡是對系統的高可用、網絡壓力的緩解和處理內容擴容的重要手段之一。git

 負載均衡能夠分爲客戶端負載均衡和服務端負載均衡。算法

 負載均衡按設備來分爲硬件負載均衡和軟件負載均衡,都屬於服務端負載均衡。spring

 硬件負載均衡主要經過在服務器節點之間安裝專門用於負載均衡的設備,例如F5等。服務器

 軟件負載均衡經過在服務器上安裝一些具備負載均衡功能或模塊的軟件來完成請求的轉發工做,例如Nginx等。微信

 硬件負載均衡和軟件負載均衡都會維護一個可用的服務清單,而後經過心跳檢測來剔除故障節點以保證服務清單中的節點都正常可用。當客戶端發出請求時,負載均衡器會按照某種算法(線性輪詢、按權重負載、按流量負載等)從服務清單中取出一臺服務器的地址,而後將請求轉發到該服務器上。網絡

 客戶端負載均衡須要客戶端本身維護本身要訪問的服務實例清單, 這些服務清單來源於註冊中心(在使用Eureka進行服務治理時)。app

基於Spring Cloud Ribbon實現客戶端負載均衡

 基於Spring Cloud Ribbon實現客戶端負載均衡很是簡單,主要由如下步驟:負載均衡

  1. 服務提供者須要啓動多個服務實例並註冊到一個或多個相關聯的服務註冊中心上;
  2. 服務消費者直接經過帶有@LoadBalanced註解的RestTemplate向服務提供者發送請求以實現客戶端的負載均衡。

源碼分析

 既然Ribbon客戶端負載均衡須要爲RestTemplate增長@LoadBalanced註解,那麼下面咱們就從這個註解開始分析。分佈式

@LoadBalanced

 經過該註解的頭部註釋能夠得知,該註解的做用是使用LoadBalancerClient來對RestTemplate進行配置,下面接着看LoadBalancerClient

LoadBalancerClient

 該類是一個接口類,代碼以下:

package org.springframework.cloud.client.loadbalancer;

import org.springframework.cloud.client.ServiceInstance;

import java.io.IOException;
import java.net.URI;

public interface LoadBalancerClient extends ServiceInstanceChooser {
    
    /**
     * 注意該方法是從父類ServiceInstanceChooser接口中繼承過來的
     */
    ServiceInstance choose(String serviceId);

    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    URI reconstructURI(ServiceInstance instance, URI original);
}

 從接口的方法和註釋能夠看出,一個客戶端負載均衡器應該具備如下幾種重要功能:

  1. ServiceInstance choose(String serviceId) : 根據傳入的服務名稱,從負載均衡器中挑選一個對應服務的實例。
  2. <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException : 使用從負載均衡器中挑選出的服務實例來執行請求內容。
  3. URI reconstructURI(ServiceInstance instance, URI original) : 構建一個符合host:port格式的URI。在分佈式系統中,咱們都使用邏輯上的服務名做爲host來構建URI(代替服務實例的host:port形式)進行請求。在該操做的定義中,前者ServiceInstance對象是帶有host和port的具體服務實例,後者URI對象則是使用邏輯服務名的URI,返回的URI是根據這二者轉換後的host:port形式的URI。

 經過分析org.springframework.cloud.client.loadbalancer包中的類,能夠找出org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration是實現客戶端負載均衡的自動配置類

LoadBalancerAutoConfiguration

 從LoadBalancerAutoConfiguration類的頭部註解上能夠看出,Ribbon實現客戶端負載均衡的自動配置須要知足下面兩個條件:

  1. @ConditionalOnClass(RestTemplate.class) : RestTemplate類必須存在於當前工程的環境中
  2. @ConditionalOnBean(LoadBalancerClient.class) : 在Spring的Bean工程中必需要有LoadBalancerClient的實現Bean

 在該自動化配置的任務中,主要完成如下三個任務:

  1. 建立一個LoadBalancerInterceptor實例,用於客戶端發起請求時進行攔截,進而實現客戶端的負載均衡
  2. 建立一個RestTemplateCustomizer實例,用於給RestTemplate增長LoadBalancerInterceptor攔截器
  3. 維護一個被@LoadBalanced註解修飾的RestTemplate集合(List<RestTemplate>),並在這裏進行初始化,經過調用RestTemplateCustomizer實例來給須要客戶端負載均衡的RestTemplate增長LoadBalancerInterceptor攔截器

 經過上面能夠看出,真正實現客戶端負載均衡是由於有LoadBalancerInterceptor攔截器的存在,那麼下面看一下LoadBalancerInterceptor類

LoadBalancerInterceptor

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.util.List;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpResponse;

public class LoadBalancerRequestFactory {

    private LoadBalancerClient loadBalancer;
    private List<LoadBalancerRequestTransformer> transformers;

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
            List<LoadBalancerRequestTransformer> transformers) {
        this.loadBalancer = loadBalancer;
        this.transformers = transformers;
    }

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
            final byte[] body, final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 攔截器的intercept方法還用到另外兩個類LoadBalancerRequestFactory和ServiceRequestWrapper。

 當使用被@LoadBalanced註解的RestTemplate發送請求時,會被LoadBalancerInterceptor攔截器攔截,執行LoadBalancerInterceptor的intercept方法,最終在該方法中選擇合適的服務實例經過LoadBalancerClient的execute方法進行調用。

 由於LoadBalancerClient是抽象的負載均衡器接口,下面咱們能夠經過一個具體的負載均衡器RibbonLoadBalancerClient來進行分析。

RibbonLoadBalancerClient

package org.springframework.cloud.netflix.ribbon;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import static org.springframework.cloud.netflix.ribbon.RibbonUtils.updateToSecureConnectionIfNeeded;

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

}

 分析execute(String serviceId, LoadBalancerRequest<T> request)方法的執行步驟:

 1.獲取負載均衡器,默認的負載均衡器是ZoneAwareLoadBalancer,這個後面再講

 2.獲取具體的服務實例,這裏並無調用LoadBalancerClient的ServiceInstance choose(String serviceId)方法,而是調用的ILoadBalancer的Server chooseServer(Object key)方法來獲取具體的服務實例,想知道ILoadBalancer幹啥用,請看下面的接口介紹

 3.將獲取到的具體服務實例包裝成RibbonServer對象(該對象存儲了服務實例信息、服務名serviceId、是否須要https等其餘信息),最後調用LoadBalancerRequest的apply方法,向一個實際的的具體服務實例發起請求,請看下面LoadLoadBalancerRequest的分析

ILoadBalancer

package com.netflix.loadbalancer;

import java.util.List;

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);
    
    public Server chooseServer(Object key);
    
    public void markServerDown(Server server);
    
    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

 該類也是負載均衡器的一個抽象接口,主要定義了一系列的抽象操做:

  1. void addServers(List<Server> newServers) : 向負載均衡器維護的服務實例列表中增長服務實例
  2. Server chooseServer(Object key) : 根據某種負載策略,從負載均衡器中挑選一個具體的服務實例
  3. void markServerDown(Server server) : 通知和標識負載均衡器中的某個具體的服務實例已經中止服務,防止負載均衡器在下一次獲取服務實例清單前認爲該服務實例是正常服務
  4. List<Server> getReachableServers() : 獲取當前正常的服務實例列表
  5. List<Server> getAllServers() : 獲取全部已知的服務實例列表

 該接口中使用到的Server對象定義是一個傳統的服務端節點,在該類中存儲了服務端節點的一些元數據信息,包括host、port以及一些部署信息等。

 整理該接口的實現類主要有:

  1. AbstractLoadBalancer(抽象類)
  2. BaseLoadBalancer:繼承自AbstractLoadBalancer
  3. DynamicServerListLoadBalancer:繼承自BaseLoadBalancer
  4. NoOpLoadBalancer:繼承自AbstractLoadBalancer
  5. ZoneAwareLoadBalancer:繼承自DynamicServerListLoadBalancer

 默認使用的負載均衡器是ZoneAwareLoadBalancer,要想知道這個結果很簡單,查看一下RibbonClientConfiguration配置類,該類中有一個方法以下:

@Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

 此處的代碼邏輯若是沒有配置負載均衡器,那麼默認的負載均衡器就是ZoneAwareLoadBalancer。

LoadLoadBalancerRequest

 該類是一個抽象的接口,只有一個apply(ServiceInstance instance)方法,先看一下ServiceInstance類。

ServiceInstance

 該接口暴露了服務實例的一些基本信息,如:serviceId、port、host等,源碼以下:

package org.springframework.cloud.client;

import java.net.URI;
import java.util.Map;

public interface ServiceInstance {

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();
    
    default String getScheme() {
        return null;
    }
}

 上面說起的RibbonServer是ServiceInstance的一個具體實現,查看源碼能夠知道RibbonServer除了包含Server對象以外,還存儲了服務名、是否使用HTTPS以及一個Map類型的元數據集合。

 看完上面兩個類,咱們來看一下具體的apply方法的實現,源碼中使用Lamada表達式實現,下面我只抽出具體的功能實現的源碼以下:

HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);

 在apply方法中,首先將HttpRequest包裝成ServiceRequestWrapper對象,下面看一下ServiceRequestWrapper類,源碼以下:

package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 在ServiceRequestWrapper類中重寫了getURI方法,重寫後的該方法經過調用LoadBalancerClient中的reconstructURI方法來構建一個host:port形式的URI對外發起請求。

 在apply方法的最後在調用ClientHttpRequestExecution的execute方法時,實際會去執行InterceptingClientHttpRequest類下面的InterceptingRequestExecution的execute方法,該方法的具體代碼以下:

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

 分析上面的代碼能夠看出,在調用requestFactory.createRequest(request.getURI(), method)建立請求時會調用getURI方法,此時調用的getURI方法就是ServiceRequestWrapper類中的getURI方法,進而調用LoadBalancerClient中的reconstructURI方法,下面咱們看一下RibbonLoadBalancerClient中的reconstructURI是如何實現的

public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }

 分析上面的代碼,首先根據serviceId從SpringClientFactory對象中獲取serviceId對應的負載均衡器的上下文RibbonLoadBalancerContext對象,而後再使用該上下文對象的reconstructURIWithServer方法和server對象來構建具體的URI。

 備註:

  1. SpringClientFactory : 一個用來建立客戶端負載均衡器的工廠類,該工廠類會爲每個不一樣名的Ribbon客戶端生成不一樣的上下文
  2. RibbonLoadBalancerContext : LoadBalancerContext的子類,該類用於存儲一些被負載均衡器使用的上下文內容和API操做。

 關於LoadBalancerContext類中的reconstructURIWithServer方法是如何組裝host:port形式的邏輯很容易理解,咱們就不在這裏解釋了,有興趣的能夠本身去看一下。

小結

 使用被@LoadBalanced註解的RestTemplate發起請求時,會被LoadBalancerInterceptor攔截,而後藉助負載均衡器LoadBalancerClient將邏輯服務名轉換爲host:port的具體的服務實例地址,在使用RibbonLoadBalancerClient(Ribbon實現的負載均衡器)時實際使用的是Ribbon中定義的ILoadBalancer,默認自動化配置的負載均衡器是ZoneAwareLoadBalancer。

代碼地址

https://gitee.com/petterheng/spring-cloud-eureka

後續

後面會介紹負載均衡器的源碼分析,請繼續關注!!!

 前往微信公衆號閱讀文章,點擊這裏,或者直接掃碼關注公衆號

微信公衆號

相關文章
相關標籤/搜索