上一篇分析了Ribbon如何發送出去一個自帶負載均衡效果的HTTP請求,本節就重點分析各個算法都是如何實現。java
負載均衡總體是從IRule進去的:算法
public interface IRule{ /* * choose one alive server from lb.allServers or * lb.upServers according to key * * @return choosen Server object. NULL is returned if none * server is available */ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
經過 choose方法選擇指定的算法。後端
完整的算法包含以下:服務器
下面咱們一塊兒分析每個算法的實現。併發
public class RandomRule extends AbstractLoadBalancerRule { Random rand; public RandomRule() { rand = new Random(); } /** * Randomly choose from all living servers */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } //獲取全部 可用的節點 List<Server> upList = lb.getReachableServers(); //獲取全部節點,不區分是否可用 List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ return null; } //使用 隨機數算法,將 總結點數做爲種子 int index = rand.nextInt(serverCount); //在全部可用的節點中隨機算則一個節點 server = upList.get(index); if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield(); } return server; } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { // TODO Auto-generated method stub } }
隨機算法的實現原理很簡單,將當前總節點數做爲種子,生成一個隨機數,在可用節點中選擇一個節點返回便可。app
輪詢負載均衡策略,該算法順序查找全部服務列表,直到遇到第一個可用的服務就返回。限制了最多隻查詢10次,超過10次還未查到可用服務直接返回空。負載均衡
public class RoundRobinRule extends AbstractLoadBalancerRule { private AtomicInteger nextServerCyclicCounter; private static final boolean AVAILABLE_ONLY_SERVERS = true; private static final boolean ALL_SERVERS = false; private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class); public RoundRobinRule() { nextServerCyclicCounter = new AtomicInteger(0); } public RoundRobinRule(ILoadBalancer lb) { this(); setLoadBalancer(lb); } public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; //最多嘗試10次,若是都沒有找到可用的服務器 就返回null while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } //1,2,3...... 順序獲取 index int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } /** * nextServerCyclicCounter 初始值爲0,modulo 爲全部服務器總數 * next值 爲 1,2,3...... * 正常狀況下 current 和 next 確定是相等的 * * @param modulo The modulo to bound the value of the counter. * @return The next value. */ private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { } }
響應時間做爲選取權重的負載均衡策略,響應時間越短的服務被選中的可能性大。less
public class WeightedResponseTimeRule extends RoundRobinRule { public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() { @Override public String key() { return "ServerWeightTaskTimerInterval"; } @Override public String toString() { return key(); } @Override public Class<Integer> type() { return Integer.class; } }; public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000; private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL; private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class); // holds the accumulated weight from index 0 to current index // for example, element at index 2 holds the sum of weight of servers from 0 to 2 private volatile List<Double> accumulatedWeights = new ArrayList<Double>(); private final Random random = new Random(); protected Timer serverWeightTimer = null; protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false); String name = "unknown"; public WeightedResponseTimeRule() { super(); } public WeightedResponseTimeRule(ILoadBalancer lb) { super(lb); } @Override public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if (lb instanceof BaseLoadBalancer) { name = ((BaseLoadBalancer) lb).getName(); } initialize(lb); } void initialize(ILoadBalancer lb) { if (serverWeightTimer != null) { serverWeightTimer.cancel(); } serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true); serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval); // do a initial run ServerWeight sw = new ServerWeight(); sw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { logger .info("Stopping NFLoadBalancer-serverWeightTimer-" + name); serverWeightTimer.cancel(); } })); } public void shutdown() { if (serverWeightTimer != null) { logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name); serverWeightTimer.cancel(); } } List<Double> getAccumulatedWeights() { return Collections.unmodifiableList(accumulatedWeights); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") @Override public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { // accumulatedWeights 裏面封裝的是已經計算完畢權重的全部服務器,具體在 ServerWeight類中 List<Double> currentWeights = accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; // 取出已經計算完權重的服務器列表中的最後一個權重,見下面解釋,最後一個權重爲 當前全部權重之和 double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // 若是沒有命中任何一個服務器或者是服務器列表權重尚未被初始化 // 那麼就使用 默認的 RoundRobinRule 算法從新進行選擇 if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // 生成一個隨機權重 0 <= randomWeight < maxTotalWeight double randomWeight = random.nextDouble() * maxTotalWeight; // 看當前的 randomWeight 在哪一個區間,那麼該區間對應的服務器即爲被選中 int n = 0; for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } } server = allList.get(serverIndex); } if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Next. server = null; } return server; } class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } //進行服務器的權重設置 class ServerWeight { public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; //在AbstractLoadBalancer中維護了一個服務器列表,裏面有當前服務器的統計信息 LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // 若是沒有統計信息,返回 return; } //循環全部服務器,將全部服務器的平均響應時間 相加 double totalResponseTime = 0; for (Server server : nlb.getAllServers()) { // 取出某個服務器的統計信息 ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // 計算權重的方式是: 權重 = totalResponseTime - 該服務器的響應時間 // 即響應時間越長的服務器,權重就會越小,因此被選擇的機會就越小 Double weightSoFar = 0.0; // 這個for循環就是按照上述方法來計算每一個服務器的權重 List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); //這裏的值,至關因而一個區間段,起始是0.0,日後每個數都比前面大當前的weight //eg:0.0--5---10---15 ,那麼最後一個數就是當前全部權重的總和 weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } } } void setWeights(List<Double> weights) { this.accumulatedWeights = weights; } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL); } }
既然是按照響應時間權重來選擇服務,那麼先整理一下權重算法是怎麼作的。dom
觀察initialize方法,啓動了定時器定時執行DynamicServerWeightTask的run來調用計算服務權重,計算權重是經過內部類ServerWeight的maintainWeights方法來進行。ide
整理一下maintainWeights方法的邏輯,裏面有兩個for循環,第一個for循環拿到全部服務的總響應時間,第二個for循環計算每一個服務的權重以及總權重。
第一個for循環。
假設有4個服務,每一個服務的響應時間(ms):
A: 200
B: 500
C: 30
D: 1200
總響應時間:
200+500+30+1200=1930ms
接下來第二個for循環,計算每一個服務的權重。
服務的權重=總響應時間-服務自身的響應時間:
A: 1930-200=1730
B: 1930-500=1430
C: 1930-30=1900
D: 1930-1200=730
總權重:
1730+1430+1900+730=5790
響應時間及權重計算結果示意圖:
結果就是響應時間越短的服務,它的權重就越大。
再看一下choose方法。
重點在while循環的第3個if這裏。
首先若是斷定沒有服務或者權重還沒計算出來時,會採用父類RoundRobinRule以線性輪詢的方式選擇服務器。
有服務,有權重計算結果後,就是以總權重值爲限制,拿到一個隨機數,而後看隨機數落到哪一個區間,就選擇對應的服務。
因此選取服務的結論就是:
響應時間越短的服務,它的權重就越大,被選中的可能性就越大。
//抽象策略,繼承自ClientConfigEnabledRoundRobinRule //基於Predicate的策略 //Predicateshi Google Guava Collection工具對集合進行過濾的條件接口 public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule { //定義了一個抽象函數來獲取AbstractServerPredicate public abstract AbstractServerPredicate getPredicate(); @Override public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); //經過AbstractServerPredicate的chooseRoundRobinAfterFiltering函數來選出具體的服務實例 //AbstractServerPredicate的子類實現的Predicate邏輯來過濾一部分服務實例 //而後在以輪詢的方式從過濾後的實例中選出一個 Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> { public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) { //先經過內部定義的getEligibleServers函數來獲取備選清單(實現了過濾) List<Server> eligible = getEligibleServers(servers); if (eligible.size() == 0) { //若是返回的清單爲空,則用Optional.absent()來表示不存在 return Optional.absent(); } //以線性輪詢的方式從備選清單中獲取一個實例 return Optional.of(eligible.get(random.nextInt(eligible.size()))); } public List<Server> getEligibleServers(List<Server> servers) { return getEligibleServers(servers, null); } /** * Get servers filtered by this predicate from list of servers. */ public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List<Server> results = Lists.newArrayList(); //遍歷服務清單,使用apply方法來判斷實例是否須要保留,若是是,就添加到結果列表中 //因此apply方法須要在子類中實現,子類就可實現高級策略 for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } } }
public Server choose(Object key) { int count = 0; //經過輪詢選擇一個server Server server = roundRobinRule.choose(key); //嘗試10次若是都不知足要求,就放棄,採用父類的choose //這裏爲啥嘗試10次? //1. 輪詢結果相互影響,可能致使某個請求每次調用輪詢返回的都是同一個有問題的server //2. 集羣很大時,遍歷整個集羣判斷效率低,咱們假設集羣中健康的實例要比不健康的多,若是10次找不到,就用父類的choose,這也是一種快速失敗機制 while (count++ <= 10) { if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); }
斷定規則:
如下兩項有一項成立,就表示該服務不可用,不能使用該服務。
配置項niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped爲true(未配置則默認爲true),而且已經觸發斷路。
服務的活動請求數 > 配置項niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit(未配則默認爲Interge.MAX_VALUE)。
在AvailabilityFilteringRule的choose中沒法選出服務的狀況下,會調用父類PredicateBasedRule的choose,PredicateBasedRule採用先過濾後線性輪行方法選擇服務,不過,用來斷定的predicate仍是AvailabilityPredicate,因此過濾用的斷定規則和上面是同樣的。
public class AvailabilityFilteringRule extends PredicateBasedRule { private AbstractServerPredicate predicate; public AvailabilityFilteringRule() { super(); predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); } @Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE) public int getAvailableServersCount() { ILoadBalancer lb = getLoadBalancer(); List<Server> servers = lb.getAllServers(); if (servers == null) { return 0; } return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size(); } /** * This method is overridden to provide a more efficient implementation which does not iterate through * all servers. This is under the assumption that in most cases, there are more available instances * than not. */ @Override public Server choose(Object key) { int count = 0; Server server = roundRobinRule.choose(key); while (count++ <= 10) { if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); } @Override public AbstractServerPredicate getPredicate() { return predicate; } }
顧名思義,可重試的策略。
public class RetryRule extends AbstractLoadBalancerRule { IRule subRule = new RoundRobinRule(); long maxRetryMillis = 500; public RetryRule() { } public RetryRule(IRule subRule) { this.subRule = (subRule != null) ? subRule : new RoundRobinRule(); } public RetryRule(IRule subRule, long maxRetryMillis) { this.subRule = (subRule != null) ? subRule : new RoundRobinRule(); this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500; } public void setRule(IRule subRule) { this.subRule = (subRule != null) ? subRule : new RoundRobinRule(); } public IRule getRule() { return subRule; } public void setMaxRetryMillis(long maxRetryMillis) { if (maxRetryMillis > 0) { this.maxRetryMillis = maxRetryMillis; } else { this.maxRetryMillis = 500; } } public long getMaxRetryMillis() { return maxRetryMillis; } @Override public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); subRule.setLoadBalancer(lb); } /* * Loop if necessary. Note that the time CAN be exceeded depending on the * subRule, because we're not spawning additional threads and returning * early. */ public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); //超時時間爲 當前時間+500 ms long deadline = requestTime + maxRetryMillis; Server answer = null; //默認的策略是 RoundRobinRule answer = subRule.choose(key); //若是默認策略選出的服務器爲空,或者該服務器狀態爲不存活而且當前時間還在超時時間內 if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); 只要知足條件,一直重試 while (!Thread.interrupted()) { answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { /* pause and retry hoping it's transient */ Thread.yield(); } else { break; } } task.cancel(); } //若是在最大超時時間內仍未能選出可用的服務器那就返回空 if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { } }
這個策略默認就是用RoundRobinRule策略選取服務,固然能夠經過配置,在構造RetryRule的時候傳進想要的策略。
爲了應對在有可能出現沒法選取出服務的狀況,好比瞬時斷線的狀況,那麼就要提供一種重試機制,在最大重試時間的限定下重複嘗試選取服務,直到選取出一個服務或者超時。
最大重試時間maxRetryMillis是可配置的。
該策略繼承ClientConfigEnabledRoundRobinRule
,在實現中它注入了負載均衡的統計對象LoadBalancerStats
,同時在具體的choose算法中利用LoadBalancerStats
保存的實例統計信息來選擇知足要求的實例。它經過遍歷負載均衡器中的維護的全部實例,會過濾掉故障的實例,並找出併發請求數最小的一個,因此該策略的特性時可選出最空閒的實例。
該算法核心依賴與LoadBalancerStats
統計信息,當其爲空時候策略是沒法執行,默認執行父類的線性輪詢機制。
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule { private LoadBalancerStats loadBalancerStats; @Override public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key); } //獲取當前全部的服務器信息 List<Server> serverList = getLoadBalancer().getAllServers(); int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; for (Server server: serverList) { //循環每個服務器,獲取當前服務器的統計信息 ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); //若是當前服務器沒有發生故障 if (!serverStats.isCircuitBreakerTripped(currentTime)) { //獲取服務器當前的併發請求量 int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); //若是當前請求量小於minimalConcurrentConnections,就用當前值覆蓋 //那麼最後chosen 就是併發量最小的服務器啦 if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } } @Override public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if (lb instanceof AbstractLoadBalancer) { loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats(); } } }
該策略是com.netflix.loadbalancer.PredicateBasedRule
的具體實現類。它使用了CompositePredicate
來進行服務實例清單的過濾。這是一個組合過濾條件,在其構造函數中,它以ZoneAvoidancePredicate
爲主要過濾條件,判斷斷定一個zone的運行性能是否可用,剔除不可用的zone(全部server),AvailabilityPredicate
爲次要過濾條件,用於過濾掉鏈接數過多的Server,初始化了組合過濾條件的實例。
查看源碼發現,ZoneAvoidanceRule並無重寫choose方法,而是直接使用了父類PredicateBasedRule的choose方法。
public class ZoneAvoidanceRule extends PredicateBasedRule { private static final Random random = new Random(); //使用CompositePredicate來進行服務實例清單過濾。 private CompositePredicate compositePredicate; public ZoneAvoidanceRule() { super(); //判斷一個區域的服務是否可用的過濾條件 ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this); //判斷一個服務的鏈接數是否過多的過濾條件 AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this); //將這兩個條件組合到一塊兒 compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); } //這裏構造了一個兩個過濾條件的Predicate private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) { return CompositePredicate.withPredicates(p1, p2) .addFallbackPredicate(p2) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig); AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); } static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) { Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>(); for (String zone : lbStats.getAvailableZones()) { ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone); map.put(zone, snapshot); } return map; } static String randomChooseZone(Map<String, ZoneSnapshot> snapshot, Set<String> chooseFrom) { if (chooseFrom == null || chooseFrom.size() == 0) { return null; } String selectedZone = chooseFrom.iterator().next(); if (chooseFrom.size() == 1) { return selectedZone; } int totalServerCount = 0; for (String zone : chooseFrom) { totalServerCount += snapshot.get(zone).getInstanceCount(); } int index = random.nextInt(totalServerCount) + 1; int sum = 0; for (String zone : chooseFrom) { sum += snapshot.get(zone).getInstanceCount(); if (index <= sum) { selectedZone = zone; break; } } return selectedZone; } public static Set<String> getAvailableZones( Map<String, ZoneSnapshot> snapshot, double triggeringLoad, double triggeringBlackoutPercentage) { if (snapshot.isEmpty()) { return null; } Set<String> availableZones = new HashSet<String>(snapshot.keySet()); if (availableZones.size() == 1) { return availableZones; } Set<String> worstZones = new HashSet<String>(); double maxLoadPerServer = 0; boolean limitedZoneAvailability = false; for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) { String zone = zoneEntry.getKey(); ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); int instanceCount = zoneSnapshot.getInstanceCount(); if (instanceCount == 0) { availableZones.remove(zone); limitedZoneAvailability = true; } else { double loadPerServer = zoneSnapshot.getLoadPerServer(); if (((double) zoneSnapshot.getCircuitTrippedCount()) / instanceCount >= triggeringBlackoutPercentage || loadPerServer < 0) { availableZones.remove(zone); limitedZoneAvailability = true; } else { if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) { // they are the same considering double calculation // round error worstZones.add(zone); } else if (loadPerServer > maxLoadPerServer) { maxLoadPerServer = loadPerServer; worstZones.clear(); worstZones.add(zone); } } } } if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) { // zone override is not needed here return availableZones; } String zoneToAvoid = randomChooseZone(snapshot, worstZones); if (zoneToAvoid != null) { availableZones.remove(zoneToAvoid); } return availableZones; } public static Set<String> getAvailableZones(LoadBalancerStats lbStats, double triggeringLoad, double triggeringBlackoutPercentage) { if (lbStats == null) { return null; } Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats); return getAvailableZones(snapshot, triggeringLoad, triggeringBlackoutPercentage); } @Override public AbstractServerPredicate getPredicate() { return compositePredicate; } }
上面的源碼中看到 在構造函數中用兩個過濾條件構造了一個CompositePredicate,那麼它裏面怎麼作的呢?
public class CompositePredicate extends AbstractServerPredicate { private AbstractServerPredicate delegate; private List<AbstractServerPredicate> fallbacks = Lists.newArrayList(); private int minimalFilteredServers = 1; private float minimalFilteredPercentage = 0; @Override public boolean apply(@Nullable PredicateKey input) { return delegate.apply(input); } ... ... ... /** * Get the filtered servers from primary predicate, and if the number of the filtered servers * are not enough, trying the fallback predicates */ @Override public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { ////使用主過濾條件對全部實例過濾並返回過濾後的清單 List<Server> result = super.getEligibleServers(servers, loadBalancerKey); Iterator<AbstractServerPredicate> i = fallbacks.iterator(); //依次使用次過濾條件對主過濾條件的結果進行過濾 //不管是主過濾條件仍是次過濾條件,都須要判斷下面兩個條件 //只要有一個條件符合,就再也不過濾,將當前結果返回供線性輪詢 //算法選擇 //第1個條件:過濾後的實例總數>=最小過濾實例數(默認爲1) //第2個條件:過濾互的實例比例>最小過濾百分比(默認爲0) while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage)) && i.hasNext()) { AbstractServerPredicate predicate = i.next(); result = predicate.getEligibleServers(servers, loadBalancerKey); } return result; }