dubbo負載均衡代碼分析1(leastactive策略)

接上篇http://www.javashuo.com/article/p-poudvoae-u.htmljava

既然有集羣容錯,天然會有負載均衡。dubbo經過spi默認實現了4種lb策略
分別是
權重隨機(random),實現類RandomLoadBalance
權重輪詢(roundrobin),實現類RoundRobinLoadBalance
最少活躍(leastactive)負載策略,實現類LeastActiveLoadBalance
一致性hash(consistenthash)實現類ConsistentHashLoadBalance
類關係圖:算法

 

4種實現都擴展了抽象類AbstractLoadBalance,
並實現了doSelect抽象方法,
這點和集羣容錯結構使用了一樣的設計模式,這個doSelect方法在AbstractLoadBalance的select方法中被調用,select方法也是接口LoadBalance的惟一方法,是負載均衡的實現方法。設計模式

代碼以下:數組

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        //回調子類的doSelect實現,實現具體的lb策略
        return doSelect(invokers, url, invocation);
    }

dubbo負載均衡,默認是隨機(random)
這個可經過上篇提到的AbstractClusterInvoker的invoke方法實現看到,代碼:
 負載均衡

public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;

        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            //從url經過key "loadbalance" 取不到值,就取默認random隨機策略
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            //取默認random隨機策略
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

可是,這篇只說,最少活躍(leastactive)負載策略。dom

首先想說的是,要理解最少活躍數負載策略,就要先弄明白這裏的最少活躍數,指的是什麼數
先看實現代碼:ide

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int leastActive = -1; // 最小的活躍數
        int leastCount = 0; // 相同最小活躍數的個數
        int[] leastIndexs = new int[length]; // 相同最小活躍數的下標
        int totalWeight = 0; // 總權重
        int firstWeight = 0; // 第一個權重,用於於計算是否相同
        boolean sameWeight = true; // 是否全部權重相同
        for (int i = 0; i < length; i++) {
        	Invoker<T> invoker = invokers.get(i);
	 // 活躍數是經過RpcStatus,getStatus(invoker.getUrl(), invocation.getMethodName()).getActive()獲取的。
	 // 能夠先跳過去看下文的RpcStatus類解讀
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); 
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
            if (leastActive == -1 || active < leastActive) { // 發現更小的活躍數,從新開始
                leastActive = active; // 記錄最小活躍數
                leastCount = 1; // 從新統計相同最小活躍數的個數
                leastIndexs[0] = i; // 從新記錄最小活躍數下標
                totalWeight = weight; // 從新累計總權重
                firstWeight = weight; // 記錄第一個權重
                sameWeight = true; // 還原權重相同標識
            } else if (active == leastActive) { // 累計相同最小的活躍數
                leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標
                totalWeight += weight; // 累計總權重
                // 判斷全部權重是否同樣
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // 若是隻有一個最小則直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 若是權重不相同且權重大於0則按總權重數隨機
            int offsetWeight = random.nextInt(totalWeight);
            // 並肯定隨機值落在哪一個片段上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
		//這裏getWeight獲得權重,不必定就是配置的,它兼容了java的warmup問題,
		//大概意思是,若是warmup時間設置爲10分鐘,權重配置爲100,
		//而當前服務只啓動了1分鐘,那麼這個方法爲計算出一個值爲10的新權值
		//這其實,這會有個小問題的,應爲上面計算的totalWeight是沒有按warmup降權的,
		//因此,按目前落在哪一個片斷上的算法,有可能一個也選不到。特別是服務剛啓動時。
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 若是權重相同或權重爲0則均等隨機
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

這個方法,就是把invokers裏,有最小活躍數的invoker(一個或多個)的下標,記錄到leastIndexs數組裏。
若是隻有一個,就直接返回,不用選了。若是有多個,再計算這其中每一個的invoker的權重。
若是權重同樣,就均等隨機選一個。
若是權重不同,就再按權重隨機(random策略)從中選一個。url

RpcStatus類,它是url統計類,有如下屬性.net

//私有靜態map,存調用統計信息用的
    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    //具體表明各個調用指標統計值
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    //活躍數
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();

  上文提到的鏈接數是經過下面方法獲得
  public int getActive() {
        return active.get();
  }
  而能改變這個active值的只有下面兩個方法
   private static void beginCount(RpcStatus status) {
    }
  private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
    }設計

這兩個方法,何時被調用的呢
經過源碼find usages發如今ActiveLimitFilter和ExecuteLimitFilter兩個過濾器中調用的。
經過註解知道,ExecuteLimitFilter是服務端過濾器,ActiveLimitFilter是客戶端過濾器(之後能夠寫專門介紹過濾器的)
咱們這邊是調用方,應該用ActiveLimitFilter。而啓用這個過濾器,則須要在調用方應用上配置filter="activelimit"
因爲dubbo默認調用是沒有啓用這個過濾器的,因此要想使用最少活躍(leastactive)負載策略,須要配置啓用這個activelimit過濾器。看下過濾器,惟一一個方法:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                                   + invoker.getInterface().getName() + ", method: "
                                                   + invocation.getMethodName() + ", elapsed: " + elapsed
                                                   + ", timeout: " + timeout + ". concurrent invokes: " + active
                                                   + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
	    //業務方法調用前,調用beginCoun
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
		//調用成功後,返回後,調用endCount
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
	        //調用失敗後結束統計
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if(max>0){
                synchronized (count) {
                    count.notify();
                } 
            }
        }
    }

 再回頭看下兩個方法的具體實現:

/**
     * @param url
     */
    public static void beginCount(URL url, String methodName) {
    	//dubbo這裏,把調用的url或方法名作key ,RpcStatus對象做爲value是方法,靜態map屬性裏
	//經過這樣把調用信息存起來。
	//它能夠統計一個url被調用的信息,也能夠記錄一個url裏某個方法被調用的統計信息
        beginCount(getStatus(url));
        beginCount(getStatus(url, methodName));
    }
    
    private static void beginCount(RpcStatus status) {
        status.active.incrementAndGet();//active值加1
    }

   //beginCount的做用,能夠理解某個方法調用前,它對應的active數目加1
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();//某個方法正調用結束,它對應的active減一
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }
//endCount的做用,能夠理解某個方法調用結束後,它對應的active數目減1

因此,這個active數目就是表示,某個方法當前有多少正在執行(開始調用,但尚未返回) 也能夠說最少活躍(leastactive)負載策略,就選擇那些返回比較快的主機,或者本機調用較少的主機。

相關文章
相關標籤/搜索