負載均衡算法 最小活躍數 一致性哈希算法
首先引出一點 負載均衡的目的是什麼?java
當一臺服務器的承受能力達到上限時,那麼就須要多臺服務器來組成集羣,提高應用總體的吞吐量,那麼這個時候就涉及到如何合理分配客戶端請求到集羣中不一樣的機器,這個過程就叫作負載均衡node
策略就是根據權重佔比隨機。算法很簡單,就是一根數軸。而後利用僞隨機數產生點, *看點落在了哪一個區域從而選擇對應的 服務器*git
輪詢算法是指依次訪問可用服務器列表,其和隨機本質是同樣的處理,在無權重因素下,輪詢只是在選數軸上的點時採起自增對長度取餘方式。有權重因素下依然自增取餘,再看選取的點落在了哪一個區域。github
利用Hash算法定位相同的服務器面試
Server
節點—————————— 下面這部分是來源於dubbo 官方文檔 ————————————apache
在集羣負載均衡時,Dubbo 提供了多種均衡策略,缺省爲 Random LoadBalance 隨機調用api
<dubbo:parameter key="hash.arguments" value="0,1" />
<dubbo:parameter key="hash.nodes" value="320" />
<dubbo:service interface="..." loadbalance="roundrobin" />
<dubbo:reference interface="..." loadbalance="roundrobin" />
<dubbo:service interface="..."> <dubbo:method name="..." loadbalance="roundrobin"/> </dubbo:service>
<dubbo:reference interface="..."> <dubbo:method name="..." loadbalance="roundrobin"/> </dubbo:reference>
———————————————— Dubbo 官方文檔已結束 ——————————————數組
上面官網文檔已經說明 Dubbo 的負載均衡算法總共有4種服務器
首先查看 LoadBalance 接口
Invoker select(List invokers, URL url, Invocation invocation) throws RpcException;
LoadBalance 定義了一個方法就是從 invokers 列表中選取一個
AbstractLoadBalance 抽象類是全部負載均衡策略實現類的父類,實現了LoadBalance接口 的方法,同時提供抽象方法交由子類實現,
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //先得到invoker 集合大小 int length = invokers.size(); // Number of invokers //總權重 int totalWeight = 0; // The sum of weights //每一個invoker是否有相同的權重 boolean sameWeight = true; // Every invoker has the same weight? // 計算總權重 for (int i = 0; i < length; i++) { //得到單個invoker 的權重 int weight = getWeight(invokers.get(i), invocation); //累加 totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } // 權重不相等,隨機後,判斷在哪一個 Invoker 的權重區間中 if (totalWeight > 0 && !sameWeight) { // 隨機 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 區間判斷 // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 權重相等,平均隨機 // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length)); }
假定有3臺dubbo provider: 10.0.0.1:20884, weight=2 10.0.0.1:20886, weight=3 10.0.0.1:20888, weight=4 隨機算法的實現: totalWeight=9; 假設offset=1(即random.nextInt(9)=1) 1-2=-1<0?是,因此選中 10.0.0.1:20884, weight=2
假設offset=4(即random.nextInt(9)=4) 4-2=2<0?否,這時候offset=2, 2-3<0?是,因此選中 10.0.0.1:20886, weight=3
假設offset=7(即random.nextInt(9)=7) 7-2=5<0?否,這時候offset=5, 5-3=2<0?否,這時候offset=2, 2-4<0?是,因此選中 10.0.0.1:20888, weight=4
RoundRobinLoadBalance#doSelect()(輪詢)
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; // 計算最小、最大權重,總的權重和。 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } // 計算最小、最大權重,總的權重和。 AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } // 得到當前順序號,並遞增 + 1 int currentSequence = sequence.getAndIncrement(); // 權重不相等,順序根據權重分配 if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum;// 剩餘權重 for (int i = 0; i < maxWeight; i++) {// 循環最大權重 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); // 剩餘權重歸 0 ,當前 Invoker 還有剩餘權重,返回該 Invoker 對象 if (mod == 0 && v.getValue() > 0) { return k; } // 若 Invoker 還有權重值,扣除它( value )和剩餘權重( mod )。 if (v.getValue() > 0) { v.decrement(); mod--; } } } } // 權重相等,平均順序得到 // Round robin return invokers.get(currentSequence % length); }
假定有3臺權重都同樣的dubbo provider: 10.0.0.1:20884, weight=100 10.0.0.1:20886, weight=100 10.0.0.1:20888, weight=100 輪詢算法的實現: 其調用方法某個方法(key)的sequence從0開始: sequence=0時,選擇invokers.get(0%3)=10.0.0.1:20884 sequence=1時,選擇invokers.get(1%3)=10.0.0.1:20886 sequence=2時,選擇invokers.get(2%3)=10.0.0.1:20888 sequence=3時,選擇invokers.get(3%3)=10.0.0.1:20884 sequence=4時,選擇invokers.get(4%3)=10.0.0.1:20886 sequence=5時,選擇invokers.get(5%3)=10.0.0.1:20888
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 總個數 int length = invokers.size(); // Number of invokers // 最少的活躍數 int leastActive = -1; // The least active value of all invokers // 相同最小活躍數的個數 int leastCount = 0; // The number of invokers having the same least active value (leastActive) // 相同最小活躍數的下標 int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive) //總權重 int totalWeight = 0; // The sum of weights // 第一個權重,用於於計算是否相同 int firstWeight = 0; // Initial value, used for comparision // 是否全部權重相同 boolean sameWeight = true; // Every invoker has the same weight value? // 計算得到相同最小活躍數的數組和個數 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 活躍數 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number // 權重 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight // 發現更小的活躍數,從新開始 if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. // 記錄最小活躍數 leastActive = active; // Record the current least active value // 從新統計相同最小活躍數的個數 leastCount = 1; // Reset leastCount, count again based on current leastCount // 從新記錄最小活躍數下標 leastIndexs[0] = i; // Reset // 從新統計總權重 totalWeight = weight; // Reset // 記錄第一個權重 firstWeight = weight; // Record the weight the first invoker // 還原權重標識 sameWeight = true; // Reset, every invoker has the same weight value? // 累計相同最小的活躍數 } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. // 累計相同最小活躍數下標 leastIndexs[leastCount++] = i; // Record index number of this invoker // 累計總權重 totalWeight += weight; // Add this invoker's weight to totalWeight. // 判斷全部權重是否同樣 // If every invoker has the same weight? if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // 若是隻有一個最小則直接返回 // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { // 若是權重不相同且權重大於0則按總權重數隨機 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // 並肯定隨機值落在哪一個片段上 // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // 若是權重相同或權重爲0則均等隨機 // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexs[ThreadLocalRandom.current().nextInt(leastCount)]); }
簡單思路介紹
歸納起來就兩部分,一部分是 活躍數
和 權重
的統計,另外一部分是選擇 invoker
.也就是他把最小活躍數的 invoker
統計到 leastIndexs
數組中,若是權重一致(這個一致的規則參考上面的隨機算法)或者總權重爲0,則均等隨機調用,若是不一樣,則從 leastIndexs
數組中按照權重比例調用
最小活躍數算法實現: 假定有3臺dubbo provider: 10.0.0.1:20884, weight=2,active=2 10.0.0.1:20886, weight=3,active=4 10.0.0.1:20888, weight=4,active=3 active=2最小,且只有一個2,因此選擇10.0.0.1:20884
假定有3臺dubbo provider: 10.0.0.1:20884, weight=2,active=2 10.0.0.1:20886, weight=3,active=2 10.0.0.1:20888, weight=4,active=3 active=2最小,且有2個,因此從[10.0.0.1:20884,10.0.0.1:20886 ]中選擇; 接下來的算法與隨機算法相似: 假設offset=1(即random.nextInt(5)=1) 1-2=-1<0?是,因此選中 10.0.0.1:20884, weight=2 假設offset=4(即random.nextInt(5)=4) 4-2=2<0?否,這時候offset=2, 2-3<0?是,因此選中 10.0.0.1:20886, weight=3
源碼其實分爲四個步驟
ConcurrentMap<String, ConsistentHashSelector<?>> selectors
,key爲方法名稱,例如com.alibaba.dubbo.demo.TestService.getRandomNumberinvokers.size()*replicaNumber
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * 服務方法與一致性哈希選擇器的映射 * * KEY:serviceKey + "." + methodName */ private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); // 基於 invokers 集合,根據對象內存地址來計算定義哈希值 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); // 得到 ConsistentHashSelector 對象。若爲空,或者定義哈希值變動(說明 invokers 集合發生變化), // 進行建立新的 ConsistentHashSelector 對象 ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector<T> { /** * 虛擬節點與 Invoker 的映射關係 */ private final TreeMap<Long, Invoker<T>> virtualInvokers; /** * 每一個Invoker 對應的虛擬節點數 */ private final int replicaNumber; /** * 定義哈希值 */ private final int identityHashCode; /** * 取值參數位置數組 */ private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); // 設置 identityHashCode this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 初始化 replicaNumber this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 初始化 argumentIndex String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 初始化 virtualInvokers for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); // 每四個虛擬結點爲一組,爲何這樣?下面會說到 for (int i = 0; i < replicaNumber / 4; i++) { // 這組虛擬結點獲得唯一名稱 byte[] digest = md5(address + i); // Md5是一個16字節長度的數組,將16字節的數組每四個字節一組, // 分別對應一個虛擬結點,這就是爲何上面把虛擬結點四個劃分一組的緣由 for (int h = 0; h < 4; h++) { // 對於每四個字節,組成一個long值數值,作爲這個虛擬節點的在環中的唯一key long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { // 基於方法參數,得到 KEY String key = toKey(invocation.getArguments()); // 計算 MD5 值 byte[] digest = md5(key); // 計算 KEY 值 return selectForKey(hash(digest, 0)); } /** * 基於方法參數,得到 KEY * @param args * @return */ private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } /** * 選一個 Invoker 對象 * @param hash * @return */ private Invoker<T> selectForKey(long hash) { // 獲得大於當前 key 的那個子 Map ,而後從中取出第一個 key ,就是大於且離它最近的那個 key Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); // 不存在,則取 virtualInvokers 第一個 if (entry == null) { entry = virtualInvokers.firstEntry(); } // 存在,則返回 return entry.getValue(); } /** * 對於每四個字節,組成一個 Long 值數值,作爲這個虛擬節點的在環中的唯一 KEY * @param digest * @param number * @return */ private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } /** * MD5 是一個 16 字節長度的數組,將 16 字節的數組每四個字節一組, * 分別對應一個虛擬結點,這就是爲何上面把虛擬結點四個劃分一組的緣由 * @param value * @return */ private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } } }