Dubbo提供了哪些負載均衡機制?如何實現的?html
LoadBalance接口:能夠看出,經過SPI機制默認爲RandomLoadBalance,生成的適配器類執行select()方法。node
1 /**
2 * LoadBalance. (SPI, Singleton, ThreadSafe) 3 * <p> 4 * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a> 5 * 6 * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory) 7 */ 8 @SPI(RandomLoadBalance.NAME) 9 public interface LoadBalance { 10 11 /** 12 * select one invoker in list. 13 * 14 * @param invokers invokers. 15 * @param url refer url 16 * @param invocation invocation. 17 * @return selected invoker. 18 */ 19 @Adaptive("loadbalance") 20 <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; 21 22 }
實現類的基本類圖以下所示:算法
0、AbstractLoadBalance是LoadBalance接口的默認實現抽象類,爲子類提供了實現框架。咱們來看看此類具體的實現:數組
(1)主要方法固然是select(),經過選擇實現了負載均衡策略。實現主要是調用doSelect()方法,它是個抽象方法,留給具體子類實現不一樣的負載均衡策略;app
(2)getWeight()方法計算出invoker權重,計算公式爲:weight = (int) (uptime(提供者正常運行時間) / warmup(升溫時間) /weight(設定權重)))負載均衡
1 protected int getWeight(Invoker<?> invoker, Invocation invocation) {
2 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); 3 if (weight > 0) { 4 long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); 5 if (timestamp > 0L) { 6 int uptime = (int) (System.currentTimeMillis() - timestamp); 7 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); 8 if (uptime > 0 && uptime < warmup) { 9 weight = calculateWarmupWeight(uptime, warmup, weight); 10 } 11 } 12 } 13 return weight; 14 }
一、Random LoadBalance:隨機,按權重設置隨機機率。框架
在一個截面上碰撞的機率高,但調用量越大分佈越均勻,並且按機率使用權重後也比較均勻,有利於動態調整提供者權重。 dom
RandomLoadBalance子類,主要經過doSelect()實現按權重的隨機算法,實現邏輯爲:ide
(1)計算總權重;ui
(2)若是沒有設置權重或者全部權重都同樣,直接從invokers列表隨機返回一個;
(3)不然:使用總權重隨機計算一個offset(偏移量),循環invokers列表,offset=offset -(當前invoker權重),即剩餘權重,而後返回第一個大於offset權重的invoker;此算法兼顧了權重和輪詢(千重相同則輪詢,權重不一樣則從大到小的節點順序輪詢選中)兩個因素。
具體實現以下:
1 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
2 int length = invokers.size(); // Number of invokers
3 int totalWeight = 0; // The sum of weights
4 boolean sameWeight = true; // Every invoker has the same weight?
5 for (int i = 0; i < length; i++) { 6 int weight = getWeight(invokers.get(i), invocation); 7 totalWeight += weight; // Sum 8 if (sameWeight && i > 0 9 && weight != getWeight(invokers.get(i - 1), invocation)) { 10 sameWeight = false; 11 } 12 } 13 if (totalWeight > 0 && !sameWeight) { 14 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. 15 int offset = random.nextInt(totalWeight); 16 // Return a invoker based on the random value. 17 for (int i = 0; i < length; i++) { 18 offset -= getWeight(invokers.get(i), invocation); 19 if (offset < 0) { 20 return invokers.get(i); 21 } 22 } 23 } 24 // If all invokers have the same weight value or totalWeight=0, return evenly. 25 return invokers.get(random.nextInt(length)); 26 }
二、RoundRobin LoadBalance:輪循,按公約後的權重設置輪循比率。
存在慢的提供者累積請求的問題,好比:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,長此以往,全部請求都卡在調到第二臺上。
RoundRobinLoadBalance子類,用doSelect()實現了按公約後的權重設置輪訓比率,
1 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//一個service接口的一個方法爲一個key
2 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); 3 int length = invokers.size(); // Number of invokers 4 int maxWeight = 0; // The maximum weight 5 int minWeight = Integer.MAX_VALUE; // The minimum weight 6 final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); 7 int weightSum = 0;
//輪訓計算總權重值、最大權重值、最小權重值 8 for (int i = 0; i < length; i++) { 9 int weight = getWeight(invokers.get(i), invocation); 10 maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight 11 minWeight = Math.min(minWeight, weight); // Choose the minimum weight 12 if (weight > 0) { 13 invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); 14 weightSum += weight; 15 } 16 }
//給每一個請求方法設置一個原子Integer 17 AtomicPositiveInteger sequence = sequences.get(key); 18 if (sequence == null) { 19 sequences.putIfAbsent(key, new AtomicPositiveInteger()); 20 sequence = sequences.get(key); 21 } 22 int currentSequence = sequence.getAndIncrement(); 23 if (maxWeight > 0 && minWeight < maxWeight) { 24 int mod = currentSequence % weightSum; 25 for (int i = 0; i < maxWeight; i++) { 26 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { 27 final Invoker<T> k = each.getKey(); 28 final IntegerWrapper v = each.getValue(); 29 if (mod == 0 && v.getValue() > 0) { 30 return k; 31 } 32 if (v.getValue() > 0) { 33 v.decrement(); 34 mod--; 35 } 36 } 37 } 38 } 39 // Round robin 40 return invokers.get(currentSequence % length); 41 }
算法原理及實現討論另外寫了一篇博客,見:《負載均衡算法WeightedRoundRobin(加權輪詢)簡介及算法實現》
三、LeastActive LoadBalance:最少活躍調用數,相同活躍數的隨機,活躍數指調用先後計數差,即響應一次請求所花費的時長。
使慢的提供者收到更少請求,由於越慢的提供者的調用先後計數差會越大。
算法實現邏輯爲:
假如節點活躍數依次爲{Node0=3ms,Node1=6ms,Node2=2ms,Node3=2ms,Node4=4ms},
(1)沒有設置權重,或者權重都同樣的狀況下,遍歷全部節點,找出節點中最小活躍數的節點,結果爲{Node2=2ms,Node3=2ms};
(2)按照算法約束:相同活躍數的隨機取,則從{Node2,Node3}中隨機取出一個節點返回;
(3)設置了權重,且權重不同的狀況下,從最小活躍數子集{Node2,Node3}中取出權重大的一個節點返回。具體實現與隨機訪問算法Random LoadBalance相似,構造一個考慮了權重和輪詢(多個相同權重的節點輪詢選擇)兩個因素的算法,使用總權重隨機計算一個offset(偏移量),循環invokers列表,offset = offset -(當前invoker權重),即剩餘權重,而後返回第一個大於offset權重的invoker。
具體算法實現以下:
1 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
2 int length = invokers.size(); // Number of invokers
3 int leastActive = -1; // 記錄最少的活躍數
4 int leastCount = 0; // 擁有最少活躍數,且活躍數相同的節點個數
5 int[] leastIndexs = new int[length]; // 最少活躍數節點索引數組(數組內節點的活躍數相同)
6 int totalWeight = 0; // The sum of weights
7 int firstWeight = 0; // Initial value, used for comparision
8 boolean sameWeight = true; // Every invoker has the same weight value?
9 for (int i = 0; i < length; i++) { 10 Invoker<T> invoker = invokers.get(i); 11 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); //從上下文記錄中取得方法的活躍數 Active number 12 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight 13 if (leastActive == -1 || active < leastActive) { // 當找到一個更小的活躍數節點時,重置變量Restart, when find a invoker having smaller least active value. 14 leastActive = active; // Record the current least active value 15 leastCount = 1; // Reset leastCount, count again based on current leastCount 16 leastIndexs[0] = i; // Reset 17 totalWeight = weight; // Reset 18 firstWeight = weight; // Record the weight the first invoker 19 sameWeight = true; // Reset, every invoker has the same weight value? 20 } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. 21 leastIndexs[leastCount++] = i; // Record index number of this invoker 22 totalWeight += weight; // Add this invoker's weight to totalWeight. 23 // If every invoker has the same weight? 24 if (sameWeight && i > 0 25 && weight != firstWeight) { 26 sameWeight = false; 27 } 28 } 29 } 30 // assert(leastCount > 0) 31 if (leastCount == 1) { 32 // If we got exactly one invoker having the least active value, return this invoker directly. 33 return invokers.get(leastIndexs[0]); 34 } 35 if (!sameWeight && totalWeight > 0) { 36 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
//考慮了權重和輪詢(多個相同權重的節點輪詢選擇)兩個因素的算法,使用總權重隨機計算一個offset(偏移量),循環invokers列表,offset = offset -(當前invoker權重),即剩餘權重,而後返回第一個大於offset權重的invoker 37 int offsetWeight = random.nextInt(totalWeight); 38 // Return a invoker based on the random value. 39 for (int i = 0; i < leastCount; i++) { 40 int leastIndex = leastIndexs[i]; 41 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); 42 if (offsetWeight <= 0) 43 return invokers.get(leastIndex); 44 } 45 } 46 // If all invokers have the same weight value or totalWeight=0, return evenly. 47 return invokers.get(leastIndexs[random.nextInt(leastCount)]); 48 }
四、ConsistentHash LoadBalance:一致性哈希。適用場景爲:相同參數的請求始終發送到同一個提供者。
<dubbo:parameter key="hash.arguments" value="0,1" />
<dubbo:parameter key="hash.nodes" value="320" />
具體dubbo實現以下:
1 private static final class ConsistentHashSelector<T> {
2
3 private final TreeMap<Long, Invoker<T>> virtualInvokers; 4 5 private final int replicaNumber; 6 7 private final int identityHashCode; 8 9 private final int[] argumentIndex; 10 11 ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { 12 this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); 13 this.identityHashCode = identityHashCode; 14 URL url = invokers.get(0).getUrl();
//沒有設置,默認虛擬節點(分片)數160個 15 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); 16 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); 17 argumentIndex = new int[index.length]; 18 for (int i = 0; i < index.length; i++) { 19 argumentIndex[i] = Integer.parseInt(index[i]); 20 } 21 for (Invoker<T> invoker : invokers) { 22 String address = invoker.getUrl().getAddress(); 23 for (int i = 0; i < replicaNumber / 4; i++) { 24 byte[] digest = md5(address + i); 25 for (int h = 0; h < 4; h++) { 26 long m = hash(digest, h); 27 virtualInvokers.put(m, invoker); 28 } 29 } 30 } 31 } 32 33 public Invoker<T> select(Invocation invocation) { 34 String key = toKey(invocation.getArguments()); 35 byte[] digest = md5(key); 36 return selectForKey(hash(digest, 0)); 37 } 38 39 private String toKey(Object[] args) { 40 StringBuilder buf = new StringBuilder(); 41 for (int i : argumentIndex) { 42 if (i >= 0 && i < args.length) { 43 buf.append(args[i]); 44 } 45 } 46 return buf.toString(); 47 } 48 49 private Invoker<T> selectForKey(long hash) { 50 Invoker<T> invoker; 51 Long key = hash; 52 if (!virtualInvokers.containsKey(key)) { 53 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); 54 if (tailMap.isEmpty()) { 55 key = virtualInvokers.firstKey(); 56 } else { 57 key = tailMap.firstKey(); 58 } 59 } 60 invoker = virtualInvokers.get(key); 61 return invoker; 62 } 63 64 private long hash(byte[] digest, int number) { 65 return (((long) (digest[3 + number * 4] & 0xFF) << 24) 66 | ((long) (digest[2 + number * 4] & 0xFF) << 16) 67 | ((long) (digest[1 + number * 4] & 0xFF) << 8) 68 | (digest[number * 4] & 0xFF)) 69 & 0xFFFFFFFFL; 70 } 71 72 private byte[] md5(String value) { 73 MessageDigest md5; 74 try { 75 md5 = MessageDigest.getInstance("MD5"); 76 } catch (NoSuchAlgorithmException e) { 77 throw new IllegalStateException(e.getMessage(), e); 78 } 79 md5.reset(); 80 byte[] bytes; 81 try { 82 bytes = value.getBytes("UTF-8"); 83 } catch (UnsupportedEncodingException e) { 84 throw new IllegalStateException(e.getMessage(), e); 85 } 86 md5.update(bytes); 87 return md5.digest(); 88 } 89 90 }
配置:
服務端服務級別:<dubbo:service interface="..." loadbalance="roundrobin" />
服務端方法級別:<dubbo:service interface="..."><dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:service>
客戶端服務級別:<dubbo:
reference
interface="..." loadbalance="roundrobin" />
客戶端方法級別:<dubbo:
reference
interface="..."><dubbo:method name="..." loadbalance="roundrobin"/></dubbo:
reference
>