文章異常囉嗦且繞彎。java
Dubbo 版本 : dubbo 3.0apache
Dubbo LoadBalance 是 Dubbo Consumer 中用於負載均衡的組件,位於 Cluster 層中。數組
LoadBalance 的組件遵循 Dubbo 的通常設計規律,接口在 dubbo-cluster 模塊中:緩存
package org.apache.dubbo.rpc.cluster; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.Adaptive; import org.apache.dubbo.common.extension.SPI; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance; import java.util.List; @SPI(RandomLoadBalance.NAME) // RandomLoadBalance.NAME = random public interface LoadBalance { /** * 能夠在 url 裏傳入 loadbalance 參數來切換負載均衡策略,默認根據 spi 機制,會使用 random */ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.cluster.LoadBalance; import java.util.List; import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_SERVICE_REFERENCE_PATH; import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP; import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT; import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY; import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY; /** * 負載均衡組件模板 */ public abstract class AbstractLoadBalance implements LoadBalance { static int calculateWarmupWeight(int uptime, int warmup, int weight) { // ww = (當前時間 - 啓動時間) / 預熱時間 * 權重 // 取 ww 和 權重 中的最小值 // 若是 當前時間 還在 預熱時間 內,那麼此處 ww 必然小於 權重 // 若是 當前時間 和 啓動時間 相差很是近,或者 預熱時間 很長,那麼此處 ww 有可能會小於 1,此處會返回 1 // 若是 當前時間 小於 啓動時間,那麼是服務的時間問題,ww 就會小於 0,此處會返回 1 // 從 getWeight(...) 方法可知,此處 ww 必然小於 weight int ww = (int) ( uptime / ((float) warmup / weight)); return ww < 1 ? 1 : (Math.min(ww, weight)); } /** * 接口抽象方法 select 的實現,也是模版的核心方法 * * @param invokers 全部的服務提供者信息的封裝 * @param url 當前調用者的 url * @param invocation 要發送的信息 */ @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 若是沒有服務提供者,此處返回 null if (CollectionUtils.isEmpty(invokers)) { return null; } // 服務的提供者只有一個,直接返回就能夠了,沒有負載均衡的必要 if (invokers.size() == 1) { return invokers.get(0); } // 有多個,那麼此處須要不一樣的策略自行完成具體邏輯 return doSelect(invokers, url, invocation); } /** * 模板方法的具體實現,從列表中選擇一個 invoker * * @param invokers 全部的服務提供者信息的封裝 * @param url 當前調用者的 url * @param invocation 要發送的信息 */ protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation); /** * 獲取 invoker 權重的方法,在 random 和 robin * 中很重要的方法 */ int getWeight(Invoker<?> invoker, Invocation invocation) { int weight; // 獲取 url URL url = invoker.getUrl(); // REGISTRY_SERVICE_REFERENCE_PATH = org.apache.dubbo.registry.RegistryService // REGISTRY_KEY = registry // WEIGHT_KEY = weight // DEFAULT_WEIGHT = 100 // TIMESTAMP_KEY = timestamp // WARMUP_KEY = warmup // DEFAULT_WARMUP = 600000 if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) { // 入參 registry.weight 和 100 weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT); } else { // provider 的權重 weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT); if (weight > 0) { // 權重大於 0 // provider 的啓動的時間戳 long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L); if (timestamp > 0L) { long uptime = System.currentTimeMillis() - timestamp; if (uptime < 0) { // 啓動的時間戳小於當前時間戳,這種狀況多是存在服務器時間問題 // 此處爲什麼返回 1 ? return 1; } // warmup 是預熱時間,若是當前時間內,這個 provider 還處於預熱當中 // 那麼就會調用到 calculateWarmupWeight(...) 方法 int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight((int)uptime, warmup, weight); } } } } // 權重不能低於 0 return Math.max(weight, 0); } }
在 Dubbo 3.0 中,負載均衡策略存在如下幾種:服務器
筆者這裏暫時只列舉前三種,後面兩種有緣補充 (其實是由於還沒看完)。負載均衡
默認策略,其實是考慮了權重以後的隨機選擇,若是每一個服務提供者的權重都一致,那麼就使用 java 的隨機函數去選擇一個。dom
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** * 考慮權重值以後的隨機負載均衡 */ public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 獲取服務提供者的數量 int length = invokers.size(); // 默認全部的服務提供者是相同的權重 boolean sameWeight = true; // 權重數組 int[] weights = new int[length]; // 獲取第一個服務提供者的權重 int firstWeight = getWeight(invokers.get(0), invocation); // 存入數組 weights[0] = firstWeight; // 權重的和 int totalWeight = firstWeight; // 輪詢全部的提供者的權重並記錄下來 for (int i = 1; i < length; i++) { // 此處和上方代碼相似 int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; totalWeight += weight; // 若是遇到不同的就把標識改爲 false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 不一樣權重模式下的隨機計算 // 大概思路是 row 一個隨機值,並按照順序進行相減,觀察落在哪一個區間內 if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 相同權重下的隨機計算 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
輪詢負載均衡策略,本質上也是考慮了權重以後的輪循。若是 A 服務提供者的權重是 B 服務提供者的兩倍,那麼理論上 A 被輪循到的次數就會是 B 的兩倍。ide
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; /** * Round robin load balance. * * 輪詢負載均衡策略 */ public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private static final int RECYCLE_PERIOD = 60000; /** * 權重的封裝 */ protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0); private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); /** * get invoker addr list cached for specified invocation * <p> * <b>for unit test only</b> * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // key = serviceKey + methodName // 這個 key 表明一個 provider 接口 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 獲取權重記錄,若是沒有的話會建立一個空 map ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; // 被選中的 provider WeightedRoundRobin selectedWRR = null; // 被選中的 provider 的權重 entity for (Invoker<T> invoker : invokers) { // 此處若是存在權重記錄就直接返回,不存在就初始化一個 // identifyString 是緩存的 key String identifyString = invoker.getUrl().toIdentityString(); /* 獲取權重的封裝對象,若是沒有的話會建立一個 WeightedRoundRobin 維護兩個重要的參數, 一個數 current,表明該 provider 當前的調用權重 一個是 weight,表明該 provider 恆定的配置權重 */ int weight = getWeight(invoker, invocation); WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); // 改權重數據 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // cur = weightedRoundRobin.current + weightedRoundRobin.weight long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); // 此處的 cur > maxCurrent,本質上選出了全部 provider 中 current 最大的一個 // 此處結合上述邏輯,至關於給每一個 provider 的 current 增長了一次 weight // 並選出了 current 最大的那一個,做爲調用方 if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } // 對 map 進行自檢 // 若是超過 60 秒都沒有被調用,此處即認爲服務已經異常,就會移除 if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { // weightedRoundRobin.current = weightedRoundRobin.current - totalWeight // 至關於 selectedWRR.sel(totalWeight); return selectedInvoker; } /** * 上述邏輯簡圖 * 假設三個服務 s1,s2,s3 權重均爲 10 * * 第一輪疊加權重後的 current: * 10 10 10 * 第一輪選擇推送 s1,推送完成後的 current: * -20 10 10 * * 第二輪疊加權重後的 current: * -10 20 20 * 第二輪選擇推送 s2,推送完成後的 current: * -10 -10 20 * * 第三輪疊加權重後的 current: * 0 0 30 * 第三輪選擇推送 s3,推送完成後的 current: * 0 0 0 * * 第四輪疊加權重後的 current: * 10 10 10 * 第四輪選擇推送 s1,推送完成後的 current: * -20 10 10 * * * 以此類推。 */ // 上述代碼出問題的狀況下默認選第一個 return invokers.get(0); } }
根據響應時間和當前服務的請求量去得到一個最優解。若是存在多個最優解,則考慮權重,若是僅有一個則權重無效。函數
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcStatus; import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** * 根據最優解選擇服務提供者 */ public class ShortestResponseLoadBalance extends AbstractLoadBalance { public static final String NAME = "shortestresponse"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 可調用的服務提供者的數量 int length = invokers.size(); // 初始化一個最短 response 時間 long shortestResponse = Long.MAX_VALUE; // 初始化一個最短 response 總數 int shortestCount = 0; // The index of invokers having the same estimated shortest response time int[] shortestIndexes = new int[length]; // 每一個服務提供者的權重 int[] weights = new int[length]; // 權重和 int totalWeight = 0; // 調用平均返回時間最短的服務提供者的權重 int firstWeight = 0; // 權重是否相同 boolean sameWeight = true; // 輪詢全部的服務提供者 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 獲取服務提供者的狀態 RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 平均服務調用成功返回時間 long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); // 正在活躍的請求數 int active = rpcStatus.getActive(); // 此處用平均時間乘以活躍數,得到打分 // 若是服務提供方很健壯,平均時間很短,可是請求分配的不少,這裏分數也會比較高 // 分數越低,優先級越高 long estimateResponse = succeededAverageElapsed * active; // 獲取權重 int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; /** * 計算最短數組,shortestResponse 記錄當前最短的 */ if (estimateResponse < shortestResponse) { // 若是當前服務提供者的得分低於最低的得分,則更新最低得分, // 並將最優提供者數組的首位置爲當前的提供者 shortestResponse = estimateResponse; shortestCount = 1; shortestIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } else if (estimateResponse == shortestResponse) { // 若是相等,則可能存在多個最優解 shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 最優解只有一個的狀況,直接選最優解進行調用 if (shortestCount == 1) { return invokers.get(shortestIndexes[0]); } // 最優解不止一個,且最優解之間的權重不一樣,那麼此處根據權重去隨機選擇一個 if (!sameWeight && totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0) { return invokers.get(shortestIndex); } } } // 最優解不止一個,且權重相同,則隨機選擇 return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); } }
本文僅爲我的的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充學習