SpringCloud Ribbon

Ribbon 寓意爲帶狀。。。 Ribbon是Netflix公司開源的一個負載均衡的項目,是一個客戶端負載均衡器,運行在客戶端上,它是一個通過了雲端測試的IPC庫,能夠很好地控制HTTP和TCP客戶端的一些行爲。 Feign已經默認使用了Ribbon算法

負載均衡
容錯
多協議(HTTP,TCP,UDP)支持異步和反應模型
緩存和批處理

RestTemplate和Ribbon相結合 Ribbon在Netflix組件是很是重要的一個組件,在Zuul中使用Ribbon作負載均衡,以及Feign組件的結合等。在Spring Cloud 中,做爲開發中,作的最多的多是將RestTemplate和Ribbon相結合,你可能會這樣寫:spring

@Configuration
public class RibbonConfig {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

消費另一個的服務的接口,差很少是這樣的:緩存

@Service
public class RibbonService {
    @Autowired
    RestTemplate restTemplate;
    public String hi(String name) {
        return restTemplate.getForObject("http://eureka-client/hi?name="+name,String.class);
    }
}

深刻理解Ribbon LoadBalancerClient 在Riibon中一個很是重要的組件爲LoadBalancerClient,它做爲負載均衡的一個客戶端。(Feign最終實現負載均衡,也是要調用到LoadBalancerClient)。它在spring-cloud-commons包下:負載均衡

輸入圖片說明

LoadBalancerClient是一個接口,它繼承ServiceInstanceChooser,它的實現類是RibbonLoadBalancerClient,這三者之間的關係以下圖:less

輸入圖片說明

***LoadBalancerClient源碼,代碼以下:dom

public interface LoadBalancerClient extends ServiceInstanceChooser {
  <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
  <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
  URI reconstructURI(ServiceInstance instance, URI original);

}

ServiceInstanceChooser接口,主要有一個方法,用來根據serviceId來獲取ServiceInstance,代碼以下:異步

public interface ServiceInstanceChooser {

    ServiceInstance choose(String serviceId);
}

LoadBalancerClient的實現類爲RibbonLoadBalancerClient,這個類是很是重要的一個類,最終的負載均衡的請求處理,由它來執行。它的部分源碼以下:async

public class RibbonLoadBalancerClient implements LoadBalancerClient {

...//省略代碼

@Override
    public ServiceInstance choose(String serviceId) {
        Server server = getServer(serviceId);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }



protected Server getServer(String serviceId) {
        return getServer(getLoadBalancer(serviceId));
   }


protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }


protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

    ...//省略代碼

在RibbonLoadBalancerClient的源碼中,其中choose()方法是選擇具體服務實例的一個方法。該方法經過getServer()方法去獲取實例,通過源碼跟蹤,最終交給了ILoadBalancer類去選擇服務實例。ide

ILoadBalancer在ribbon-loadbalancer的jar包下,它是定義了實現軟件負載均衡的一個接口,它須要一組可供選擇的服務註冊列表信息,以及根據特定方法去選擇服務,它的源碼以下 :函數

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);
    public Server chooseServer(Object key);
    public void markServerDown(Server server);
    public List<Server> getReachableServers();
    public List<Server> getAllServers();
}

其中, addServers(): 是添加一個Server集合; chooseServer():是根據key去獲取Server; markServerDown():用來標記某個服務下線; getReachableServers():獲取可用的Server集合; getAllServers():獲取全部的Server集合。

DynamicServerListLoadBalancer 它的繼承類爲BaseLoadBalancer,它的實現類爲DynamicServerListLoadBalancer,這三者之間的關係以下: 輸入圖片說明

查看上述三個類的源碼,可用發現,配置如下信息,IClientConfig、IRule、IPing、ServerList、ServerListFilter和ILoadBalancer,查看BaseLoadBalancer類,它默認的狀況下,實現瞭如下配置:

IClientConfig ribbonClientConfig: DefaultClientConfigImpl配置
IRule ribbonRule: RoundRobinRule 路由策略
IPing ribbonPing: DummyPing
ServerList ribbonServerList: ConfigurationBasedServerList
ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer

IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類爲DefaultClientConfigImpl

IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類爲DefaultClientConfigImpl。

IRule用於複雜均衡的策略,它有三個方法,其中choose()是根據key 來獲取server,setLoadBalancer()和getLoadBalancer()是用來設置和獲取ILoadBalancer的,它的源碼以下:

public interface IRule{

    public Server choose(Object key);

    public void setLoadBalancer(ILoadBalancer lb);

    public ILoadBalancer getLoadBalancer();    
}

IRule有不少默認的實現類,這些實現類根據不一樣的算法和邏輯來處理負載均衡。Ribbon實現的IRule有一下。在大多數狀況下,這些默認的實現類是能夠知足需求的,若是有特性的需求,能夠本身實現。

BestAvailableRule 選擇最小請求數

ClientConfigEnabledRoundRobinRule 輪詢

RandomRule 隨機選擇一個server
RoundRobinRule 輪詢選擇server
RetryRule 根據輪詢的方式重試
WeightedResponseTimeRule 根據響應時間去分配一個weight ,weight越低,被選擇的可能性就越低
ZoneAvoidanceRule 根據server的zone區域和可用性來輪詢選擇

輸入圖片說明

IPing向server發生」ping」,來判斷該server是否有響應,從而判斷該server是否可用。它有一個isAlive()方法,它的源碼以下:

public interface IPing {
    
    /**
     * Checks whether the given <code>Server</code> is "alive" i.e. should be
     * considered a candidate while loadbalancing
     * 
     */
    public boolean isAlive(Server server);
}

IPing的實現類有PingUrl、PingConstant、NoOpPing、DummyPing和NIWSDiscoveryPing。它門之間的關係以下: 輸入圖片說明

PingUrl 真實的去ping 某個url,判斷其是否alive
PingConstant 固定返回某服務是否可用,默認返回true,便可用
NoOpPing 不去ping,直接返回true,便可用。
DummyPing 直接返回true,並實現了initWithNiwsConfig方法。
NIWSDiscoveryPing,根據DiscoveryEnabledServer的InstanceInfo的InstanceStatus去判斷,若是爲InstanceStatus.UP,則爲可用,不然不可用。

ServerListFilter接口,定於了可根據配置去過濾或者根據特性動態獲取符合條件的server列表的方法,代碼以下:

public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

}

閱讀DynamicServerListLoadBalancer的源碼,DynamicServerListLoadBalancer的構造函數中有個initWithNiwsConfig()方法。在改方法中,通過一系列的初始化配置,最終執行了restOfInit()方法。其代碼以下:

public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
        initWithNiwsConfig(clientConfig);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        try {
            super.initWithNiwsConfig(clientConfig);
            String niwsServerListClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.NIWSServerListClassName,
                    DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);

            ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                    .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
            this.serverListImpl = niwsServerListImpl;

            if (niwsServerListImpl instanceof AbstractServerList) {
                AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                        .getFilterImpl(clientConfig);
                niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
                this.filter = niwsFilter;
            }

            String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.ServerListUpdaterClassName,
                    DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
            );

            this.serverListUpdater = (ServerListUpdater) ClientFactory
                    .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

            restOfInit(clientConfig);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Exception while initializing NIWSDiscoveryLoadBalancer:"
                            + clientConfig.getClientName()
                            + ", niwsClientConfig:" + clientConfig, e);
        }
    }

在restOfInit()方法上,有一個 updateListOfServers()的方法,該方法是用來獲取全部的ServerList的。

void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

進一步跟蹤updateListOfServers()方法的源碼,最終由serverListImpl.getUpdatedListOfServers()獲取全部的服務列表的,代碼以下:

@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

而serverListImpl是ServerList接口的具體實現類。跟蹤代碼,ServerList的實現類爲DiscoveryEnabledNIWSServerList,在ribbon-eureka.jar的com.netflix.niws.loadbalancer下。其中DiscoveryEnabledNIWSServerList有 getInitialListOfServers()和getUpdatedListOfServers()方法,具體代碼以下:

@Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

繼續跟蹤源碼,obtainServersViaDiscovery(),是根據eurekaClientProvider.get()來回去EurekaClient,再根據EurekaClient來獲取註冊列表信息,代碼以下:

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }

其中eurekaClientProvider的實現類是LegacyEurekaClientProvider,它是一個獲取eurekaClient類,經過靜態的方法去獲取eurekaClient,其代碼以下:

class LegacyEurekaClientProvider implements Provider<EurekaClient> {

    private volatile EurekaClient eurekaClient;

    @Override
    public synchronized EurekaClient get() {
        if (eurekaClient == null) {
            eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
        }

        return eurekaClient;
    }
}

EurekaClient的實現類爲DiscoveryClient,在以前已經分析了它具備服務註冊、獲取服務註冊列表等的所有功能。

*************因而可知,負載均衡器是從EurekaClient獲取服務信息,並根據IRule去路由,而且根據IPing去判斷服務的可用性。 **************

負載均衡器多久一次去獲取一次從Eureka Client獲取註冊信息呢? 在BaseLoadBalancer類下,BaseLoadBalancer的構造函數,該構造函數開啓了一個PingTask任務,代碼以下:

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
        ...//代碼省略
        setupPingTask();
         ...//代碼省略
    }

setupPingTask()的具體代碼邏輯,它開啓了ShutdownEnabledTimer執行PingTask任務,在默認狀況下pingIntervalSeconds爲10,即每10秒鐘,向EurekaClient發送一次」ping」。

void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

PingTask源碼,即new一個Pinger對象,並執行runPinger()方法。

class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

查看Pinger的runPinger()方法,最終根據 pingerStrategy.pingServers(ping, allServers)來獲取服務的可用性,若是該返回結果,如以前相同,則不去向EurekaClient獲取註冊列表,若是不一樣則通知ServerStatusChangeListener或者changeListeners發生了改變,進行更新或者從新拉取。

public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }

            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }

因而可知,LoadBalancerClient是在初始化的時候,會向Eureka回去服務註冊列表,而且向經過10s一次向EurekaClient發送「ping」,來判斷服務的可用性,若是服務的可用性發生了改變或者服務數量和以前的不一致,則更新或者從新拉取。LoadBalancerClient有了這些服務註冊列表,就能夠根據具體的IRule來進行負載均衡。

RestTemplate是如何和Ribbon結合的

@LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

全局搜索ctr+shift+f @LoadBalanced有哪些類用到了LoadBalanced有哪些類用到了, 發現LoadBalancerAutoConfiguration類,即LoadBalancer自動配置類。

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

@LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
}
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
            final List<RestTemplateCustomizer> customizers) {
        return new SmartInitializingSingleton() {
            @Override
            public void afterSingletonsInstantiated() {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        customizer.customize(restTemplate);
                    }
                }
            }
        };
    }


    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                @Override
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }

}

在該類中,首先維護了一個被@LoadBalanced修飾的RestTemplate對象的List,在初始化的過程當中,經過調用customizer.customize(restTemplate)方法來給RestTemplate增長攔截器LoadBalancerInterceptor。

而LoadBalancerInterceptor,用於實時攔截,在LoadBalancerInterceptor這裏實現來負載均衡。LoadBalancerInterceptor的攔截方法以下:

@Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }

總結

綜上所述,Ribbon的負載均衡,主要經過LoadBalancerClient來實現的,而LoadBalancerClient具體交給了ILoadBalancer來處理,ILoadBalancer經過配置IRule、IPing等信息,並向EurekaClient獲取註冊列表的信息,並默認10秒一次向EurekaClient發送「ping」,進而檢查是否更新服務列表,最後,獲得註冊列表後,ILoadBalancer根據IRule的策略進行負載均衡。

而RestTemplate 被@LoadBalance註解後,能過用負載均衡,主要是維護了一個被@LoadBalance註解的RestTemplate列表,並給列表中的RestTemplate添加攔截器,進而交給負載均衡器去處理。

相關文章
相關標籤/搜索