Spring Cloud Ribbon負載均衡器

客戶端負載均衡Spring Cloud Ribbon

 Spring Cloud Ribbon是一個基於HTTP和TCP的客戶端負載均衡工具,基於Netflix Ribbon實現。java

目錄

  1. 客戶端負載均衡
  2. 源碼分析
  3. 負載均衡器(本文重點)
  4. 負載均衡策略
  5. 配置詳解
  6. 自動化配置

客戶端負載均衡&源碼分析

 請在上一篇文章的基礎上進行下面的學習,點擊這裏閱讀上一篇算法

負載均衡器

 下面咱們看一下具體的的負載均衡器,也就是ILoadBalancer接口的實現類。spring

AbstractLoadBalancer

 該類是ILoadBalancer接口的抽象實現類。segmentfault

 在該抽象實現類中含有一個關於服務實例的分組枚舉類,該枚舉類主要有如下三種類型:服務器

  1. ALL:全部服務實例
  2. STATUS_UP:正常服務的實例
  3. STATUS_NOT_UP:中止服務的實例

 該抽象類下面的的函數有如下幾個:併發

  1. chooseServer():該函數經過調用接口中的chooseServer(Object key)實現,其中參數key爲null,表示在選擇具體服務實例時忽略key的條件判斷
  2. List<Server> getServerList(ServerGroup serverGroup):定義了根據分組類型來獲取不一樣的服務實例的列表
  3. LoadBalancerStats getLoadBalancerStats():定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負載均衡器中各個服務實例當前的屬性和統計信息。這些信息能夠用來觀察負載均衡器的運行狀況,同時也是用來制定負載均衡策略的重要依據。

BaseLoadBalancer

 該類是Ribbon負載均衡器的基礎實現類,在該類中定義了不少關於負載均衡相關的基礎內容。app

 該類中定義並維護了兩個存儲服務實例Server對象的列表。一個用於存儲全部服務實例的清單,一個用於存儲正常服務的實例清單。代碼以下:負載均衡

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());

 定義了用來存儲負載均衡器各服務實例屬性和統計信息的LoadBalancerStats對象。dom

 定義了檢查服務實例是否正常的IPing對象,在BaseLoadBalancer中默認爲null,須要在構造時注入它的實現。ide

 定義了檢查服務實例操做的執行策略對象IPingStrategy,在BaseLoadBalancerz中默認使用了該類中定義的靜態內部類SerialPingStrategy。根據源碼,能夠看到該策略採用線性遍歷ping服務實例的方式實現檢查。可是該策略在當IPing的實現速度不理想或者Server列表過大時,可能會影響系統性能。這時就須要本身去實現本身的IPing策略。

 定義了負載均衡的處理規則IRule對象,從BaseLoadBalancer中chooseServer(Object key)方法源碼中也能夠看出它是將服務實例選擇的任務交給了IRule中的Server choose(Object key)方法。默認的IRule實現是RoundRobinRule。

 啓動Ping任務,在BaseLoadBalancer的默認構造函數中,會直接啓動一個用於定時檢查Server是否健康的任務。該任務默認執行的時間間隔爲10s。

 實現了ILoadBalancer接口定義的負載均衡器應該具有如下操做:

  1. addServers(List<Server> newServers):向負載均衡器中增長新的服務實例列表。該實現將本來已經維護的全部服務實例清單allServerList和新傳入的服務實例清單newServers都加入到newList中,而後再調用setServersList(List lsrv)方法對newList進行處理。在BaseLoadBalancer中的默認實現會用新的列表覆蓋舊的列表。後面幾個擴展實現類對於服務實例清單的更新的優化都是經過對setServersList(List lsrv)重寫來實現的。
  2. Server chooseServer(Object key):挑選一個具體的服務實例,上面介紹IRule的時候已經說過,再也不重說。
  3. markServerDown(Server server):用來標記某個服務實例暫停服務
  4. List<Server> getReachableServers():獲取可用的服務實例列表
  5. List<Server> getAllServers():獲取全部的服務實例列表

