接上篇http://www.javashuo.com/article/p-poudvoae-u.htmljava
既然有集羣容錯,天然會有負載均衡。dubbo經過spi默認實現了4種lb策略
分別是
權重隨機(random),實現類RandomLoadBalance
權重輪詢(roundrobin),實現類RoundRobinLoadBalance
最少活躍(leastactive)負載策略,實現類LeastActiveLoadBalance
一致性hash(consistenthash)實現類ConsistentHashLoadBalance
類關係圖:算法
4種實現都擴展了抽象類AbstractLoadBalance,
並實現了doSelect抽象方法,
這點和集羣容錯結構使用了一樣的設計模式,這個doSelect方法在AbstractLoadBalance的select方法中被調用,select方法也是接口LoadBalance的惟一方法,是負載均衡的實現方法。設計模式
代碼以下:數組
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); //回調子類的doSelect實現,實現具體的lb策略 return doSelect(invokers, url, invocation); }
dubbo負載均衡,默認是隨機(random)
這個可經過上篇提到的AbstractClusterInvoker的invoke方法實現看到,代碼:
負載均衡
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { //從url經過key "loadbalance" 取不到值,就取默認random隨機策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { //取默認random隨機策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
可是,這篇只說,最少活躍(leastactive)負載策略。dom
首先想說的是,要理解最少活躍數負載策略,就要先弄明白這裏的最少活躍數,指的是什麼數
先看實現代碼:ide
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 總個數 int leastActive = -1; // 最小的活躍數 int leastCount = 0; // 相同最小活躍數的個數 int[] leastIndexs = new int[length]; // 相同最小活躍數的下標 int totalWeight = 0; // 總權重 int firstWeight = 0; // 第一個權重,用於於計算是否相同 boolean sameWeight = true; // 是否全部權重相同 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 活躍數是經過RpcStatus,getStatus(invoker.getUrl(), invocation.getMethodName()).getActive()獲取的。 // 能夠先跳過去看下文的RpcStatus類解讀 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重 if (leastActive == -1 || active < leastActive) { // 發現更小的活躍數,從新開始 leastActive = active; // 記錄最小活躍數 leastCount = 1; // 從新統計相同最小活躍數的個數 leastIndexs[0] = i; // 從新記錄最小活躍數下標 totalWeight = weight; // 從新累計總權重 firstWeight = weight; // 記錄第一個權重 sameWeight = true; // 還原權重相同標識 } else if (active == leastActive) { // 累計相同最小的活躍數 leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標 totalWeight += weight; // 累計總權重 // 判斷全部權重是否同樣 if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // 若是隻有一個最小則直接返回 return invokers.get(leastIndexs[0]); } if (! sameWeight && totalWeight > 0) { // 若是權重不相同且權重大於0則按總權重數隨機 int offsetWeight = random.nextInt(totalWeight); // 並肯定隨機值落在哪一個片段上 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; //這裏getWeight獲得權重,不必定就是配置的,它兼容了java的warmup問題, //大概意思是,若是warmup時間設置爲10分鐘,權重配置爲100, //而當前服務只啓動了1分鐘,那麼這個方法爲計算出一個值爲10的新權值 //這其實,這會有個小問題的,應爲上面計算的totalWeight是沒有按warmup降權的, //因此,按目前落在哪一個片斷上的算法,有可能一個也選不到。特別是服務剛啓動時。 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // 若是權重相同或權重爲0則均等隨機 return invokers.get(leastIndexs[random.nextInt(leastCount)]); }
這個方法,就是把invokers裏,有最小活躍數的invoker(一個或多個)的下標,記錄到leastIndexs數組裏。
若是隻有一個,就直接返回,不用選了。若是有多個,再計算這其中每一個的invoker的權重。
若是權重同樣,就均等隨機選一個。
若是權重不同,就再按權重隨機(random策略)從中選一個。url
RpcStatus類,它是url統計類,有如下屬性.net
//私有靜態map,存調用統計信息用的 private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>(); private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); //具體表明各個調用指標統計值 private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>(); //活躍數 private final AtomicInteger active = new AtomicInteger(); private final AtomicLong total = new AtomicLong(); private final AtomicInteger failed = new AtomicInteger(); private final AtomicLong totalElapsed = new AtomicLong(); private final AtomicLong failedElapsed = new AtomicLong(); private final AtomicLong maxElapsed = new AtomicLong(); private final AtomicLong failedMaxElapsed = new AtomicLong(); private final AtomicLong succeededMaxElapsed = new AtomicLong();
上文提到的鏈接數是經過下面方法獲得
public int getActive() {
return active.get();
}
而能改變這個active值的只有下面兩個方法
private static void beginCount(RpcStatus status) {
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
}設計
這兩個方法,何時被調用的呢
經過源碼find usages發如今ActiveLimitFilter和ExecuteLimitFilter兩個過濾器中調用的。
經過註解知道,ExecuteLimitFilter是服務端過濾器,ActiveLimitFilter是客戶端過濾器(之後能夠寫專門介紹過濾器的)
咱們這邊是調用方,應該用ActiveLimitFilter。而啓用這個過濾器,則須要在調用方應用上配置filter="activelimit"
因爲dubbo默認調用是沒有啓用這個過濾器的,因此要想使用最少活躍(leastactive)負載策略,須要配置啓用這個activelimit過濾器。看下過濾器,惟一一個方法:
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) { long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); if (active >= max) { synchronized (count) { while ((active = count.getActive()) >= max) { try { count.wait(remain); } catch (InterruptedException e) { } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } try { //業務方法調用前,調用beginCoun long begin = System.currentTimeMillis(); RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); //調用成功後,返回後,調用endCount RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { //調用失敗後結束統計 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if(max>0){ synchronized (count) { count.notify(); } } } }
再回頭看下兩個方法的具體實現:
/** * @param url */ public static void beginCount(URL url, String methodName) { //dubbo這裏,把調用的url或方法名作key ,RpcStatus對象做爲value是方法,靜態map屬性裏 //經過這樣把調用信息存起來。 //它能夠統計一個url被調用的信息,也能夠記錄一個url裏某個方法被調用的統計信息 beginCount(getStatus(url)); beginCount(getStatus(url, methodName)); } private static void beginCount(RpcStatus status) { status.active.incrementAndGet();//active值加1 } //beginCount的做用,能夠理解某個方法調用前,它對應的active數目加1 private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { status.active.decrementAndGet();//某個方法正調用結束,它對應的active減一 status.total.incrementAndGet(); status.totalElapsed.addAndGet(elapsed); if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } if (succeeded) { if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { status.failed.incrementAndGet(); status.failedElapsed.addAndGet(elapsed); if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } } //endCount的做用,能夠理解某個方法調用結束後,它對應的active數目減1
因此,這個active數目就是表示,某個方法當前有多少正在執行(開始調用,但尚未返回) 也能夠說最少活躍(leastactive)負載策略,就選擇那些返回比較快的主機,或者本機調用較少的主機。