負載均衡是指在集羣中,將多個數據請求分散在不一樣單元上進行執行,主要爲了提升系統容錯能力和增強系統對數據的處理能力。java
在 Dubbo 中,一次服務的調用就是對全部實體域 Invoker 的一次篩選過濾,最終選定具體調用的 Invoker。首先在 Directory 中獲取所有 Invoker 列表,經過路由篩選出符合規則的 Invoker,最後再通過負載均衡選出具體的 Invoker。因此 Dubbo 負載均衡機制是決定一次服務調用使用哪一個提供者的服務。
Dubbo 負載均衡的分析入口是 org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance 抽象類,查看這個類繼承關係。node
這個被 RandomLoadBalance、LeastActiveLoadBalance、RoundRobinLoadBalance 及 ConsistentHashLoadBalance 類繼承,這四個類是 Dubbo 中提供的四種負載均衡算法的實現。算法
名稱 | 說明 |
---|---|
RandomLoadBalance | 隨機算法,根據權重設置隨機的機率 |
LeastActiveLoadBalance | 最少活躍數算法,指請求數和完成數之差,使執行效率高的服務接收更多請求 |
RoundRobinLoadBalance | 加權輪訓算法,根據權重設置輪訓比例 |
ConsistentHashLoadBalance | Hash 一致性算法,相同請求參數分配到相同提供者 |
以上則是 Dubbo 提供的四種負載均衡算法。apache
從上圖中,看到 AbstractLoadBalance 實現了 LoadBalance 接口,同時是一個 SPI 接口,指定默認實現爲 RandomLoadBalance 隨機算法機制。數組
抽象類 AbstractLoadBalance 中,實現了負載均衡通用的邏輯,同時給子類聲明瞭一個抽象方法供子類實現其負載均衡的邏輯。緩存
public abstract class AbstractLoadBalance implements LoadBalance { /** * * @param 運行時間(毫秒) * @param 預熱時間(毫秒) * @param 要計算的 Invoker 權重值 */ static int calculateWarmupWeight(int uptime, int warmup, int weight) { // 計算預熱時期的權重 int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); // 返回的權重值區間在: 1 ~ weight return ww < 1 ? 1 : (ww > weight ? weight : ww); } @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 校驗 invokers 是否爲空 if (CollectionUtils.isEmpty(invokers)) { return null; } // 當到達負載均衡流程時,invokers 中只有一個 Invoker 時,直接返回該 Invoker if (invokers.size() == 1) { return invokers.get(0); } // 在不一樣負載均衡策略中完成具體的實現 return doSelect(invokers, url, invocation); } // 聲明抽象方法,在子類中具體實現 protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation); protected int getWeight(Invoker<?> invoker, Invocation invocation) { // 獲取當前Invoker配置的權重值 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT); if (weight > 0) { // 服務啓動時間 long timestamp = invoker.getUrl().getParameter(REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { // 服務已運行時長 int uptime = (int) (System.currentTimeMillis() - timestamp); // 服務預熱時間,默認 DEFAULT_WARMUP = 10 * 60 * 1000 ,預熱十分鐘 int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP); // 若是服務運行時長小於預熱時長,從新計算出預熱時期的權重 if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } // 保證最後返回的權重值不小於0 return weight >= 0 ? weight : 0; } }
在 AbstractLoadBalance 中,getWeight 和 calculateWarmupWeight 方法是獲取和計算當前 Invoker 的權重值。bash
getWeight 中獲取當前權重值,經過 URL 獲取當前 Invoker 設置的權重,若是當前服務提供者啓動時間小於預熱時間,則會從新計算權重值,對服務進行降權處理,保證服務能在啓動初期不分發設置比例的所有流量,健康運行下去。服務器
calculateWarmupWeight 是從新計算權重值的方法,計算公式爲:服務運行時長 / (預熱時長 / 設置的權重值)
,等價於(服務運行時長 / 預熱時長) * 設置的權重值
,同時條件服務運行時長 < 預熱時長
。由該公式可知,預熱時長和設置的權重值不變,服務運行時間越長,計算出的值越接近 weight,但不會等於 weight。
在返回計算後的權重結果中,對小於1和大於設置的權重值進行了處理,當從新計算後的權重小於1時返回1;處於1和設置的權重值之間時,直接返回計算後的結果;當權重大於設置的權重值時(由於條件限制,不會出現該類狀況),返回設置的權重值。因此得出結論:從新計算後的權重值爲 1 ~ 設置的權重值,運行時間越長,計算出的權重值越接近設置的權重值。app
經過 XML 配置方式:負載均衡
<!-- 服務級別配置 --> <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl" loadbalance="負載策略" /> <!-- 方法級別配置 --> <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl"> <dubbo:method name="方法名" loadbalance="負載策略"/> </dubbo:service>
經過 Properties 配置:
dubbo.service.loadbalance=負載策略
經過註解方式:
@Service(loadbalance = "負載策略")
經過 XML 配置方式:
<!-- 服務級別配置 --> <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService" loadbalance="負載策略" /> <!-- 方法級別配置 --> <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService"> <dubbo:method name="方法名" loadbalance="負載策略"/> </dubbo:reference>
經過 Properties 配置:
dubbo.reference.loadbalance=負載策略
經過註解配置方式:
@Reference(loadbalance = "負載策略")
實現方式也可經過 Dubbo-Admin 管理後臺進行配置,如圖:
加權隨機算法負載均衡策略(RandomLoadBalance)是 dubbo 負載均衡的默認實現方式,根據權重分配各個 Invoker 隨機選中的比例。這裏的意思是:將到達負載均衡流程的 Invoker 列表中的 權重進行求和,而後求出單個 Invoker 權重在總權重中的佔比,隨機數就在總權重值的範圍內生成。
如圖,假如當前有192.168.1.10
和192.168.1.11
兩個負載均衡的服務,權重分別爲 四、6 ,則它們的被選中的比例爲 2/五、3/5。
當生成隨機數爲 6 時,就會選中192.168.1.11
的服務。
dubbo 中 RandomLoadBalance 的 doSelect 實現代碼:
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Invoker 數量 int length = invokers.size(); // 標識全部 Invoker 的權重是否都同樣 boolean sameWeight = true; // 用一個數組保存每一個 Invoker 的權重 int[] weights = new int[length]; // 第一個 Invoker 的權重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 求和總權重 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // 保存每一個 Invoker 的權重到數組總 weights[i] = weight; // 累加求和總權重 totalWeight += weight; // 若是不是全部 Invoker 的權重都同樣,就給標記上 sameWeight = false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 計算隨機數取到的 Invoker,條件是必須總權重大於0,而且每一個 Invoker 的權重都不同 if (totalWeight > 0 && !sameWeight) { // 基於 0~總數 範圍內生成隨機數 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 計算隨機數對應的 Invoker for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 若是全部 Invoker 的權重都同樣則隨機從 Invoker 列表中返回一個 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
以上就是加權隨機策略的實現,這裏比較主要關注計算生成的隨機數對應的 Invoker。經過遍歷權重數組,生成的數累減當前權重值,當 offset 爲 0 時,就表示 offset 對應當前的 Invoker 服務。
以生成的隨機數爲 6 爲例,遍歷 Invokers 長度:
加權隨機策略並不是必定按照比例被選到,理論上調用次數越多,分佈的比例越接近權重所佔的比例。
最小活躍數負載均衡策略(LeastActiveLoadBalance)是從最小活躍數的 Invoker 中進行選擇。什麼是活躍數呢?活躍數是一個 Invoker 正在處理的請求的數量,當 Invoker 開始處理請求時,會將活躍數加 1,完成請求處理後,將相應 Invoker 的活躍數減 1。找出最小活躍數後,最後根據權重進行選擇最終的 Invoker。若是最後找出的最小活躍數相同,則隨機從中選中一個 Invoker。
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Invoker 數量 int length = invokers.size(); // 全部 Invoker 中的最小活躍值都是 -1 int leastActive = -1; // 最小活躍值 Invoker 的數量 int leastCount = 0; // 最小活躍值 Invoker 在 Invokers 列表中對應的下標位置 int[] leastIndexes = new int[length]; // 保存每一個 Invoker 的權重 int[] weights = new int[length]; // 總權重 int totalWeight = 0; // 第一個最小活躍數的權重 int firstWeight = 0; // 最小活躍數 Invoker 列表的權重是否同樣 boolean sameWeight = true; // 找出最小活躍數 Invoker 的下標 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 獲取最小活躍數 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 獲取權重 int afterWarmup = getWeight(invoker, invocation); // 保存權重 weights[i] = afterWarmup; // 若是當前最小活躍數爲-1(-1爲最小值)或小於leastActive if (leastActive == -1 || active < leastActive) { // 重置最小活躍數 leastActive = active; // 重置最小活躍數 Invoker 的數量 leastCount = 1; // 保存當前 Invoker 在 Invokers 列表中的索引至leastIndexes數組中 leastIndexes[0] = i; // 重置最小活躍數 invoker 的總權重值 totalWeight = afterWarmup; // 記錄當前 Invoker 權重爲第一個最小活躍數 Invoker 的權重 firstWeight = afterWarmup; // 由於當前 Invoker 重置爲第一個最小活躍數 Invoker ,因此標識全部最小活躍數 Invoker 權重都同樣的值爲 true sameWeight = true; // 若是當前最小活躍數和已聲明的最小活躍數相等 } else if (active == leastActive) { // 記錄當前 Invoker 的位置 leastIndexes[leastCount++] = i; // 累加當前 Invoker 權重到總權重中 totalWeight += afterWarmup; // 若是當前權重與firstWeight不相等,則將 sameWeight 改成 false if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 若是最小活躍數 Invoker 只有一個,直接返回該 Invoker if (leastCount == 1) { return invokers.get(leastIndexes[0]); } if (!sameWeight && totalWeight > 0) { // 根據權重隨機從最小活躍數 Invoker 列表中選擇一個 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 若是全部 Invoker 的權重都同樣則隨機從 Invoker 列表中返回一個 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
這段代碼的整個邏輯就是,從 Invokers 列表中篩選出最小活躍數的 Invoker,而後相似加權隨機算法策略方式選擇最終的 Invoker 服務。
加權輪詢負載均衡策略(RoundRobinLoadBalance)是基於權重來決定輪詢的比例。普通輪詢會將請求均勻的分佈在每一個節點,但不能很好調節不一樣性能服務器的請求處理,因此加權負載均衡來根據權重在輪詢機制中分配相對應的請求比例給每臺服務器。
public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private static final int RECYCLE_PERIOD = 60000; protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0); private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); private AtomicBoolean updateLock = new AtomicBoolean(); /** * get invoker addr list cached for specified invocation * <p> * <b>for unit test only</b> * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // key 爲 接口名+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 查看緩存中是否存在相應服務接口的信息,若是沒有則新添加一個元素到緩存中 ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } // 總權重 int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; // 當前時間戳 long now = System.currentTimeMillis(); // 最大 current 的 Invoker Invoker<T> selectedInvoker = null; // 保存選中的 WeightedRoundRobin 對象 WeightedRoundRobin selectedWRR = null; // 遍歷 Invokers 列表 for (Invoker<T> invoker : invokers) { // 從緩存中獲取 WeightedRoundRobin 對象 String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); // 獲取當前 Invoker 對象 int weight = getWeight(invoker, invocation); // 若是當前 Invoker 沒有對應的 WeightedRoundRobin 對象,則新增一個 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } // 若是當前 Invoker 權重不等於對應的 WeightedRoundRobin 對象中的權重,則從新設置當前權重到對應的 WeightedRoundRobin 對象中 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 累加權重到 current 中 long cur = weightedRoundRobin.increaseCurrent(); // 設置 weightedRoundRobin 對象最後更新時間 weightedRoundRobin.setLastUpdate(now); // 最大 current 的 Invoker,並賦值給相應的變量 if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } // 累加權重到總權重中 totalWeight += weight; } // 若是 Invokers 列表中的數量不等於緩存map中的數量 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { // 拷貝 map 到 newMap 中 ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>(); newMap.putAll(map); // newMap 轉化爲 Iterator Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator(); // 循環刪除超過設定時長沒更新的緩存 while (it.hasNext()) { Entry<String, WeightedRoundRobin> item = it.next(); if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) { it.remove(); } } // 將當前newMap服務緩存中 methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } // 若是存在被選中的 Invoker if (selectedInvoker != null) { // 計算 current = current - totalWeight selectedWRR.sel(totalWeight); return selectedInvoker; } // 正常狀況這裏不會到達 return invokers.get(0); } }
上面選中 Invoker 邏輯爲:每一個 Invoker 都有一個 current 值,初始值爲自身權重。在每一個 Invoker 中current = current + weight
。遍歷完 Invoker 後,current 最大的那個 Invoker 就是本次選中的 Invoker。選中 Invoker 後,將本次 current 值計算current = current - totalWeight
。
以上面192.168.1.10
和192.168.1.11
兩個負載均衡的服務,權重分別爲 四、6 。基於選中前current = current + weight
、選中後current = current - totalWeight
計算公式得出以下
請求次數 | 選中前 current | 選中後 current | 被選中服務 |
---|---|---|---|
1 | [4, 6] | [4, -4] | 192.168.1.11 |
2 | [8, 2] | [-2, 2] | 192.168.1.10 |
3 | [2, 8] | [2, -2] | 192.168.1.11 |
4 | [6, 4] | [-4, 4] | 192.168.1.10 |
5 | [0, 10] | [0, 0] | 192.168.1.11 |
一致性 Hash 負載均衡策略(ConsistentHashLoadBalance)是讓參數相同的請求分配到同一機器上。把每一個服務節點分佈在一個環上,請求也分佈在環形中。以請求在環上的位置,順時針尋找換上第一個服務節點。如圖所示:
同時,爲避免請求散列不均勻,dubbo 中會將每一個 Invoker 再虛擬多個節點出來,使得請求調用更加均勻。
一致性 Hash 修改配置以下:
<!-- dubbo 默認只對第一個參數進行 hash 標識,指定hash參數 --> <dubbo:parameter key="hash.arguments" value="1" /> <!-- 虛擬節點數量 --> <dubbo:parameter key="hash.nodes" value="200" />
一致性 Hash 實現以下:
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * Hash nodes name */ public static final String HASH_NODES = "hash.nodes"; /** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 獲取請求的方法名 String methodName = RpcUtils.getMethodName(invocation); // key = 接口名+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // invokers 的 hashcode int identityHashCode = System.identityHashCode(invokers); // 查看緩存中是否存在對應 key 的數據,或 Invokers 列表是否有過變更。若是沒有,則新添加到緩存中,而且返回負載均衡得出的 Invoker ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } // ConsistentHashSelector class ... }
doSelect 中主要實現緩存檢查和 Invokers 變更檢查,一致性 hash 負載均衡的實如今這個內部類 ConsistentHashSelector 中實現。
private static final class ConsistentHashSelector<T> { // 存儲虛擬節點 private final TreeMap<Long, Invoker<T>> virtualInvokers; // 節點數 private final int replicaNumber; // invoker 列表的 hashcode,用來判斷 Invoker 列表是否變化 private final int identityHashCode; // 請求中用來做Hash映射的參數的索引 private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 獲取節點數 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); // 獲取配置中的 參數索引 String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { // 獲取 Invoker 中的地址,包括端口號 String address = invoker.getUrl().getAddress(); // 建立虛擬節點 for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } // 找出 Invoker public Invoker<T> select(Invocation invocation) { // 將參數轉爲字符串 String key = toKey(invocation.getArguments()); // 字符串參數轉換爲 md5 byte[] digest = md5(key); // 根據 md5 找出 Invoker return selectForKey(hash(digest, 0)); } // 將參數拼接成字符串 private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } // 利用 md5 匹配到對應的 Invoker private Invoker<T> selectForKey(long hash) { // 找到第一個大於當前 hash 的 Invoker Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } // hash 運算 private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } // md5 運算 private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); } }
一致 hash 實現過程就是先建立好虛擬節點,虛擬節點保存在 TreeMap 中。TreeMap 的 key 爲配置的參數先進行 md5 運算,而後將 md5 值進行 hash 運算。TreeMap 的 value 爲被選中的 Invoker。
最後請求時,計算參數的 hash 值,去從 TreeMap 中獲取 Invoker。
Dubbo 負載均衡的實現,技巧上仍是比較優雅,能夠多多學習其編碼思惟。在研究其代碼時,須要仔細研究其實現原理,不然比較難懂其思想。
推薦閱讀