DynamicServerListLoadBalancer

 DynamicServerListLoadBalancer該類繼承於BaseLoadBalancer類,它是對基礎負載均衡器的擴展。

 在該負載均衡器,實現了服務實例清單在運行期的動態更新能力;同時,它還具有了對服務實例清單的過濾功能,咱們能夠經過過濾器來選擇性的獲取一批服務實例清單。

 下面看一下負載均衡器增長了哪些內容。

ServerList

 經過查看源碼,發現增長了一個關於服務列表的操做對象ServerList<T> serverListImpl,其中T是一個Server的子類,即表明了一個具體的服務實例的擴展類。其中ServerList的定義以下:

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    public List<T> getUpdatedListOfServers();   

}

 該抽象接口定義了兩個抽象方法,以下:

  1. List<T> getInitialListOfServers():用於獲取初始化的服務實例清單
  2. List<T> getUpdatedListOfServers():用於獲取更新的服務實例清單

 該抽象接口的實現類有不少,由於該負載均衡器中須要實現服務實例的動態更新,那麼就須要Ribbon具有訪問Eureka服務註冊中心獲取服務實例的能力,在DynamicServerListLoadBalancer默認的ServerList是DomainExtractingServerList(默認的實現是在EurekaRibbonClientConfiguration),源碼以下:

package org.springframework.cloud.netflix.ribbon.eureka;

@Configuration
public class EurekaRibbonClientConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }

}

 查看DomainExtractingServerList的源碼能夠看出,該類中有一個ServerList<DiscoveryEnabledServer> list,經過查看DomainExtractingServerList的構造函數,DomainExtractingServerList中的ServerList對象就是從上面的代碼中傳過來的DiscoveryEnabledNIWSServerList,源碼以下:

package org.springframework.cloud.netflix.ribbon.eureka;

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getInitialListOfServers());
        return servers;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }

}

 同時,經過上面的源碼還能夠看出,getInitialListOfServers()和getUpdatedListOfServers()方法的實現其實交給DiscoveryEnabledNIWSServerList來實現的,下面看一下DiscoveryEnabledNIWSServerList中這兩個方法的實現

package com.netflix.niws.loadbalancer;

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{

    private static final Logger logger = LoggerFactory.getLogger(DiscoveryEnabledNIWSServerList.class);

    String clientName;
    String vipAddresses;
    boolean isSecure = false;

    boolean prioritizeVipAddressBasedServers = true;

    String datacenter;
    String targetRegion;

    int overridePort = DefaultClientConfigImpl.DEFAULT_PORT;
    boolean shouldUseOverridePort = false;
    boolean shouldUseIpAddr = false;

    private final Provider<EurekaClient> eurekaClientProvider;

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }

}

 上述代碼的主要邏輯是藉助EurekaClient從服務註冊中心獲取到具體的服務實例(InstanceInfo)列表,首頁獲取到EurekaClient,而後更具邏輯服務名(vipAddress),獲取服務實例,將服務實例狀態爲UP(正常服務)的實例轉換爲DiscoveryEnabledServer對象,最終放在一個列表裏返回。

 在獲取到ServerList以後,DomainExtractingServerList會調用自身的setZones方法,源碼以下:

private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList<>();
        boolean isSecure = this.ribbon.isSecure(true);
        boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }

 經過源碼能夠看出,該方法的主要做用是將DiscoveryEnabledNIWSServerList返回的List<DiscoveryEnabledServer>列表中的元素,轉換成DiscoveryEnabledServer的子類對象DomainExtractingServer,在該類對象的構造函數中將爲服務實例對象設置一些必要的屬性,如id,zone,isAliveFlag,readToServer等。

ServerListUpdate

 在DynamicServerListLoadBalancer類中有以下一段代碼,ServerListUpdater對象的實現就是對ServerList的更新

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

 下面看一下ServerListUpdater接口,該類內部還定義了一個UpdateAction接口,下面看一下源碼:

package com.netflix.loadbalancer;

public interface ServerListUpdater {
    
    public interface UpdateAction {
        void doUpdate();
    }

    void start(UpdateAction updateAction);

    void stop();

    String getLastUpdate();

    long getDurationSinceLastUpdateMs();
    
    int getNumberMissedCycles();

    int getCoreThreads();
}

 下面是該接口方法的介紹

  1. void doUpdate():該方法的實現內容就是對ServerList的具體更新操做
  2. void start(UpdateAction updateAction):啓動更新服務器,傳入的UpdateAction對象爲更新操做的具體實現
  3. void stop():中止更新服務器
  4. String getLastUpdate():獲取最近的更新時間戳
  5. long getDurationSinceLastUpdateMs():獲取上一次更新到如今的時間間隔,單位ms
  6. int getNumberMissedCycles():獲取錯過的更新週期數
  7. int getCoreThreads():獲取核心線程數

 下面看一下ServerListUpdater的具體實現類
實現類圖

  1. PollingServerListUpdater:動態服務列表更新的默認策略,DynamicServerListLoadBalancer負載均衡器中的默認實現就是該類,它經過定時任務的方式進行服務列表的更新。
  2. EurekaNotificationServerListUpdater:該更新器能夠用於DynamicServerListLoadBalancer負載均衡器,可是它的觸發機制與PollingServerListUpdater不一樣,它須要利用Eureka的事件監聽器來驅動服務列表的更新操做。

 下面看一下PollingServerListUpdater的實現,咱們從start函數看起

public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

 經過上述代碼能夠看出大體邏輯,建立了一個Runnable線程任務,在線程中調用了UpdateAction的doUpdate()方法,最後再啓動定時任務,initialDelayMs默認值1000ms,refreshIntervalMs默認值是30*1000ms,也就是說更新服務實例在初始化以後延遲1s後開始執行,並以30s爲週期重複執行。

ServerListFilter

 下面咱們回顧一下UpdateAction中doUpdate()方法的具體實現,源碼以下:

public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

 在上述源碼能夠看出,首先是調用了ServerList的getUpdatedListOfServers方法,這是用來從Eureka Server獲取正常的服務實例列表。在獲取完服務實例列表之後,咱們會調用filter.getFilteredListOfServers(servers),此處的filter就是咱們所要找的ServerListFilter。

 ServerListFilter接口很是簡單,僅僅有一個List<T> getFilteredListOfServers(List<T> servers)方法,用於實現對服務列表的過濾,下面看一下它的主要實現類:
PD7jEV.md.png
 在上面的圖中,ZonePreferenceServerListFilter的實現是Spring Cloud Ribbon中對Netflix Ribbon的擴展實現,其餘都是Netflix Ribbon中的原生實現類。下面咱們這些類的特色。

AbstractServerListFilter

package com.netflix.loadbalancer;

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

    private volatile LoadBalancerStats stats;
    
    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }
    
    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }

}

 該類是一個抽象過濾器,在這裏定義了過濾時須要的一個重要依據對象LoadBalancerStats,該對象存儲了關於負載均衡器的一些屬性和統計信息等。

ZoneAffinityServerListFilter

 該過濾器基於區域感知(Zone Affinity)的方式實現服務實例的過濾,它會根據提供服務的實例所處的區域(Zone)與消費者自身所處區域(Zone)進行比較,過濾掉那些不是同處一個區域的實例。

public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }

 從上面的源碼能夠看出,對於服務實例列表的過濾是經過Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())來實現的,其中判斷依據由ZoneAffinityPredicate實現服務實例與消費者的Zone比較。

 在比較事後,並非當即返回過濾以後的ServerList。而是經過shouldEnableZoneAffinity方法來判斷是否要啓用區域感知的功能。下面看一下shouldEnableZoneAffinity的實現:

private boolean shouldEnableZoneAffinity(List<T> filtered) {    
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();            
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
            
        }
    }

 經過查看源碼能夠看出,它調用了LoadBalancerStats的getZoneSnapshot方法來獲取這些過濾後的同區域實例的基礎指標(包含實例數量、斷路由器斷開數、活動請求數、實例平均負載等),而後根據一系列的算法求出下面的幾個評價值並與設置的閥值進行對比,若是有一個條件符合,就不啓用區域感知過濾的服務實例清單。

 上述算法實現爲集羣出現區域故障時,依然能夠依靠其餘區域的實例進行正常服務提供了完善的高可用保障。

  1. blackOutServerPercentage:故障實例百分比(斷路由器斷開數/實例數量)>=0.8
  2. activeReqeustsPerServer:實例平均負載>=0.6
  3. availableServers:可用實例數量(實例數量-斷路器斷開數)<2

