Spring Cloud Ribbon 源碼分析---負載均衡算法

上一篇分析了Ribbon如何發送出去一個自帶負載均衡效果的HTTP請求,本節就重點分析各個算法都是如何實現。java

負載均衡總體是從IRule進去的:算法

public interface IRule{
    /*
     * choose one alive server from lb.allServers or
     * lb.upServers according to key
     * 
     * @return choosen Server object. NULL is returned if none
     *  server is available 
     */

    public Server choose(Object key);

    public void setLoadBalancer(ILoadBalancer lb);

    public ILoadBalancer getLoadBalancer();    
}

經過 choose方法選擇指定的算法。後端

完整的算法包含以下:服務器

  1. RandomRule:隨機算法實現;
  2. RoundRobinRule:輪詢負載均衡策略,依次輪詢全部可用服務器列表,遇到第一個可用的即返回;
  3. RetryRule :先按照RoundRobinRule策略獲取服務,若是獲取服務失敗會在指定時間內重試;
  4. AvaliabilityFilteringRule: 過濾掉那些由於一直鏈接失敗的被標記爲circuit tripped的後端server,並過濾掉那些高併發的的後端server(active connections 超過配置的閾值) ;
  5. BestAvailableRule :會先過濾掉因爲屢次訪問故障二處於斷路器跳閘狀態的服務,而後選擇一個併發量最小的服務;
  6. WeightedResponseTimeRule: 根據響應時間分配一個weight,響應時間越長,weight越小,被選中的可能性越低;
  7. ZoneAvoidanceRule: 複合判斷server所在區域的性能和server的可用性選擇server

下面咱們一塊兒分析每個算法的實現。併發

RandomRule

public class RandomRule extends AbstractLoadBalancerRule {
    Random rand;

    public RandomRule() {
        rand = new Random();
    }

