Dubbo支持在服務調用方對服務提供者採用負載均衡算法,LoadBalance 接口定義以下:node
@SPI(RandomLoadBalance.NAME) public interface LoadBalance { /** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */ @Adaptive("loadbalance") <t> Invoker<t> select(List<invoker<t>> invokers, URL url, Invocation invocation) throws RpcException; }
從中透露出以下幾個信息: 默認若是不配置,使用RandomLoadBalance策略(加權隨機負載算法)。整個Dubbo的負載均衡類圖以下所示: 上述各類路由負載策略,對應的配置值以下:dubbo-cluster\src\main\resources\META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance算法
random random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance緩存
roundrobin roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance數據結構
leastactive leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance架構
consistenthash consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance 其配置使用,一般通常在< dubbo:consumer/>、< dubbo:service />、< dubbo:reference />的loadbalance屬性配置,一般< dubbo:consumer/>這個屬性指定消費端的默認策略,某些服務須要指定特殊負載均衡策略的話,通常經過< dubbo:reference />來指定。 若是各位對其源碼實現比較有興趣的話,能夠看接下來的部分,源碼分析各類負載算法的具體實現細節。併發
一致Hash算法,一般用在緩存領域,主要解決的問題是當數據節點數量發送變化後,儘可能減小數據的遷移,在負責算法領域,我的不建議使用。Dubbo一致性Hash算法的實現邏輯主要分佈在ConsistentHashLoadBalance$ConsistentHashSelector中。app
private final TreeMap<long, invoker<t>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex;
TreeMap< Long, Invoker< T>> virtualInvokers:虛擬節點,使用TreeMap實現Hash環,將Invoker分佈在環上。 int replicaNumber:虛擬節點個數。 int identityHashCode:HashCode。 int[] argumentIndex:須要參與hash的參數索引,,argumentIndex = [0,1]表示服務方法的第一個,第二個參數參與hashcode計算。 接下來看一下其構造方法:負載均衡
public ConsistentHashSelector(List<invoker<t>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<long, invoker<t>>(); this.identityHashCode = System.identityHashCode(invokers); // @1 URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // @2 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); // @3 start argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i ++) { argumentIndex[i] = Integer.parseInt(index[i]); } // @3 end for (Invoker<t> invoker : invokers) { // @4 for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(invoker.getUrl().toFullString() + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } // @4 end }
代碼@1:根據全部的調用者生成一個HashCode,用該HashCode值來判斷服務提供者是否發生了變化。dom
代碼@2:獲取服務提供者< dubbo:method/>標籤的hash.nodes屬性,若是爲空,默認爲160,表示一致性hash算法中虛擬節點數量。其配置方式以下: < dubbo:method ... > < dubbo:parameter key="hash.nodes" value="160" /> < dubbo:parameter key="hash.arguments" value="0,1" /> < /dubbo:method/>分佈式
代碼@3:一致性Hash算法,在dubbo中,相同的服務調用參數走固定的節點,hash.arguments表示哪些參數參與hashcode,默認值「0」,表示第一個參數。
代碼@4:爲每個Invoker建立replicaNumber 個虛擬節點,每個節點的Hashcode不一樣。同一個Invoker不一樣hashcode的建立邏輯爲: invoker.getUrl().toFullString() + i (0-39)的值,對其md5,而後用該值+h(0-3)的值取hash。一致性hash實現的一個關鍵是若是將一個Invoker建立的replicaNumber 個虛擬節點(hashcode)可以均勻分佈在Hash環上,Dubbo給出的實現以下,因爲能力有限,目前並未真正理解以下方法的實現依據:
private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; }
綜上所述,構造函數主要完成一致性Hash算法Hash環的構建,利用了TreeMap的有序性來實現。
public Invoker<t> select(Invocation invocation) { String key = toKey(invocation.getArguments()); // @1 byte[] digest = md5(key); // @2 return selectForKey(hash(digest, 0)); // @3 }
代碼@1:根據調用參數,並根據hash.arguments配置值,獲取指定的位置的參數值,追加一塊兒返回。
代碼@2:對Key進行md5簽名。
代碼@3:根據key進行選擇調用者。
private Invoker<t> selectForKey(long hash) { Map.Entry<long, invoker<t>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); // @1 if (entry == null) { // @2 entry = virtualInvokers.firstEntry(); } return entry.getValue(); // @3 }
代碼@1,對虛擬節點,從virtualInvokers中選取一個子集,subMap(hash,ture,lastKey,true),其實就是實現根據待查找hashcode(key)順時針,選中大於等於指定key的第一個key。
代碼@2,若是未找到,則返回virtualInvokers第一個key。
代碼@3:根據key返回指定的Invoker便可。
這裏實現,應該能夠不使用tailMap,代碼修改以下:
private Invoker<t> selectForKey(long hash) { Map.Entry<long, invoker<t>> entry = virtualInvokers.ceilingEntry(hash); if(entry == null ) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); }
若是想要了解TreeMap關於這一塊的特性(tailMap、ceillingEntry、headMap)等API的詳細解釋,能夠查看個人另一篇博文:https://blog.csdn.net/prestigeding/article/details/80821576
因爲roundrobin(加權輪詢)、random(加權隨機)、leastactive(最小活躍鏈接數)都與權重有關係,在介紹這兩種負載均衡算法以前,咱們首先看一下Dubbo關於權重的獲取邏輯,代碼見AbstractLoadBalance#getWeigh方法:
protected int getWeight(Invoker<!--?--> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // @1 if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); // @2 if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); // @3 if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); // @4 } } } return weight; }
代碼@1:首先獲取服務提供者的權重(weight)。
代碼@2:獲取服務提供者的啓動時間,在服務提供者啓動時,會將啓動時間戳存儲在服務提供者的URL中,在服務發現(RegistryDirecotry)服務發現時,會將服務提供者的時間戳KEY,換成REMOTE_TIMESTAMP_KEY,避免與服務消費者的啓動時間戳衝突。
代碼@3:獲取服務提供者是否開啓預熱機制,經過服務提供者< dubbo:service warmup=""/>參數來設置,若是未設置,去默認值10 * 60 * 1000(10分鐘)。
代碼@4:若是服務提供者啓動時間小於預熱時間(預熱期間),須要根據啓動時間,來計算預熱期間服務提供者的權重。
AbstractLoadBalance#calculateWarmupWeight
static int calculateWarmupWeight(int uptime, int warmup, int weight) { // @1 int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); }
代碼@1:參數說明,uptime:服務提供者啓動時間;warmup:設置的預熱時間;weight:服務提供者的權重,該方法在uptime < warmup時被調用 該方法的實現,就是在預熱期間,根據啓動時間,動態返回該服務提供者的權重,而且啓動時間越長,返回的權重越接近weight,啓動時間超過預熱時間,則直接返回weight。
該方法單元測試:
其輸出結果:
protected <t> Invoker<t> doSelect(List<invoker<t>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights // @1 start boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } // @1 end if (totalWeight > 0 && !sameWeight) { // @2 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); // @3 }
代碼@1:首先求全部服務提供者的總權重,並判斷每一個服務提供者的權重是否相同。
代碼@2:若是提供者之間的權重不相同,則產生一個隨機數(0-totalWeight),視爲offset,而後依次用offset減去服務提供者的權重,若是減去(offset - provider.weight < 0),則該invoker命中。
代碼@3:若是服務提供者的權重相同,則隨機產生[0-invoker.size)便可。
加權輪詢算法的核心算法是按權重輪詢,一個基本點是應該是一個當前序號與服務提供者數量取模,須要結合權重。Dubbo使用以下數據結構存儲當前序號:
private final ConcurrentMap<string, atomicpositiveinteger> sequences = new ConcurrentHashMap<string, atomicpositiveinteger>();鍵值:serviceKey(<dubbo:service interface="" />+ methodname),每一個方法採用不一樣的計數器。 RoundRobinLoadBalance #doSelect protected <t> Invoker<t> doSelect(List<invoker<t>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // @1 int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<invoker<t>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<invoker<t>, IntegerWrapper>(); // @2 start int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } // @2 end AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); // @3 if (maxWeight > 0 && minWeight < maxWeight) { // @4 int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<invoker<t>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<t> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } // Round robin return invokers.get(currentSequence % length); // @5 }
代碼@1:構建ConcurrentMap< String, AtomicPositiveInteger> sequences中的key,以interface+methodname爲鍵,裏面存儲的是當前序號(輪詢)。 代碼@2:構建LinkedHashMap< Invoker< T>, IntegerWrapper>存儲結構,經過遍歷全部Invoker,構建每一個Invoker的權重,與此同時算出總權重,而且得出全部服務提供者權重是否相同。
代碼@3:獲取當前的輪詢序號,用於取模。
代碼@4:若是服務提供者之間的權重有差異,須要按權重輪詢,實現方式是: 1)用當前輪詢序號與服務提供者總權重取模,餘數爲mod。 2)而後從0循環直到最大權重,針對每一次循環,按同一順序遍歷全部服務提供者,若是mod等於0而且對應的Invoker的權重計算器大於0,則選擇該服務提供者;不然,mod--,invoker對應的權重減一,權重是臨時比那裏LinkedHashMap< Invoker< T>, IntegerWrapper>。因爲外層循環的次數爲全部服務提供者的最大權重,內層循環當mod等於0時,確定會有一個服務提供者的權重計數器大於0,而返回對應的服務提供者。返回的服務提供者是第一個知足的服務提供者,後續的服務提供者在下一次就會有機會, 由於下一次mod會增大1,後續的服務提供者經過輪詢會被選擇,選擇的機會,取決於權重的大小。
代碼@5:若是各服務提供者權重相同,則直接對服務提供者取模便可,輪詢後遞增。
最小活躍鏈接數,其核心實現就是,首先找到服務提供者當前最小的活躍鏈接數,若是一個服務提供者的服務鏈接數比其餘的都要小,則選擇這個活躍鏈接數最小的服務提供者發起調用,若是存在多個服務提供者的活躍鏈接數,而且是最小的,則在這些服務提供者之間選擇加權隨機算法選擇一個服務提供者。
protected <t> Invoker<t> doSelect(List<invoker<t>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers // @1 start int leastActive = -1; // The least active value of all invokers int leastCount = 0; // The number of invokers having the same least active value (leastActive) int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive) int totalWeight = 0; // The sum of weights int firstWeight = 0; // Initial value, used for comparision boolean sameWeight = true; // Every invoker has the same weight value? // @1 end for (int i = 0; i < length; i++) { // @2 Invoker<t> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // // Weight if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. // @3 leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = weight; // Reset firstWeight = weight; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. // @4 leastIndexs[leastCount++] = i; // Record index number of this invoker totalWeight += weight; // Add this invoker's weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // @5 // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { // @6 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offsetWeight = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexs[random.nextInt(leastCount)]); }
代碼@1:解釋相關局部變量。 length :服務提供者數量。 leastActive :服務提供者的最小活躍鏈接數,初始化爲-1。 leastCount :服務提供者中都是活躍鏈接數的個數,例如,3個服務提供者當前的活躍鏈接數分別爲 100,102,100,則leastCount 爲2。 leastIndexs:存放擁有活躍鏈接數的Invoker索引,例如上面100,102,100,則leastIndexs[0]=0, leastIndexs[1] = 2; totalWeight:擁有最小活躍鏈接數的Invoker的總權重。 firstWeight :第一個最小活躍鏈接數的Invoker的權重。 sameWeight :擁有最小活躍鏈接數的Invoker權重是否相同。
代碼@2:遍歷全部的服務提供者,計算上述變量的值。
代碼@3:若是leastActive (最小活躍鏈接數爲-1,表示第一次遍歷)或最新鏈接數大於當前遍歷的Invoker的活躍鏈接數,須要reset以下值,從新計算: leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = weight; // Reset firstWeight = weight; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value?
代碼@4:若是當前遍歷的服務提供者的活躍數等於leastActive ,則將總權重想加,並在leastIndexs中記錄服務提供者序號。
代碼@5,若是最小活躍鏈接數的服務提供者數量只有一個,則直接返回該服務提供者。
代碼@6,若是最小活躍鏈接數的服務提供者有多個,則使用加權隨機算法選取服務提供者。
關於Dubbo的4種負載均衡算法的實現細節就分析到這裏了。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。 </t></invoker<t></t></t></t></invoker<t></invoker<t></invoker<t></invoker<t></t></t></string,></string,></invoker<t></t></t></long,></t></long,></t></t></t></long,></invoker<t></long,></invoker<t></t></t>