前言:在上一篇博客中,介紹了zookeeper做爲dubbo的註冊中心是如何工做的,有一個很重要的點,咱們的程序是分佈式應用,服務部署在幾個節點(服務器)上,當消費者調用服務時,zk返回給dubbo的是一個節點列表,可是dubbo只會選擇一臺服務器,那麼它究竟會選擇哪一臺呢?這就是dubbo的負載均衡策略了,本篇博客就來聚焦dubbo的負載均衡策略。node
本篇博客的目錄算法
一:負載均衡介紹spring
1.1:負載均衡簡介數組
如下是wikipedia對負載均衡的定義:springboot
負載均衡改善了跨多個計算資源(例如計算機,計算機集羣,網絡連接,中央處理單元或磁盤驅動的的工做負載分佈。負載平衡旨在優化資源使用,最大化吞吐量,最小化響應時間,並避免任何單個資源的過載。使用具備負載平衡而不是單個組件的多個組件能夠經過冗餘提升可靠性和可用性。負載平衡一般涉及專用軟件或硬件服務器
1.2:簡單解釋網絡
這個概念如何理解呢?通俗點來講假如一個請求從客戶端發起,好比(查詢訂單列表),要選擇服務器進行處理,可是咱們的集羣環境提供了5個服務器A\B\C\D\E,每一個服務器都有處理這個請求的能力,此時客戶端就必須選擇一個服務器來進行處理(不存在先選擇A,處理一會又選擇C,又跳到D).說白了就是一個選擇的問題。當請求多了的話,就要考慮各服務器的負載,一共5個服務器,不可能每次都讓一個服務器都來處理吧,好比把讓其餘服務器來分壓。這就是負載均衡的優勢:避免單個服務器響應同一請求,容易形成服務器宕機、崩潰等問題。架構
二:dubbo的loadBalance接口app
1.1:loadBalance負載均衡
dubbo的負載均衡策略,主體向外暴露出來是一個接口,名字叫作loadBlace,位於com.alibaba.dubbo.rpc.cluster包下,很明顯根據包名就能夠看出它是用來管理集羣的:
這個接口就一個方法,select方法的做用就是從衆多的調用的List選擇出一個調用者,Invoker能夠理解爲客戶端的調用者,dubbo專門封裝一個類來表示,URL就是調用者發起的URL請求連接,從這個URL中能夠獲取不少請求的具體信息,Invocation表示的是調用的具體過程,dubbo用這個類模擬調用具體細節過程:
1.2:AbstractLoadBlance
該接口在下面的子類都會對其進行實現。接口下是一個抽象類AbstractLoadBlance
package com.alibaba.dubbo.rpc.cluster;
public interface LoadBalance {@Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
AbstractLoadBlance抽象類繼承自LoadBalance,其中有個static方法代表它在類加載的時候就會運行,它表示的含義是計算預熱加載權重,參數是uptime,這裏能夠理解爲服務啓動的時間,warmup就是預熱時間,weight是權重的值,下面會對比進行詳細解釋:
public abstract class AbstractLoadBalance implements LoadBalance{ static int calculateWarmupWeight(int uptime, int warmup, int weight){ // } @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation){ // } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) { } }
1.2.1:select方法
抽象類方法中有個有方法體的方法select,先判斷調用者組成的List是否爲null,若是是null就返回null。再判斷調用者的大小,若是隻有一個就返回那個惟一的調用者(試想,若是服務調用另外一個服務時,當服務的提供者機器只有一個,那麼就能夠返回那一個,由於沒有選擇了!)若是這些都不成立,就繼續往下走,走doSelect方法:
@Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); }
1.2.2:doSelect方法
該方法是抽象的,交給具體的子類去實現,由此也能夠思考出一個問題就是:dubbo爲何要將一個接口首先作出一個實現抽象類,再由不一樣的子類去實現?緣由是抽象類中的非抽象方法,再子類中都是必需要實現的,而他們子類的不一樣點就是具體作出選擇的策略不一樣,將公共的邏輯提取出來放在抽象類裏,子類不用寫多餘的代碼,只用維護和實現最終要的本身的邏輯
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
1.2.3:getWeight方法
顧名思義,這個方法的含義就是獲取權重,首先經過URL(URL爲dubbo封裝的一個實體)獲取基本的權重,若是權重大於0,會獲取服務啓動時間,再用當前的時間-啓動時間就是服務到目前爲止運行了多久,所以這個upTime就能夠理解爲服務啓動時間,再獲取配置的預熱時間,若是啓動時間小於預熱時間,就會再次調用獲取權重。這個預熱的方法其實dubbo針對JVM作出的一個很契合的優化,由於JVM從啓動到起來都運行到最佳狀態是須要一點時間的,這個時間叫作warmup,而dubbo就會對這個時間進行設定,而後等到服務運行時間和warmup相等時再計算權重,這樣就能夠保障服務的最佳運行狀態!
protected int getWeight(Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; }
三:dubbo的幾種負載均衡策略
3.1:總體架構圖
能夠看出抽象的負載均衡下的類分爲4個,這4個類表示了4種負載均衡策略,分別是一致性Hash均衡算法、隨機調用法、輪詢法、最少活動調用法
3.2:RandomLoadBalance
隨機調用負載均衡,該類實現了抽象的AbstractLoadBalance接口,重寫了doSelect方法,看方法的細節就是首先遍歷每一個提供服務的機器,獲取每一個服務的權重,而後累加權重值,判斷每一個服務的提供者權重是否相同,若是每一個調用者的權重不相同,而且每一個權重大於0,那麼就會根據權重的總值生成一個隨機數,再用這個隨機數,根據調用者的數量每次減去調用者的權重,直到計算出當前的服務提供者隨機數小於0,就選擇那個提供者!另外,若是每一個機器的權重的都相同,那麼權重就不會參與計算,直接選擇隨機算法生成的某一個選擇,徹底隨機。能夠看出,隨機調用法,
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); }
3.3:RoundRobinLoadBlance
輪詢調用,輪詢調用的過程主要是維護了局部變量的一個LinkdesHashMap(有順序的Map)去存儲調用者和權重值的對應關係,而後遍歷每一個調用者,把調用者和當前大於0的權重值放進去,再累加權重值。還有一個全局變量的map,找到第一個服務調用者,首先是找到每一個服務的key值和method,這裏能夠理解爲標識第一個調用者的惟一key,而後再給它對應的值保證原子性的+1(AtomicPositiveInteger是原子的),再對這個值取模總權重,再每次對其權重值-1,知道它取模與總權重值等於0就選擇該調用者,能夠稱之爲"降權取模"(只是一種的計算層面,而不是真正降權)。總結:輪詢調用並非簡單的一個接着一個依次調用,它是根據權重的值進行循環的。
public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } // Round robin return invokers.get(currentSequence % length); }
2.4:LeastActiveLoadBlance
最少活躍數調用法:這個方法的主要做用根據服務的提供者的運行狀態去選擇服務器,主要的思路就是遍歷每一個調用者,而後獲取每一個服務器的運行狀態,若是當前運行的運行狀態小於最小的狀態-1,把它保存在leastIndexs中的第一個位置,而且認定全部的調用者權重都相同,而後直接返回那個調用者(這裏的邏輯是:找到最少活躍數(在代碼層反應就是:active的值))。若是計算出的權重值和最少的權重值相同,那麼把它保存在leastIndexs數組裏面,累加權重值,若是當前的權重值不等於初始值firstWeight,那麼就認定不是全部的調用者的權重不一樣。而後再遍歷lestIndexs,取權重累加值的隨機數生成權重偏移量,在累減它,到它小於0的時候返回那個調用者。若是這些都不符合,就從leastIndexs隨機選一個index,返回那個調用者!
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int leastActive = -1; // The least active value of all invokers int leastCount = 0; // The number of invokers having the same least active value (leastActive) int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive) int totalWeight = 0; // The sum of weights int firstWeight = 0; // Initial value, used for comparision boolean sameWeight = true; // Every invoker has the same weight value? for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = weight; // Reset firstWeight = weight; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. leastIndexs[leastCount++] = i; // Record index number of this invoker totalWeight += weight; // Add this invoker's weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offsetWeight = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexs[random.nextInt(leastCount)]); } }
2.2.5:ConsistentHashLoadBalance
一致性Hash算法,doSelect方法進行選擇。一致性Hash負載均衡涉及到兩個主要的配置參數爲hash.arguments與hash.nodes
public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
//若選擇器不存在去建立 if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }
//私有內部類 private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h);
//虛擬調用者 virtualInvokers.put(m, invoker); } } } } //選擇調用 public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } //轉化爲服務的key值 private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } // private Invoker<T> selectForKey(long hash) {
//從TreeMap中去尋找 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } //計算Hash值 private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } //md5加密 private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } } }
三:dubbo的默認負載均衡策略
3.1:由@SPI註解能夠看到,dubbo默認的負載均衡策略是隨機調用法
3.2:如何改變dubbo的負載均衡策略?
3.2.1:若是是springboot項目,直接註解在@Reference中引用,而後註明loadblance="xx".其中xx爲每一個實現類中的name的值
3.2.2:xml配置的方式
<dubbo:serviceinterface="..."loadbalance="roundrobin"/>
四:總結
本篇博客講述了dubbo的負載均衡機制,其中能夠看到除了一致性Hash算法,其它都是根據權重進行計算的,在實際的分佈式應用中,理解dubbo如何與zookeeper進行通訊選擇,如何實現負載均衡,如何維護服務的高可用性,理解負載均衡對於微服務的重要意義,將對於咱們學習分佈式的開發起着推波助瀾的做用。