    /**
     * Randomly choose from all living servers
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            //獲取全部 可用的節點
            List<Server> upList = lb.getReachableServers();
            //獲取全部節點,不區分是否可用
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }
			   //使用 隨機數算法,將 總結點數做爲種子
            int index = rand.nextInt(serverCount);
            //在全部可用的節點中隨機算則一個節點
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

	@Override
	public Server choose(Object key) {
		return choose(getLoadBalancer(), key);
	}

	@Override
	public void initWithNiwsConfig(IClientConfig clientConfig) {
		// TODO Auto-generated method stub
		
	}
}

隨機算法的實現原理很簡單,將當前總節點數做爲種子,生成一個隨機數,在可用節點中選擇一個節點返回便可。app

RoundRobinRule

輪詢負載均衡策略,該算法順序查找全部服務列表,直到遇到第一個可用的服務就返回。限制了最多隻查詢10次,超過10次還未查到可用服務直接返回空。負載均衡

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        //最多嘗試10次,若是都沒有找到可用的服務器 就返回null
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
			//1,2,3...... 順序獲取 index
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * nextServerCyclicCounter 初始值爲0,modulo 爲全部服務器總數
     *  next值 爲 1,2,3......
     * 正常狀況下 current 和 next 確定是相等的
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

WeightedResponseTimeRule

響應時間做爲選取權重的負載均衡策略,響應時間越短的服務被選中的可能性大。less

public class WeightedResponseTimeRule extends RoundRobinRule {

    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        @Override
        public String key() {
            return "ServerWeightTaskTimerInterval";
        }
        
        @Override
        public String toString() {
            return key();
        }

        @Override
        public Class<Integer> type() {
            return Integer.class;
        }
    };
    
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
    
    private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;

    private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);
    
    // holds the accumulated weight from index 0 to current index
    // for example, element at index 2 holds the sum of weight of servers from 0 to 2
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
    

    private final Random random = new Random();

    protected Timer serverWeightTimer = null;

    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

    String name = "unknown";

    public WeightedResponseTimeRule() {
        super();
    }

    public WeightedResponseTimeRule(ILoadBalancer lb) {
        super(lb);
    }
    
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            name = ((BaseLoadBalancer) lb).getName();
        }
        initialize(lb);
    }

    void initialize(ILoadBalancer lb) {        
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }

    public void shutdown() {
        if (serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }

    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(accumulatedWeights);
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // accumulatedWeights 裏面封裝的是已經計算完畢權重的全部服務器,具體在 ServerWeight類中
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // 取出已經計算完權重的服務器列表中的最後一個權重,見下面解釋,最後一個權重爲 當前全部權重之和
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
            // 若是沒有命中任何一個服務器或者是服務器列表權重尚未被初始化
            // 那麼就使用 默認的 RoundRobinRule 算法從新進行選擇
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // 生成一個隨機權重  0 <= randomWeight < maxTotalWeight
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // 看當前的 randomWeight 在哪一個區間,那麼該區間對應的服務器即爲被選中
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }

    class DynamicServerWeightTask extends TimerTask {
        public void run() {
            ServerWeight serverWeight = new ServerWeight();
            try {
                serverWeight.maintainWeights();
            } catch (Exception e) {
                logger.error("Error running DynamicServerWeightTask for {}", name, e);
            }
        }
    }

    //進行服務器的權重設置
    class ServerWeight {

        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }
            
            if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
                return; 
            }
            
            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                //在AbstractLoadBalancer中維護了一個服務器列表,裏面有當前服務器的統計信息
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // 若是沒有統計信息,返回
                    return;
                }
                //循環全部服務器,將全部服務器的平均響應時間 相加
                double totalResponseTime = 0; 
                for (Server server : nlb.getAllServers()) {
                    // 取出某個服務器的統計信息
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // 計算權重的方式是: 權重 =  totalResponseTime - 該服務器的響應時間
                // 即響應時間越長的服務器,權重就會越小,因此被選擇的機會就越小
                Double weightSoFar = 0.0;
                
                // 這個for循環就是按照上述方法來計算每一個服務器的權重
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    //這裏的值,至關因而一個區間段,起始是0.0,日後每個數都比前面大當前的weight
                    //eg:0.0--5---10---15 ,那麼最後一個數就是當前全部權重的總和
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }

        }
    }

    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);
    }

}

既然是按照響應時間權重來選擇服務,那麼先整理一下權重算法是怎麼作的。dom

觀察initialize方法,啓動了定時器定時執行DynamicServerWeightTask的run來調用計算服務權重,計算權重是經過內部類ServerWeight的maintainWeights方法來進行。ide

整理一下maintainWeights方法的邏輯,裏面有兩個for循環,第一個for循環拿到全部服務的總響應時間,第二個for循環計算每一個服務的權重以及總權重。

第一個for循環。

假設有4個服務,每一個服務的響應時間(ms):

A: 200

B: 500

C: 30

D: 1200

總響應時間:

200+500+30+1200=1930ms

接下來第二個for循環,計算每一個服務的權重。

服務的權重=總響應時間-服務自身的響應時間:

A: 1930-200=1730

B: 1930-500=1430

C: 1930-30=1900

D: 1930-1200=730

總權重:

1730+1430+1900+730=5790

響應時間及權重計算結果示意圖:

img

結果就是響應時間越短的服務,它的權重就越大。

再看一下choose方法。

重點在while循環的第3個if這裏。

首先若是斷定沒有服務或者權重還沒計算出來時,會採用父類RoundRobinRule以線性輪詢的方式選擇服務器。

有服務,有權重計算結果後,就是以總權重值爲限制,拿到一個隨機數,而後看隨機數落到哪一個區間,就選擇對應的服務。

因此選取服務的結論就是:

響應時間越短的服務,它的權重就越大,被選中的可能性就越大。

AvaliabilityFilteringRule

//抽象策略,繼承自ClientConfigEnabledRoundRobinRule
//基於Predicate的策略
//Predicateshi Google Guava Collection工具對集合進行過濾的條件接口
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    //定義了一個抽象函數來獲取AbstractServerPredicate
    public abstract AbstractServerPredicate getPredicate();

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        //經過AbstractServerPredicate的chooseRoundRobinAfterFiltering函數來選出具體的服務實例
        //AbstractServerPredicate的子類實現的Predicate邏輯來過濾一部分服務實例
        //而後在以輪詢的方式從過濾後的實例中選出一個
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
 
    public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) {
        //先經過內部定義的getEligibleServers函數來獲取備選清單(實現了過濾)
        List<Server> eligible = getEligibleServers(servers);
        if (eligible.size() == 0) {
            //若是返回的清單爲空,則用Optional.absent()來表示不存在
            return Optional.absent();
        }
        //以線性輪詢的方式從備選清單中獲取一個實例
        return Optional.of(eligible.get(random.nextInt(eligible.size())));
    }
 
    public List<Server> getEligibleServers(List<Server> servers) {
        return getEligibleServers(servers, null);
    }
    /**
     * Get servers filtered by this predicate from list of servers.
     */
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
        } else {
            List<Server> results = Lists.newArrayList();
            //遍歷服務清單,使用apply方法來判斷實例是否須要保留,若是是,就添加到結果列表中
            //因此apply方法須要在子類中實現,子類就可實現高級策略
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;            
        }
    }
}
public Server choose(Object key) {
    int count = 0;
    //經過輪詢選擇一個server
    Server server = roundRobinRule.choose(key);
    //嘗試10次若是都不知足要求,就放棄,採用父類的choose
    //這裏爲啥嘗試10次?
    //1. 輪詢結果相互影響,可能致使某個請求每次調用輪詢返回的都是同一個有問題的server
    //2. 集羣很大時,遍歷整個集羣判斷效率低,咱們假設集羣中健康的實例要比不健康的多,若是10次找不到,就用父類的choose,這也是一種快速失敗機制
    while (count++ <= 10) {
        if (predicate.apply(new PredicateKey(server))) {
            return server;
        }
        server = roundRobinRule.choose(key);
    }
    return super.choose(key);
}

