源碼分析Dubbo負載算法

Dubbo支持在服務調用方對服務提供者採用負載均衡算法,LoadBalance 接口定義以下:node

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

	/**
	 * select one invoker in list.
	 * 
	 * @param invokers invokers.
	 * @param url refer url
	 * @param invocation invocation.
	 * @return selected invoker.
	 */
    @Adaptive("loadbalance")
	<t> Invoker<t> select(List<invoker<t>&gt; invokers, URL url, Invocation invocation) throws RpcException;
}

從中透露出以下幾個信息: 默認若是不配置,使用RandomLoadBalance策略(加權隨機負載算法)。整個Dubbo的負載均衡類圖以下所示: 這裏寫圖片描述 上述各類路由負載策略,對應的配置值以下:dubbo-cluster\src\main\resources\META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance算法

  • random random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance緩存

  • roundrobin roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance數據結構

  • leastactive leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance架構

  • consistenthash consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance 其配置使用,一般通常在< dubbo:consumer/>、< dubbo:service />、< dubbo:reference />的loadbalance屬性配置,一般< dubbo:consumer/>這個屬性指定消費端的默認策略,某些服務須要指定特殊負載均衡策略的話,通常經過< dubbo:reference />來指定。 若是各位對其源碼實現比較有興趣的話,能夠看接下來的部分,源碼分析各類負載算法的具體實現細節。併發

一、源碼分析ConsistentHashLoadBalance(一致性Hash算法)

這裏寫圖片描述 一致Hash算法,一般用在緩存領域,主要解決的問題是當數據節點數量發送變化後,儘可能減小數據的遷移,在負責算法領域,我的不建議使用。Dubbo一致性Hash算法的實現邏輯主要分佈在ConsistentHashLoadBalance$ConsistentHashSelector中。app

1.1 核心屬性與構造方法

private final TreeMap<long, invoker<t>&gt; virtualInvokers;
private final int                       replicaNumber;
private final int                       identityHashCode;
private final int[]                     argumentIndex;

TreeMap< Long, Invoker< T>> virtualInvokers:虛擬節點,使用TreeMap實現Hash環,將Invoker分佈在環上。 int replicaNumber:虛擬節點個數。 int identityHashCode:HashCode。 int[] argumentIndex:須要參與hash的參數索引,,argumentIndex = [0,1]表示服務方法的第一個,第二個參數參與hashcode計算。 接下來看一下其構造方法:負載均衡

public ConsistentHashSelector(List<invoker<t>&gt; invokers, String methodName, int identityHashCode) {
    this.virtualInvokers = new TreeMap<long, invoker<t>&gt;();
    this.identityHashCode = System.identityHashCode(invokers);    // @1
    URL url = invokers.get(0).getUrl();
    this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);   // @2
    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));   // @3 start
    argumentIndex = new int[index.length];
    for (int i = 0; i &lt; index.length; i ++) {
          argumentIndex[i] = Integer.parseInt(index[i]);
    }  // @3 end
    for (Invoker<t> invoker : invokers) {    // @4
         for (int i = 0; i &lt; replicaNumber / 4; i++) {
               byte[] digest = md5(invoker.getUrl().toFullString() + i);
               for (int h = 0; h &lt; 4; h++) { 
                     long m = hash(digest, h);
                     virtualInvokers.put(m, invoker);  
                }
           }
      } // @4 end
    }

代碼@1:根據全部的調用者生成一個HashCode,用該HashCode值來判斷服務提供者是否發生了變化。dom

代碼@2:獲取服務提供者< dubbo:method/>標籤的hash.nodes屬性,若是爲空,默認爲160,表示一致性hash算法中虛擬節點數量。其配置方式以下: < dubbo:method ... > < dubbo:parameter key="hash.nodes" value="160" /> < dubbo:parameter key="hash.arguments" value="0,1" /> < /dubbo:method/>分佈式

代碼@3:一致性Hash算法,在dubbo中,相同的服務調用參數走固定的節點,hash.arguments表示哪些參數參與hashcode,默認值「0」,表示第一個參數。