DefaultNIWSServerListFilter

 該過濾器徹底繼承自ZoneAffinityServerListFilter,是默認的NIWS(Netflix Internal Web Service)過濾器。

ServerListSubsetFilter

 該過濾器繼承自ZoneAffinityServerListFilter,適合擁有大規模服務集羣(上百或更多)的系統。該過濾器能夠產生一個區域感知結果的子集列表,同時還可以經過比較服務實例的通訊失敗數量和併發鏈接數來斷定該服務是否健康來選擇性地從服務實例列表中剔除那些相對不夠健康的實例。該過濾器的實現主要有如下三步:

1.獲取區域感知的過濾結果,做爲候選的服務實例清單。

2.從當前消費者維護的服務實例子集中剔除那些相對不夠健康的實例(同時將這些實例從候選清單中剔除,防止第三步的時候又被選入),不健康的標準以下:

 a. 服務實例的併發鏈接數超過客戶端配置的值,默認爲0,配置參數爲<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold

 b. 服務實例的失敗數超過客戶端配置的值,默認爲0,配置參數爲<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold。

 c. 若是按符合上面任一規則的服務實例剔除後,剔除比例小於客戶端默認配置的百分比,默認爲10%,配置參數爲<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent,那麼就先對剩下的實例列表進行健康排序,再從最不健康的實例進行剔除,直到達到配置的剔除百分比。

3.在完成剔除後,清單已經少了至少10%的服務實例,最後經過隨機的方式從候選清單中選出一批實例加入到清單中,以保持服務實例子集與原來的數量一致,默認的實例本身數量爲20,配置參數爲<clientName>.<nameSpace>.ServerListSubsetFilter.size。

ZonePreferenceServerListFilter

 Spring Cloud整合時新增的過濾器。若使用Spring Cloud整合Eureka和Ribbon時會默認使用該過濾器。它實現了經過配置或者Eureka實例元數據的所屬區域(Zone)來過濾出同區域的服務實例。下面看一下源碼:

@Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }

 經過源碼分析能夠得出如下幾個步驟:

  1. 首先經過父類的ZoneAffinityServerListFilter過濾器來得到區域感知的服務實例列表
  2. 遍歷獲取的服務實例列表,取出根據消費者配置預設的區域Zone來進行過濾
  3. 過濾的結果若是是空直接返回區域感知的服務實例列表,若是不爲空則返回過濾後的結果

ZoneAwareLoadBalancer

 ZoneAwareLoadBalancer負載均衡器是對DynamicServerListLoadBalancer的擴展。

 在DynamicServerListLoadBalancer中,並無對chooseServer函數進行重寫,所以會採用BaseLoadBalancer中chooseServer,使用RoundRobinRule規則,以線性輪詢的方式來選擇調用的服務實例,該算法實現簡單並無區域(Zone)的概念,因此會把全部實例視爲一個Zone下的節點看待,這樣就會週期性的產生跨區域(Zone)訪問的狀況,因爲跨區域會產生更高的延遲,這些跨區域的實例主要以用來防止區域性故障實現高可用爲目的,不能做爲常規的訪問實例。

 ZoneAwareLoadBalancer能夠有效的避免DynamicServerListLoadBalancer的問題。下面咱們來看一下是如何避免這個問題的。

首先,在ZoneAwareLoadBalancer中並無重寫setServerList,說明實現服務實例清單的更新主邏輯沒有修改。可是ZoneAwareLoadBalancer中重寫了setServerListForZones(Map<String, List<Server>> zoneServersMap)函數。

 下面咱們先看一下DynamicServerListLoadBalancer中setServerListForZones中的實現:

@Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

 經過分析源碼能夠看出,setServerListForZones的調用位於更新服務實例清單setServersList函數的最後,在setServerListForZones的實現中,首先獲取了LoadBalancerStats對象,而後調用其updateZoneServerMapping方法,下面咱們看一下該方法的具體實現:

private ZoneStats getZoneStats(String zone) {
        zone = zone.toLowerCase();
        ZoneStats zs = zoneStatsMap.get(zone);
        if (zs == null){
            zoneStatsMap.put(zone, new ZoneStats(this.getName(), zone, this));
            zs = zoneStatsMap.get(zone);
        }
        return zs;
    }

    public void updateZoneServerMapping(Map<String, List<Server>> map) {
        upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(map);
        // make sure ZoneStats object exist for available zones for monitoring purpose
        for (String zone: map.keySet()) {
            getZoneStats(zone);
        }
    }

 經過上述源碼能夠看出,setServerListForZones方法的主要做用是根據按區域(Zone)分組的實例列表,爲負載均衡器中的LoadBalancerStats對象建立ZoneStats並放入Map zoneStatsMap集合中,每個區域對應一個ZoneStats,它用於存儲每一個Zone的一些狀態和統計信息。

 下面咱們看一下ZoneAwareLoadBalancer負載均衡器中setServerListForZones方法的實現:

@Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }

 首先建立了一個ConcurrentHashMap<String, BaseLoadBalancer>類型的balancers對象,它將用來存儲每一個Zone區域對應的負載均衡器。具體的負載均衡器的建立則是在下面的第一個循環中調用getLoadBalancer方法來完成,在建立負載均衡器的時候同時會建立它的規則(若是當前實現中沒有IRule,就建立一個AvailabilityFilteringRule規則,若是已經有實例,則克隆一個)。

 在建立完負載均衡器以後立刻調用setServersList方法爲其設置對應Zone區域的實例清單。

 第二個循環是對Zone區域中實例清單的檢查,看看是否有Zone區域下已經沒有實例了,是的話就將balancers中對應Zone區域的實例列表清空,該操做的做用是爲了後續選擇節點時,防止過期的Zone區域統計信息干擾具體實例的選擇算法。

 下面咱們再看一下負載均衡器是如何挑選服務實例,來實現對區域的識別的:

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

 經過源碼能夠看出,只有當負載均衡器中維護的實例所屬的Zone區域的個數大於1的時候纔會執行這裏的選擇策略,不然仍是將使用父類的實現。當Zone區域的個數大於1的時候,它的實現步驟以下:

1.調用ZoneAvoidanceRule中的靜態方法createSnapshot(lbStats),爲當前負載均衡器中全部的Zone區域分別建立快照,保存在在Map zoneSnapshot中,這些快照中的數據將用於後續的算法。

2.調用ZoneAvoidanceRule中的靜態方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),來獲取可用的Zone區域集合,在該函數中會經過Zone區域快照中的統計數據來實現可用區的挑選

 a.首先會剔除符合這些規則的Zone區域:所屬實例數爲0的Zone區域;Zone區域內實例的平均負載小於0,或者實例故障率(斷路由器斷開次數/實例數)大於等於閥值(默認值爲0.99999)

 b.而後根據Zone區域的實例平均負載計算出最差的Zone區域,這裏的最差指的是實例平均負載最高的Zone區域

 c.若是在上面的過程當中沒有符合剔除要求的區域,同時實例最大平均負載小於閥值(默認20%),就直接返回全部Zone區域爲可用區域。不然,從最壞Zone區域集合中隨機選擇一個,將它從可用Zone區域集合中剔除。

3.當得到的可用Zone區域集合不爲空,而且個數小於Zone區域總數,就隨機選擇一個Zone區域

4.在肯定了某個Zone區域後,則獲取了對應Zone區域的負載均衡器,並調用chooseServer來選擇具體的服務實例,而在chooseServer中將使用IRule接口的choose方法來選擇具體的服務實例。在這裏,IRule接口的實現會採用ZoneAvoidanceRule來挑選具體的服務實例。

後續

後面會介紹負載均衡策略的源碼分析,請繼續關注!!!

相關文章
相關標籤/搜索