斷定規則:

如下兩項有一項成立,就表示該服務不可用,不能使用該服務。

配置項niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped爲true(未配置則默認爲true),而且已經觸發斷路。

服務的活動請求數 > 配置項niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit(未配則默認爲Interge.MAX_VALUE)。

在AvailabilityFilteringRule的choose中沒法選出服務的狀況下,會調用父類PredicateBasedRule的choose,PredicateBasedRule採用先過濾後線性輪行方法選擇服務,不過,用來斷定的predicate仍是AvailabilityPredicate,因此過濾用的斷定規則和上面是同樣的。

public class AvailabilityFilteringRule extends PredicateBasedRule {    

    private AbstractServerPredicate predicate;
    
    public AvailabilityFilteringRule() {
    	super();
    	predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }
    
    
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    	predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
    	            .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
    	            .build();
    }

    @Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)
    public int getAvailableServersCount() {
    	ILoadBalancer lb = getLoadBalancer();
    	List<Server> servers = lb.getAllServers();
    	if (servers == null) {
    		return 0;
    	}
    	return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
    }


    /**
     * This method is overridden to provide a more efficient implementation which does not iterate through
     * all servers. This is under the assumption that in most cases, there are more available instances 
     * than not. 
     */
    @Override
    public Server choose(Object key) {
        int count = 0;
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

    @Override
    public AbstractServerPredicate getPredicate() {
        return predicate;
    }
}

RetryRule

顧名思義,可重試的策略。

public class RetryRule extends AbstractLoadBalancerRule {
	IRule subRule = new RoundRobinRule();
	long maxRetryMillis = 500;

	public RetryRule() {
	}

	public RetryRule(IRule subRule) {
		this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
	}

	public RetryRule(IRule subRule, long maxRetryMillis) {
		this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
		this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
	}

	public void setRule(IRule subRule) {
		this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
	}

	public IRule getRule() {
		return subRule;
	}

	public void setMaxRetryMillis(long maxRetryMillis) {
		if (maxRetryMillis > 0) {
			this.maxRetryMillis = maxRetryMillis;
		} else {
			this.maxRetryMillis = 500;
		}
	}

	public long getMaxRetryMillis() {
		return maxRetryMillis;
	}

	
	
	@Override
	public void setLoadBalancer(ILoadBalancer lb) {		
		super.setLoadBalancer(lb);
		subRule.setLoadBalancer(lb);
	}

	/*
	 * Loop if necessary. Note that the time CAN be exceeded depending on the
	 * subRule, because we're not spawning additional threads and returning
	 * early.
	 */
	public Server choose(ILoadBalancer lb, Object key) {
		long requestTime = System.currentTimeMillis();
        //超時時間爲 當前時間+500 ms
		long deadline = requestTime + maxRetryMillis;

		Server answer = null;
		//默認的策略是 RoundRobinRule
		answer = subRule.choose(key);
		//若是默認策略選出的服務器爲空,或者該服務器狀態爲不存活而且當前時間還在超時時間內
		if (((answer == null) || (!answer.isAlive()))
				&& (System.currentTimeMillis() < deadline)) {

			InterruptTask task = new InterruptTask(deadline
					- System.currentTimeMillis());
			只要知足條件,一直重試
			while (!Thread.interrupted()) {
				answer = subRule.choose(key);

				if (((answer == null) || (!answer.isAlive()))
						&& (System.currentTimeMillis() < deadline)) {
					/* pause and retry hoping it's transient */
					Thread.yield();
				} else {
					break;
				}
			}

			task.cancel();
		}
		//若是在最大超時時間內仍未能選出可用的服務器那就返回空
		if ((answer == null) || (!answer.isAlive())) {
			return null;
		} else {
			return answer;
		}
	}