代碼@4:爲每個Invoker建立replicaNumber 個虛擬節點,每個節點的Hashcode不一樣。同一個Invoker不一樣hashcode的建立邏輯爲: invoker.getUrl().toFullString() + i (0-39)的值,對其md5,而後用該值+h(0-3)的值取hash。一致性hash實現的一個關鍵是若是將一個Invoker建立的replicaNumber 個虛擬節點(hashcode)可以均勻分佈在Hash環上,Dubbo給出的實現以下,因爲能力有限,目前並未真正理解以下方法的實現依據:

private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] &amp; 0xFF) &lt;&lt; 24)
                    | ((long) (digest[2 + number * 4] &amp; 0xFF) &lt;&lt; 16)
                    | ((long) (digest[1 + number * 4] &amp; 0xFF) &lt;&lt; 8)
                    | (digest[number * 4] &amp; 0xFF))
                    &amp; 0xFFFFFFFFL;
        }

綜上所述,構造函數主要完成一致性Hash算法Hash環的構建,利用了TreeMap的有序性來實現。

1.2 源碼分析public Invoker< T> select(Invocation invocation):根據調用環境根據一致性Hash算法選擇一個Invoker

public Invoker<t> select(Invocation invocation) {
     String key = toKey(invocation.getArguments());   // @1
     byte[] digest = md5(key);                                      // @2
     return selectForKey(hash(digest, 0));                   // @3
}

代碼@1:根據調用參數,並根據hash.arguments配置值,獲取指定的位置的參數值,追加一塊兒返回。

代碼@2:對Key進行md5簽名。

代碼@3:根據key進行選擇調用者。

1.2.1 ConsistentHashLoadBalance$ConsistentHashSelector#selectForKey
private Invoker<t> selectForKey(long hash) {
     Map.Entry<long, invoker<t>&gt; entry = virtualInvokers.tailMap(hash, true).firstEntry();    // @1
     if (entry == null) {    // @2
     	entry = virtualInvokers.firstEntry();
     }
     return entry.getValue();   // @3
}

代碼@1,對虛擬節點,從virtualInvokers中選取一個子集,subMap(hash,ture,lastKey,true),其實就是實現根據待查找hashcode(key)順時針,選中大於等於指定key的第一個key。

代碼@2,若是未找到,則返回virtualInvokers第一個key。

代碼@3:根據key返回指定的Invoker便可。

這裏實現,應該能夠不使用tailMap,代碼修改以下:

private Invoker<t> selectForKey(long hash) {
     Map.Entry<long, invoker<t>&gt; entry = virtualInvokers.ceilingEntry(hash);
     if(entry == null ) {
     	entry = virtualInvokers.firstEntry();
     }
     return entry.getValue();
}

若是想要了解TreeMap關於這一塊的特性(tailMap、ceillingEntry、headMap)等API的詳細解釋,能夠查看個人另一篇博文:https://blog.csdn.net/prestigeding/article/details/80821576

二、源碼分析RandomLoadBalance實現細節(隨機從調用者列表中選擇一個Invoker)

2.1 Dubbo預熱機制(權重)

因爲roundrobin(加權輪詢)、random(加權隨機)、leastactive(最小活躍鏈接數)都與權重有關係,在介紹這兩種負載均衡算法以前,咱們首先看一下Dubbo關於權重的獲取邏輯,代碼見AbstractLoadBalance#getWeigh方法:

protected int getWeight(Invoker<!--?--> invoker, Invocation invocation) {
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);   // @1
        if (weight &gt; 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);  // @2
            if (timestamp &gt; 0L) {
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);  // @3
                if (uptime &gt; 0 &amp;&amp; uptime &lt; warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);   // @4
                }
            }
        }
        return weight;
    }

代碼@1:首先獲取服務提供者的權重(weight)。

代碼@2:獲取服務提供者的啓動時間,在服務提供者啓動時,會將啓動時間戳存儲在服務提供者的URL中,在服務發現(RegistryDirecotry)服務發現時,會將服務提供者的時間戳KEY,換成REMOTE_TIMESTAMP_KEY,避免與服務消費者的啓動時間戳衝突。

代碼@3:獲取服務提供者是否開啓預熱機制,經過服務提供者< dubbo:service warmup=""/>參數來設置,若是未設置,去默認值10 * 60 * 1000(10分鐘)。

代碼@4:若是服務提供者啓動時間小於預熱時間(預熱期間),須要根據啓動時間,來計算預熱期間服務提供者的權重。

