Spring Cloud Ribbon is a client-side load-balancing tool for HTTP and TCP, built on Netflix Ribbon.
Load balancers themselves were covered in the previous article.
AbstractLoadBalancerRule is the abstract base class for load-balancing strategies. It defines an ILoadBalancer field; a concrete strategy can query this load balancer for the state it maintains, use that information as the basis for instance selection, and build algorithms tuned to specific scenarios on top of it.
```java
package com.netflix.loadbalancer;

import com.netflix.client.IClientConfigAware;

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    private ILoadBalancer lb;

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.lb = lb;
    }

    @Override
    public ILoadBalancer getLoadBalancer() {
        return lb;
    }
}
```
RandomRule implements random selection of a service instance from the instance list. First, the source:
```java
package com.netflix.loadbalancer;

import java.util.List;
import java.util.Random;

import com.netflix.client.config.IClientConfig;

public class RandomRule extends AbstractLoadBalancerRule {

    Random rand;

    public RandomRule() {
        rand = new Random();
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = rand.nextInt(serverCount);
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        // TODO Auto-generated method stub
    }
}
```
As the source shows, the implementation of IRule's Server choose(Object key) is delegated to Server choose(ILoadBalancer lb, Object key) in this class, which adds a load-balancer parameter. That method asks the load balancer for the reachable-instance list upList and the full instance list allList, draws a random number with rand.nextInt(serverCount), and uses it as an index into upList to return a concrete instance. Note that the bound serverCount comes from allList while the index is applied to upList, so an IndexOutOfBoundsException is possible whenever the two lists differ in size. The selection logic sits inside a while (server == null) loop; under normal conditions each pass should yield an instance, so if the loop spins without ever obtaining one, a concurrency bug is the likely cause.
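The random-selection idea can be sketched in a few lines. This is a hypothetical helper, not Ribbon's actual class; unlike the excerpt above, the random index here is bounded by the same list it indexes into, so the two can never get out of sync:

```java
import java.util.List;
import java.util.Random;

// Minimal sketch of random instance selection (hypothetical, not Ribbon code).
class RandomChooseSketch {
    private static final Random RAND = new Random();

    static String choose(List<String> reachable) {
        if (reachable == null || reachable.isEmpty()) {
            return null; // no servers to pick from
        }
        // bound the index by the list we actually read from
        return reachable.get(RAND.nextInt(reachable.size()));
    }
}
```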
RoundRobinRule selects service instances one after another by linear round-robin. The source:
```java
package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
}
```
RoundRobinRule's implementation is similar to RandomRule's, but the loop condition and the way an instance is taken from the list differ. The loop adds a count variable, incremented on every pass; if no Server has been obtained after 10 attempts, the loop ends and a warning is logged: No available alive servers after 10 tries from load balancer: ....
The linear round-robin itself rests on the AtomicInteger nextServerCyclicCounter field; each selection calls int incrementAndGetModulo(int modulo), which advances the counter and wraps it at the server count.
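The counter can be isolated into a standalone sketch. This mirrors the incrementAndGetModulo logic shown above: a lock-free cyclic counter that wraps at `modulo` using a compare-and-set retry loop:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone version of RoundRobinRule's cyclic counter idea.
class CyclicCounter {
    private final AtomicInteger next = new AtomicInteger(0);

    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = next.get();
            int nextVal = (current + 1) % modulo; // wrap around at modulo
            // CAS retry: only one thread wins each step, so no value is skipped
            if (next.compareAndSet(current, nextVal)) {
                return nextVal;
            }
        }
    }
}
```

With three servers the counter produces the index sequence 1, 2, 0, 1, 2, 0, ... so every instance is visited in turn.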
RetryRule adds a retry mechanism to instance selection. As the source shows, it holds an inner IRule, a RoundRobinRule instance by default; its choose method repeatedly invokes that inner strategy, returning as soon as a concrete instance is selected, and returning null once the retry deadline (the timestamp at which choose started plus maxRetryMillis) has passed without success.
```java
package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;

public class RetryRule extends AbstractLoadBalancerRule {

    IRule subRule = new RoundRobinRule();
    long maxRetryMillis = 500;

    /*
     * Loop if necessary. Note that the time CAN be exceeded depending on the
     * subRule, because we're not spawning additional threads and returning
     * early.
     */
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + maxRetryMillis;

        Server answer = null;

        answer = subRule.choose(key);

        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {

            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {
                    break;
                }
            }

            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }
}
```
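The deadline-bounded retry can be reduced to a small sketch. This is a hypothetical simplification, not Ribbon code: a `Supplier<String>` stands in for the inner IRule, and the InterruptTask machinery is replaced by a plain deadline check:

```java
import java.util.function.Supplier;

// Sketch of RetryRule's core idea: keep delegating to the sub-rule until it
// yields a result or the deadline (start + maxRetryMillis) passes.
class RetrySketch {
    static String chooseWithRetry(Supplier<String> subRule, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        String answer = subRule.get();
        while (answer == null && System.currentTimeMillis() < deadline) {
            Thread.yield(); // brief pause, hoping the failure is transient
            answer = subRule.get();
        }
        return answer; // null if the deadline expired without success
    }
}
```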
WeightedResponseTimeRule extends RoundRobinRule: it computes a weight for each instance from its runtime statistics and selects instances according to those weights, aiming for a better distribution. The implementation has three core parts.
On initialization, WeightedResponseTimeRule starts a scheduled task via serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval) to compute each instance's weight; by default the task runs every 30 seconds.
The object that stores the weights is easy to find in the source: private volatile List<Double> accumulatedWeights = new ArrayList<Double>();. Each weight's position in this List corresponds to the position of an instance in the load balancer's full instance list. The weight-computation function maintainWeights:
```java
public void maintainWeights() {
    ILoadBalancer lb = getLoadBalancer();
    if (lb == null) {
        return;
    }

    if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
        return;
    }

    try {
        logger.info("Weight adjusting job started");
        AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
        LoadBalancerStats stats = nlb.getLoadBalancerStats();
        if (stats == null) {
            // no statistics, nothing to do
            return;
        }
        double totalResponseTime = 0;
        // find maximal 95% response time
        for (Server server : nlb.getAllServers()) {
            // this will automatically load the stats if not in cache
            ServerStats ss = stats.getSingleServerStat(server);
            totalResponseTime += ss.getResponseTimeAvg();
        }
        // weight for each server is (sum of responseTime of all servers - responseTime)
        // so that the longer the response time, the less the weight and the less likely to be chosen
        Double weightSoFar = 0.0;

        // create new list and hot swap the reference
        List<Double> finalWeights = new ArrayList<Double>();
        for (Server server : nlb.getAllServers()) {
            ServerStats ss = stats.getSingleServerStat(server);
            double weight = totalResponseTime - ss.getResponseTimeAvg();
            weightSoFar += weight;
            finalWeights.add(weightSoFar);
        }
        setWeights(finalWeights);
    } catch (Exception e) {
        logger.error("Error calculating server weights", e);
    } finally {
        serverWeightAssignmentInProgress.set(false);
    }
}
```
The method works in two main steps:

- Sum the average response times of all instances into totalResponseTime.
- For each instance, compute its weight as totalResponseTime minus its own average response time, add that to the running sum weightSoFar, and append the running sum to finalWeights, which is then hot-swapped in via setWeights.
The weights produced by this rough calculation represent only the upper bound of each instance's weight interval. The figure below is excerpted from Spring Cloud 微服務實戰.
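A worked example makes the two steps concrete. Assuming four instances with hypothetical average response times of 10, 30, 40, and 20 ms, the total is 100; the per-instance weights are 90, 70, 60, and 80, and the stored accumulated list is [90, 160, 220, 300], i.e. the upper bounds of the four intervals:

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces maintainWeights' arithmetic on plain doubles (example only).
class WeightExample {
    static List<Double> accumulatedWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> result = new ArrayList<>();
        double soFar = 0;
        for (double t : avgResponseTimes) {
            soFar += total - t; // slower instances contribute smaller weights
            result.add(soFar);  // store the running sum, i.e. the interval's upper bound
        }
        return result;
    }
}
```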
Next, how Server choose(ILoadBalancer lb, Object key) selects a Server:
```java
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        // get hold of the current reference in case it is changed from the other thread
        List<Double> currentWeights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }
        List<Server> allList = lb.getAllServers();

        int serverCount = allList.size();

        if (serverCount == 0) {
            return null;
        }

        int serverIndex = 0;

        // last one in the list is the sum of all weights
        double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
        // No server has been hit yet and total weight is not initialized
        // fallback to use round robin
        if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
            server = super.choose(getLoadBalancer(), key);
            if (server == null) {
                return server;
            }
        } else {
            // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
            double randomWeight = random.nextDouble() * maxTotalWeight;
            // pick the server index based on the randomIndex
            int n = 0;
            for (Double d : currentWeights) {
                if (d >= randomWeight) {
                    serverIndex = n;
                    break;
                } else {
                    n++;
                }
            }

            server = allList.get(serverIndex);
        }

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Next.
        server = null;
    }
    return server;
}
```
The main steps of this source are:

- Snapshot the accumulatedWeights reference into currentWeights, in case another thread swaps it mid-selection.
- Fetch the full instance list; return null if it is empty.
- Read the last element of currentWeights as maxTotalWeight, the sum of all weights.
- If the weights are not yet initialized (maxTotalWeight < 0.001) or their count does not match the instance count, fall back to the parent's round-robin choose.
- Otherwise draw a random number in [0, maxTotalWeight), walk currentWeights, and take the index of the first upper bound greater than or equal to the random number.
- Return the instance at that index if it is alive; otherwise retry the loop.
A careful reader may notice that the first instance's weight interval is closed at both ends, the last instance's interval is open at both ends, and every other interval is left-open and right-closed. This is because the random number's minimum is 0, so the first interval's lower bound is closed, while the random number never reaches the maximum total weight, so the last interval's upper bound is open.
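The interval lookup inside choose can be isolated as a small sketch. Continuing the earlier example with accumulated weights [90, 160, 220, 300], a random value of 90 still falls in the first interval, while 90.5 lands in the second:

```java
import java.util.List;

// Mirrors the for-loop in choose(): return the index of the first
// accumulated upper bound that covers randomWeight.
class WeightedPick {
    static int pickIndex(List<Double> accumulated, double randomWeight) {
        int n = 0;
        for (double upperBound : accumulated) {
            if (upperBound >= randomWeight) {
                return n;
            }
            n++;
        }
        return 0; // fallback, mirrors serverIndex's initial value
    }
}
```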
ClientConfigEnabledRoundRobinRule is a special case that is generally not used directly, because it implements no particular logic of its own: internally it defines a RoundRobinRule, and its choose function simply applies that rule's linear round-robin.
In practice we do not use this strategy directly; instead, it serves as the base for more advanced strategies.
BestAvailableRule extends ClientConfigEnabledRoundRobinRule. It injects the load balancer's statistics object, LoadBalancerStats, and its choose method uses the per-instance statistics LoadBalancerStats holds to select an instance that meets its criteria.
When LoadBalancerStats is null it falls back to RoundRobinRule's linear round-robin; when statistics are available, it iterates over all instances the load balancer maintains, filters out faulty ones, and picks the instance with the smallest number of concurrent requests.
In short, this strategy selects the most idle instance.
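The core idea can be sketched without Ribbon's types. This is a hypothetical simplification: a plain map of per-server stats stands in for LoadBalancerStats/ServerStats, tripped instances are skipped, and the instance with the fewest active requests wins:

```java
import java.util.Map;

// Hypothetical sketch of BestAvailableRule's selection logic.
class BestAvailableSketch {
    static class Stats {
        final boolean circuitTripped;
        final int activeRequests;
        Stats(boolean circuitTripped, int activeRequests) {
            this.circuitTripped = circuitTripped;
            this.activeRequests = activeRequests;
        }
    }

    static String choose(Map<String, Stats> statsByServer) {
        String best = null;
        int minActive = Integer.MAX_VALUE;
        for (Map.Entry<String, Stats> e : statsByServer.entrySet()) {
            Stats s = e.getValue();
            if (s.circuitTripped) {
                continue; // filter out faulty instances
            }
            if (s.activeRequests < minActive) { // track the least-busy instance
                minActive = s.activeRequests;
                best = e.getKey();
            }
        }
        return best;
    }
}
```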
PredicateBasedRule is an abstract strategy that extends ClientConfigEnabledRoundRobinRule. As the name suggests, it is built on a Predicate, the condition interface Google Guava Collections uses for filtering collections.
```java
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }
}
```
The class defines an abstract function getPredicate that supplies an AbstractServerPredicate implementation; choose then calls chooseRoundRobinAfterFiltering on that predicate to pick a concrete instance. The method's name gives away the overall logic: first filter out some instances using the Predicate logic implemented in the subclass, then select one instance from the filtered list by linear round-robin.
The chooseRoundRobinAfterFiltering method invoked in choose above first calls the internal getEligibleServers function to obtain the candidate list (this is the filtering step); if the resulting list is empty it returns Optional.absent(), otherwise it takes one instance from the candidates by linear round-robin.
The source of getEligibleServers:
```java
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server : servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;
    }
}
```
上述源碼的大體邏輯是遍歷服務清單,使用this.apply方法來判斷實例是否須要保留,若是是就添加到結果列表中。
In fact, AbstractServerPredicate implements com.google.common.base.Predicate; apply is that interface's method and carries the filtering condition. Its input parameter bundles the information the condition needs, for example new PredicateKey(loadBalancerKey, server) in the source, which carries the instance's statistics together with the key passed down from the load balancer's selection algorithm.
AbstractServerPredicate itself provides no apply implementation, so chooseRoundRobinAfterFiltering merely defines a template strategy: filter the list first, then round-robin.
How to filter is determined by each AbstractServerPredicate subclass's apply implementation, which supplies the concrete filtering strategy.
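The filter-then-round-robin template can be sketched without Guava or Ribbon's types. This is a hypothetical approximation: java.util.function.Predicate stands in for Guava's Predicate, strings stand in for Server, and an AtomicInteger provides the round-robin counter:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch of PredicateBasedRule's template: filter first, then round-robin
// over the filtered list.
class PredicateRuleSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    String choose(List<String> servers, Predicate<String> eligible) {
        List<String> filtered = servers.stream().filter(eligible).collect(Collectors.toList());
        if (filtered.isEmpty()) {
            return null; // nothing passed the filter
        }
        // round-robin over the *filtered* list, not the original one
        int index = Math.abs(counter.getAndIncrement()) % filtered.size();
        return filtered.get(index);
    }
}
```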
AvailabilityFilteringRule extends PredicateBasedRule and follows the same basic filter-then-round-robin logic; its filter condition is AvailabilityPredicate. The source of AvailabilityPredicate:
```java
package com.netflix.loadbalancer;

import javax.annotation.Nullable;

import com.netflix.client.config.IClientConfig;
import com.netflix.config.ChainedDynamicProperty;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;

public class AvailabilityPredicate extends AbstractServerPredicate {

    // (the CIRCUIT_BREAKER_FILTERING and activeConnectionsLimit property
    // fields are elided in this excerpt)

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    private boolean shouldSkipServer(ServerStats stats) {
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
```
As the source shows, the main filtering logic lives in boolean shouldSkipServer(ServerStats stats), which checks two things about a service instance:

- whether its circuit breaker is tripped (when the circuit-breaker filtering property is enabled);
- whether its number of active (concurrent) requests has reached the configured limit.
If either condition holds, apply returns false: the instance is likely faulty or overloaded and is filtered out. Only when neither holds does apply return true.
AvailabilityFilteringRule makes a small optimization when selecting an instance. Rather than first traversing all nodes to filter them like its parent and then choosing from the filtered set, it first picks an instance linearly, then checks it against the filter condition: if the instance qualifies it is used directly, otherwise the next instance is picked and checked, and so on. If no suitable instance has been found after 10 rounds of this, it falls back to the parent's implementation.
By probing round-robin for an instance that is available and relatively idle, this strategy avoids the cost of filtering the entire list on every selection.
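The optimized probing loop can be sketched as follows. This is a hypothetical simplification: candidates are probed one at a time in round-robin order and the first one passing the availability predicate wins, giving up after 10 tries (where the real rule would fall back to the parent's filter-then-pick path):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Sketch of AvailabilityFilteringRule's probe-first optimization.
class ProbeFirstSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    String choose(List<String> servers, Predicate<String> available) {
        if (servers.isEmpty()) {
            return null;
        }
        for (int tries = 0; tries < 10; tries++) {
            // pick the next round-robin candidate, then test it
            String candidate = servers.get(Math.abs(counter.getAndIncrement()) % servers.size());
            if (available.test(candidate)) {
                return candidate; // first available candidate wins
            }
        }
        return null; // the real rule falls back to the parent's implementation here
    }
}
```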
ZoneAvoidanceRule is also a subclass of PredicateBasedRule. It is implemented with the composite filter CompositePredicate, using ZoneAvoidancePredicate as the primary condition and AvailabilityPredicate as the secondary condition.
Unlike AvailabilityFilteringRule, ZoneAvoidanceRule does not override choose with an optimization, so it follows the basic filter-then-round-robin logic.
The source of CompositePredicate:
```java
package com.netflix.loadbalancer;

import java.util.Iterator;
import java.util.List;

import javax.annotation.Nullable;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;

public class CompositePredicate extends AbstractServerPredicate {

    private AbstractServerPredicate delegate;

    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();

    private int minimalFilteredServers = 1;

    private float minimalFilteredPercentage = 0;

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        return delegate.apply(input);
    }

    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        while (!(result.size() >= minimalFilteredServers
                && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
}
```
As the source shows, CompositePredicate defines a primary filter condition, delegate, and a list of fallback conditions, fallbacks; the fallbacks are applied in the order they are stored.
The main logic of getEligibleServers is:

- Filter the original list with the primary condition (via super.getEligibleServers, which applies delegate through apply).
- If the filtered result does not contain at least minimalFilteredServers instances, or does not exceed minimalFilteredPercentage of the original list, re-filter the original list with the next fallback condition.
- Repeat until a result satisfies both thresholds or the fallbacks are exhausted; the last result is returned either way.
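The main logic of getEligibleServers can be sketched as a dependency-free approximation, with java.util.function.Predicate standing in for AbstractServerPredicate and strings for Server:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch of CompositePredicate.getEligibleServers: filter with the primary
// predicate, and while the result is too small, re-filter the ORIGINAL
// list with each fallback predicate in turn.
class CompositeFilterSketch {
    static List<String> eligible(List<String> servers,
                                 Predicate<String> primary,
                                 List<Predicate<String>> fallbacks,
                                 int minServers, float minPercentage) {
        List<String> result = servers.stream().filter(primary).collect(Collectors.toList());
        for (Predicate<String> fallback : fallbacks) {
            if (result.size() >= minServers
                    && result.size() > (int) (servers.size() * minPercentage)) {
                break; // thresholds satisfied, stop falling back
            }
            result = servers.stream().filter(fallback).collect(Collectors.toList());
        }
        return result;
    }
}
```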
A later article will cover how to configure Spring Cloud Ribbon, so stay tuned!