	@Override
	public Server choose(Object key) {
		return choose(getLoadBalancer(), key);
	}

	@Override
	public void initWithNiwsConfig(IClientConfig clientConfig) {
	}
}

這個策略默認就是用RoundRobinRule策略選取服務,固然能夠經過配置,在構造RetryRule的時候傳進想要的策略。

爲了應對在有可能出現沒法選取出服務的狀況,好比瞬時斷線的狀況,那麼就要提供一種重試機制,在最大重試時間的限定下重複嘗試選取服務,直到選取出一個服務或者超時。

最大重試時間maxRetryMillis是可配置的。

BestAvailableRule

該策略繼承ClientConfigEnabledRoundRobinRule,在實現中它注入了負載均衡的統計對象LoadBalancerStats,同時在具體的choose算法中利用LoadBalancerStats保存的實例統計信息來選擇知足要求的實例。它經過遍歷負載均衡器中的維護的全部實例,會過濾掉故障的實例,並找出併發請求數最小的一個,因此該策略的特性時可選出最空閒的實例。

該算法核心依賴與LoadBalancerStats統計信息,當其爲空時候策略是沒法執行,默認執行父類的線性輪詢機制。

public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    private LoadBalancerStats loadBalancerStats;
    
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        //獲取當前全部的服務器信息
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server: serverList) {
            //循環每個服務器,獲取當前服務器的統計信息
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            //若是當前服務器沒有發生故障
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                //獲取服務器當前的併發請求量
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                //若是當前請求量小於minimalConcurrentConnections,就用當前值覆蓋
                //那麼最後chosen 就是併發量最小的服務器啦
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();            
        }
    }

}

ZoneAvoidanceRule

該策略是com.netflix.loadbalancer.PredicateBasedRule的具體實現類。它使用了CompositePredicate來進行服務實例清單的過濾。這是一個組合過濾條件,在其構造函數中,它以ZoneAvoidancePredicate爲主要過濾條件,判斷斷定一個zone的運行性能是否可用,剔除不可用的zone(全部server),AvailabilityPredicate爲次要過濾條件,用於過濾掉鏈接數過多的Server,初始化了組合過濾條件的實例。

查看源碼發現,ZoneAvoidanceRule並無重寫choose方法,而是直接使用了父類PredicateBasedRule的choose方法。

public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();
    //使用CompositePredicate來進行服務實例清單過濾。
    private CompositePredicate compositePredicate;
    
    public ZoneAvoidanceRule() {
        super();
        //判斷一個區域的服務是否可用的過濾條件
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        //判斷一個服務的鏈接數是否過多的過濾條件
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        //將這兩個條件組合到一塊兒
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
    
    //這裏構造了一個兩個過濾條件的Predicate
    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();
        
    }
    
    
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
        Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
        for (String zone : lbStats.getAvailableZones()) {
            ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
            map.put(zone, snapshot);
        }
        return map;
    }

    static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }

    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

    public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
            double triggeringLoad, double triggeringBlackoutPercentage) {
        if (lbStats == null) {
            return null;
        }
        Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
        return getAvailableZones(snapshot, triggeringLoad,
                triggeringBlackoutPercentage);
    }

    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }    
}

上面的源碼中看到 在構造函數中用兩個過濾條件構造了一個CompositePredicate,那麼它裏面怎麼作的呢?

public class CompositePredicate extends AbstractServerPredicate {

    private AbstractServerPredicate delegate;
    
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
        
    private int minimalFilteredServers = 1;
    
    private float minimalFilteredPercentage = 0;    
    
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        return delegate.apply(input);
    }

    
    ...
    ...
    ...

    /**
     * Get the filtered servers from primary predicate, and if the number of the filtered servers
     * are not enough, trying the fallback predicates  
     */
    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        ////使用主過濾條件對全部實例過濾並返回過濾後的清單
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        //依次使用次過濾條件對主過濾條件的結果進行過濾
        //不管是主過濾條件仍是次過濾條件,都須要判斷下面兩個條件
        //只要有一個條件符合,就再也不過濾,將當前結果返回供線性輪詢
        //算法選擇
        //第1個條件:過濾後的實例總數>=最小過濾實例數(默認爲1)
        //第2個條件:過濾互的實例比例>最小過濾百分比(默認爲0)
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
相關文章
相關標籤/搜索