AbstractLoadBalance#calculateWarmupWeight

static int calculateWarmupWeight(int uptime, int warmup, int weight) { // @1
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww &lt; 1 ? 1 : (ww &gt; weight ? weight : ww);
    }

代碼@1:參數說明,uptime:服務提供者啓動時間;warmup:設置的預熱時間;weight:服務提供者的權重,該方法在uptime < warmup時被調用 該方法的實現,就是在預熱期間,根據啓動時間,動態返回該服務提供者的權重,而且啓動時間越長,返回的權重越接近weight,啓動時間超過預熱時間,則直接返回weight。

該方法單元測試: 這裏寫圖片描述

其輸出結果: 這裏寫圖片描述

2.2 RandomLoadBalance 加權隨機算法實現分析

protected <t> Invoker<t> doSelect(List<invoker<t>&gt; invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers                    
        int totalWeight = 0; // The sum of weights       // @1 start
        boolean sameWeight = true; // Every invoker has the same weight?
        for (int i = 0; i &lt; length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // Sum
            if (sameWeight &amp;&amp; i &gt; 0
                    &amp;&amp; weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }   // @1 end
        if (totalWeight &gt; 0 &amp;&amp; !sameWeight) {    // @2
            // If (not every invoker has the same weight &amp; at least one invoker's weight&gt;0), select randomly based on totalWeight.
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i &lt; length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset &lt; 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(random.nextInt(length));  // @3
    }

代碼@1:首先求全部服務提供者的總權重,並判斷每一個服務提供者的權重是否相同。

代碼@2:若是提供者之間的權重不相同,則產生一個隨機數(0-totalWeight),視爲offset,而後依次用offset減去服務提供者的權重,若是減去(offset - provider.weight < 0),則該invoker命中。

代碼@3:若是服務提供者的權重相同,則隨機產生[0-invoker.size)便可。

2.3 RoundRobinLoadBalance 加權輪詢算法分析

加權輪詢算法的核心算法是按權重輪詢,一個基本點是應該是一個當前序號與服務提供者數量取模,須要結合權重。Dubbo使用以下數據結構存儲當前序號:

private final ConcurrentMap<string, atomicpositiveinteger> sequences = new ConcurrentHashMap<string, atomicpositiveinteger>();鍵值:serviceKey(<dubbo:service interface="" />+ methodname),每一個方法採用不一樣的計數器。
RoundRobinLoadBalance #doSelect
protected <t> Invoker<t> doSelect(List<invoker<t>&gt; invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();    // @1
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<invoker<t>, IntegerWrapper&gt; invokerToWeightMap = new LinkedHashMap<invoker<t>, IntegerWrapper&gt;();   // @2 start
        int weightSum = 0;
        for (int i = 0; i &lt; length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight &gt; 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }   // @2 end 
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();    // @3
        if (maxWeight &gt; 0 &amp;&amp; minWeight &lt; maxWeight) {   // @4
            int mod = currentSequence % weightSum;
            for (int i = 0; i &lt; maxWeight; i++) {
                for (Map.Entry<invoker<t>, IntegerWrapper&gt; each : invokerToWeightMap.entrySet()) {
                    final Invoker<t> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 &amp;&amp; v.getValue() &gt; 0) {
                        return k;
                    }
                    if (v.getValue() &gt; 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return invokers.get(currentSequence % length);   // @5
    }

代碼@1:構建ConcurrentMap< String, AtomicPositiveInteger> sequences中的key,以interface+methodname爲鍵,裏面存儲的是當前序號(輪詢)。 代碼@2:構建LinkedHashMap< Invoker< T>, IntegerWrapper>存儲結構,經過遍歷全部Invoker,構建每一個Invoker的權重,與此同時算出總權重,而且得出全部服務提供者權重是否相同。

代碼@3:獲取當前的輪詢序號,用於取模。

代碼@4:若是服務提供者之間的權重有差異,須要按權重輪詢,實現方式是: 1)用當前輪詢序號與服務提供者總權重取模,餘數爲mod。 2)而後從0循環直到最大權重,針對每一次循環,按同一順序遍歷全部服務提供者,若是mod等於0而且對應的Invoker的權重計算器大於0,則選擇該服務提供者;不然,mod--,invoker對應的權重減一,權重是臨時比那裏LinkedHashMap< Invoker< T>, IntegerWrapper>。因爲外層循環的次數爲全部服務提供者的最大權重,內層循環當mod等於0時,確定會有一個服務提供者的權重計數器大於0,而返回對應的服務提供者。返回的服務提供者是第一個知足的服務提供者,後續的服務提供者在下一次就會有機會, 由於下一次mod會增大1,後續的服務提供者經過輪詢會被選擇,選擇的機會,取決於權重的大小。

代碼@5:若是各服務提供者權重相同,則直接對服務提供者取模便可,輪詢後遞增。

2.4 LeastActiveLoadBalance 最少活躍鏈接數負載均衡算法分析

最小活躍鏈接數,其核心實現就是,首先找到服務提供者當前最小的活躍鏈接數,若是一個服務提供者的服務鏈接數比其餘的都要小,則選擇這個活躍鏈接數最小的服務提供者發起調用,若是存在多個服務提供者的活躍鏈接數,而且是最小的,則在這些服務提供者之間選擇加權隨機算法選擇一個服務提供者。

protected <t> Invoker<t> doSelect(List<invoker<t>&gt; invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers                                                                                            // @1 start
        int leastActive = -1; // The least active value of all invokers
        int leastCount = 0; // The number of invokers having the same least active value (leastActive)
        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
        int totalWeight = 0; // The sum of weights
        int firstWeight = 0; // Initial value, used for comparision
        boolean sameWeight = true; // Every invoker has the same weight value?                                                      // @1 end
        for (int i = 0; i &lt; length; i++) {                                                                                                                             // @2 
            Invoker<t> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); //          
                                                    // Weight
            if (leastActive == -1 || active &lt; leastActive) { // Restart, when find a invoker having smaller least active value.              // @3
                leastActive = active; // Record the current least active value
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                leastIndexs[0] = i; // Reset
                totalWeight = weight; // Reset
                firstWeight = weight; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.       // @4
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                totalWeight += weight; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight &amp;&amp; i &gt; 0
                        &amp;&amp; weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount &gt; 0)
        if (leastCount == 1) {     // @5
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexs[0]);
        }
        if (!sameWeight &amp;&amp; totalWeight &gt; 0) {    // @6
            // If (not every invoker has the same weight &amp; at least one invoker's weight&gt;0), select randomly based on totalWeight.
            int offsetWeight = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i &lt; leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight &lt;= 0)
                    return invokers.get(leastIndex);
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

代碼@1:解釋相關局部變量。 length :服務提供者數量。 leastActive :服務提供者的最小活躍鏈接數,初始化爲-1。 leastCount :服務提供者中都是活躍鏈接數的個數,例如,3個服務提供者當前的活躍鏈接數分別爲 100,102,100,則leastCount 爲2。 leastIndexs:存放擁有活躍鏈接數的Invoker索引,例如上面100,102,100,則leastIndexs[0]=0, leastIndexs[1] = 2; totalWeight:擁有最小活躍鏈接數的Invoker的總權重。 firstWeight :第一個最小活躍鏈接數的Invoker的權重。 sameWeight :擁有最小活躍鏈接數的Invoker權重是否相同。

代碼@2:遍歷全部的服務提供者,計算上述變量的值。

代碼@3:若是leastActive (最小活躍鏈接數爲-1,表示第一次遍歷)或最新鏈接數大於當前遍歷的Invoker的活躍鏈接數,須要reset以下值,從新計算: leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = weight; // Reset firstWeight = weight; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value?

代碼@4:若是當前遍歷的服務提供者的活躍數等於leastActive ,則將總權重想加,並在leastIndexs中記錄服務提供者序號。

代碼@5,若是最小活躍鏈接數的服務提供者數量只有一個,則直接返回該服務提供者。

代碼@6,若是最小活躍鏈接數的服務提供者有多個,則使用加權隨機算法選取服務提供者。

關於Dubbo的4種負載均衡算法的實現細節就分析到這裏了。


做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。 </t></invoker<t></t></t></t></invoker<t></invoker<t></invoker<t></invoker<t></t></t></string,></string,></invoker<t></t></t></long,></t></long,></t></t></t></long,></invoker<t></long,></invoker<t></t></t>

相關文章
相關標籤/搜索