你們好,今天給你們分享 Dubbo 中的負載均衡。在前一個章節中咱們介紹 Dubbo延遲服務暴露,咱們例舉了常見的使用場景而且進行了源碼解析來分析其實現原理,同時咱們也知道 Dubbo 延遲服務暴露其核心就是經過一個 延遲的調度器指定延遲時間後開始服務的暴露。不少小夥伴可能會好奇:咱們的服務部署基本都是集羣形式,那服務消費端究竟是調用哪個服務提供者呢?都有哪些常見的服務選擇算法?爲了揭開這些困惑,下面就讓咱們快速開始吧!java
那麼什麼是負載均衡呢?舉個簡單例子:在火車站購買火車票場景,咱們知道節假日確定有很是多的人購買火車票,那麼一個售票窗口賣票的話確定會排很長的隊對用戶來講體驗很是差。那咱們能不能多增長几個售票窗口呢?是否是並行處理的能力立刻獲得提升了呢?沒錯!咱們在軟件工程裏面也是如此。好比 Nginx 經常用做軟件負載均衡、F5用做硬件的負載均衡器等等。負載均衡算法能夠有效提升系統的併發處理能力、容錯能力、一致性訪問等。以下圖所示:node
隨機算法比較簡單就和數學中的隨機值選取同樣,好比:1-100中隨機選擇一個數字。在 Dubbo 中對隨機算法增長了一個權重概念。舉個例子:git
服務列表 | 權重 |
---|---|
Service01 | 10 |
Service02 | 20 |
Service03 | 20 |
Service04 | 50 |
從上表中咱們知道4個服務權重分別是十、20、20、50,參考下圖描述了隨機選擇步驟:算法
offset
,假設 offset
值爲 60。offset
- weight
結果值爲 50 > 0 繼續執行從新賦值offset
= 50。offset
- weight
結果值爲 30 > 0 繼續執行。offset
- weight
結果值爲 10 > 0 繼續執行。offset
- weight
結果值爲 -40 > 0 找到服務終止。Tips:此算法數據量越大數據分配越均勻。
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 服務總數 int length = invokers.size(); // 標識這些服務是否具備相同的權重 boolean sameWeight = true; // 全部服務權重集合 int[] weights = new int[length]; // 第一個服務權重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 權重總和 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // 記錄權重 weights[i] = weight; // 合計權重值 totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // 從(0,totalWeight]範圍選擇權重值 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 基於隨機值返回服務 for (int i = 0; i < length; i++) { // offset = offset - weights[i] offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 若是權重值相同或者totalWeight=0返回服務列表中隨機服務 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
invokers
服務列表計算出權重總和並標記出是否存在權重所有相同狀況。invokers
服務列表全部服務權重是否相同,相同則隨機返回服務列表中的一個服務。invokers
服務列表存在服務權重不相同時,產生一個權重總和範圍內的一個大於0的隨着整數賦值給 offset
,而後循環訪問服務列表中的元素而且使用 offset
減去當前循環元素的權重值,若是差值大於0則賦值給當前的 offset
繼續執行元素循環,當 offset
減去當前元素的權重大於零時中止循環,則當前查找的元素爲選擇的服務。輪詢負載均衡算法顧名思義就是輪流調用服務。舉個例子:假設4個請求經過 Nginx 代理轉發到後端服務:編程
同時 Dubbo 中的輪詢算法也是須要權重支持。輪詢負載均衡算法可讓 RPC 調用嚴格按照咱們設置的比例來分配。不論是少許的調用仍是大量的調用。可是輪詢負載均衡算法也有不足的地方,存在慢的服務累積請求的問題,好比:第二臺 Service2
很慢,但沒掛,當請求調到第二臺時就卡在那,長此以往,全部請求都卡在調到第二臺上,致使整個系統變慢。如下是加權輪詢算法圖:後端
public class RoundRobinLoadBalance extends AbstractLoadBalance { @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //服務惟一標識,獲取這一組服務第一個服務:由於這是一組服務全部key是相同的 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();//DemoService.method1 //經過服務惟一標識對這一組服務進行緩存 key -> Map ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString();//服務實例惟一表示:[test://127.0.0.1:1/DemoService,test://127.0.0.1:2/DemoService,test://127.0.0.1:3/DemoService] WeightedRoundRobin weightedRoundRobin = map.get(identifyString); //獲取服務權重 int weight = getWeight(invoker, invocation); //WeightedRoundRobin封裝實體 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } //權重發生變動 if (weight != weightedRoundRobin.getWeight()) { //從新賦值 weightedRoundRobin.setWeight(weight); } long cur = weightedRoundRobin.increaseCurrent(); //記錄更新時間 weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) {//大於當前最大權重則被選擇 maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } //計算全部權重值 totalWeight += weight; } //獲取鎖而且存在服務下線狀況【invokers.size() != map.size()】 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { // copy -> modify -> update reference ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map); //回收週期 newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } if (selectedInvoker != null) { //將其選擇服務權重值減去總權重值 selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0); } }
maxCurrent
爲最小值 Long.MIN_VALUE
、totalWeight
爲0。invokers
服務列表,每一個元素中的 current
(初始化爲0)加上元素權重值大於 maxCurrent
,maxCurrent
而且賦值選中服務( selectedInvoker
)爲當前元素,而且累計當前元素權重值賦值給 totalWeight
。Tips:上述算法存在必定的難度,稍做了解便可。
咱們在實際服務調度場景中通常存在多服務實例部署狀況,當相同的應用部署到不一樣機器極可能存在不一樣服務處理的能力不一樣有快有慢,那麼咱們理想化的認爲在服務調用過程當中處理能力慢的服務器接收較少的服務請求更爲符合咱們的要求。以下示例圖:數組
從圖中能夠看出假設咱們這裏有5個請求經過 Ngingx 代理轉發到後端服務,這裏 Service02
只接收到一個請求(處理能力較弱)。緩存
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); //初始化最小活躍值,最小活躍數,最小活躍的invoker索引(index),權重數組 int leastActive = -1; int leastCount = 0; int[] leastIndexes = new int[length]; int[] weights = new int[length]; int totalWeight = 0; int firstWeight = 0; boolean sameWeight = true; // 過濾出全部最少活躍服務 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 獲取這個invoker活躍數 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 獲取invoker的權重 默認100. int afterWarmup = getWeight(invoker, invocation); // 權重保存到集合中 weights[i] = afterWarmup; //找到最小活躍數invoker if (leastActive == -1 || active < leastActive) { //重置最小活躍數值 leastActive = active; //重置最小活躍計數 leastCount = 1; // 記錄最小活躍數對應索引 leastIndexes[0] = i; // 重置權重合計 totalWeight = afterWarmup; // 記錄第一個最小活躍數對應權重值 firstWeight = afterWarmup; // 重置是否具備相同權重標誌 sameWeight = true; //當前invoker活躍數等於最小活躍數 } else if (active == leastActive) { // 最小活躍數leastCount增長而且記錄當前invoker對應索引 leastIndexes[leastCount++] = i; // 累計invoker權重值 totalWeight += afterWarmup; // 判斷是否權重值相同 if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } //最小活躍數invoker只有一個 if (leastCount == 1) { // 直接返回 return invokers.get(leastIndexes[0]); } //具備多個相同活躍數、具備不一樣權重且權重和大於0 if (!sameWeight && totalWeight > 0) { // 根據權重值進行一個(0,totalWeight]整數隨機值 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // 基於隨機值選擇所在區間和隨機負載均衡算法一致 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 若是全部權重值相同或者totalWeight=0返回服務列表中隨機一個服務 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
invoker
索引(index
),權重數組。invokers
服務列表獲取 invoker
活躍數、權重且過濾出全部最少活躍服務並記錄最小活躍數對應索引,同時標記服務列表是否存在不一樣權重值。totalWeight
]整數隨機值totalWeight=0
返回服務列表中隨機一個服務。Tips:上述算法存在必定的難度,稍做了解便可。
要明白什麼是一致性 Hash 首先得知道什麼是 Hash 算法。咱們在學習 HashMap 的 JDK 源碼時能夠了解到 HashMap 的數據結構由一個數組+鏈表構成以下所示:服務器
上圖簡單畫出了 HashMap 底層數據存儲結構,從中能夠看出當咱們調用 HashMap.put(K key, V value)
方法時經過hash(key)
計算出一個 Hash 值與數組長度求模獲得一個索引即爲數組下標對應位置,當這個位置存在元素時即發生 Hash 碰撞就會產生一個鏈。微信
一個常見的場景,好比咱們在請求某個服務時須要固定訪問某一臺服務器以下所示:
從圖中能夠看出 Client01
經過 Nginx 固定訪問 Service01
、Client02
經過 Nginx 固定訪問 Service02
、Client03
經過 Nginx 固定訪問 Service03
。要知足這種場景就須要使用 Hash 算法。可是普通 Hash 算法存在一個問題是 Hash 出來的是實際後端服務節點,當其中某個服務宕機時 Hash 計算因子發生改變(求模的分子)若是計算 Hash 因子不調整則可能會路由到宕機那臺服務就會存在問題。那一致性 Hash 就是解決這樣的問題。
什麼是一致性 Hash 呢?一致性哈希算法是採用的環形哈希空間的方式,它首先根據 Ip 或者其餘信息爲機器節點生成一個 Hash 值,它投射到一個[0,2^32-1]的圓環中。以後咱們能夠認爲它是一個錨點。當請求進來後,它攜帶的數據,都統一輩子成一個 Hash 值,這時候比對以前的機器節點信息產生的 Hash 值,當遇到第一個大於或者等於該 Hash 值的緩存節點,咱們將這個數據便歸屬於這個機器節點。
假設虛擬節點12個、真實服務3個分別是:Invoker1
、Invoker2
、Invoker3
。下圖展現了服務節點怎樣進行虛擬節點映射的過程:
Invoker1
通過 Hash 計算後映射到環上對應位置
Invoker2
通過 Hash 計算後映射到環上對應位置
Invoker3
通過 Hash 計算後映射到環上對應位置
上圖中長方形色塊表明真實的服務,圓形表明虛擬節點。全部同一種顏色的圓形虛擬節點歸屬於同一色塊的長方形色塊所表明的真實服務。
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * Hash nodes name */ public static final String HASH_NODES = "hash.nodes"; /** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //獲取調用方法名稱 String methodName = RpcUtils.getMethodName(invocation); //生成惟一標識 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); //對ConsistentHashSelector進行緩存 ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); //緩存中selector == null 或 當invokers中服務發生變化 服務下限/服務上線 invokers hash值發生變動須要從新構建ConsistentHashSelector if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } //執行服務選擇 return selector.select(invocation); } private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { //虛擬Invoker(虛擬節點) this.virtualInvokers = new TreeMap<>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); //虛擬節點數 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));//參與hash的參數索引 argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress();//127.0.0.1:1 // 將虛擬節點細化爲 (replicaNumber / 4) 份 for (int i = 0; i < replicaNumber / 4; i++) { //地址+序號進行摘要 byte[] digest = md5(address + i); // 每份4個點散列到treeMap中 for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } private Invoker<T> selectForKey(long hash) { //獲取至少大於或等於給定hash值 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { //獲取第一個Entry entry = virtualInvokers.firstEntry(); } return entry.getValue(); } } }
hash.arguments
屬性值,缺省是0參數位置。invokers
服務列表根據 ip+port+序號生成 replicaNumber
個的節點(生成虛擬節點),其中 key
就是算出來的 Hash 值,value
就是 invoker
且 key
的值從小到大排序的。hash.arguments
參數取出對應位的參數,拼接成 key
。md5
對 key
計算,使用 Hash 算法算出 key
對應的 Hash 值。key
的 Hash 值找出對應的 invoker
,這裏返回鍵值大於或等於 key
的那部分 ,而後再取第一個。Tips:上述算法存在必定的難度,稍做了解便可。
在本小節中咱們主要學習了 Dubbo 中常見的負載均衡算法以及使用場景。
本節課程的重點以下:
我的從事金融行業,就任過易極付、思建科技、某網約車平臺等重慶一流技術團隊,目前就任於某銀行負責統一支付系統建設。自身對金融行業有強烈的愛好。同時也實踐大數據、數據存儲、自動化集成和部署、分佈式微服務、響應式編程、人工智能等領域。同時也熱衷於技術分享創立公衆號和博客站點對知識體系進行分享。關注公衆號: 青年IT男 獲取最新技術文章推送!
博客地址: http://youngitman.tech
微信公衆號: