dubbo做爲分佈式遠程調用框架,要保證的點不少,好比:服務註冊與發現、故障轉移、高性能通訊、負載均衡等等!node
負載均衡的目的是爲了特定場景下,可以將請求合理地平分到各服務實例上,以便發揮全部機器的疊加做用。主要考慮的點如:不要分配請求到掛掉的機器,性能越好的機器能夠分配更多的請求。。。nginx
通常負載均衡是藉助外部工具,硬件負載均衡或軟件負載均衡,如F5/nginx。固然了,在當前分佈式環境遍地開花的狀況下,客戶端的負載均衡看起來就更輕量級,顯得不可或缺。算法
今天咱們就來看看dubbo是如何進行負載均衡的吧!apache
其出發點,天然也就是普通的負載均衡器的出發點了。將負載均衡功能實如今rpc客戶端側,以便可以隨時適應外部的環境變化,更好地發揮硬件做用。並且客戶端的負載均衡自然地就避免了單點問題。定製化的自有定製化的優點和劣勢。服務器
它能夠從配置文件中指定,也能夠在管理後臺進行配置修改。app
事實上,它支持 服務端服務/方法級別、客戶端服務/方法級別 的負載均衡配置。負載均衡
即dubbo提供了哪些負載均衡策略呢?框架
Dubbo內置了4種負載均衡策略:less
RandomLoadBalance:隨機負載均衡。隨機的選擇一個。是Dubbo的默認負載均衡策略。dom
RoundRobinLoadBalance:輪詢負載均衡。輪詢選擇一個。
LeastActiveLoadBalance:最少活躍調用數,相同活躍數的隨機。活躍數指調用先後計數差。使慢的 Provider 收到更少請求,由於越慢的 Provider 的調用先後計數差會越大。
ConsistentHashLoadBalance:一致性哈希負載均衡。相同參數的請求老是落在同一臺機器上。
其實在第一點時已經提過,有多種級別的配置:服務端服務/方法級別、客戶端服務/方法級別; 具體配置以下:
<!-- 服務端服務級別 --> <dubbo:service interface="..." loadbalance="roundrobin" /> <!-- 客戶端服務級別 --> <dubbo:reference interface="..." loadbalance="roundrobin" /> <!-- 服務端方法級別 --> <dubbo:service interface="..."> <dubbo:method name="hello" loadbalance="roundrobin"/> </dubbo:service> <!-- 客戶端方法級別 --> <dubbo:reference interface="..."> <dubbo:method name="hello" loadbalance="roundrobin"/> </dubbo:reference>
多個配置是有覆蓋關係的, 配置的優先級是:
1. 客戶端方法級別配置;(最優先)
2. 客戶端接口級別配置;
3. 服務端方法級別配置;
4. 服務端接口級別配置;(最後使用)
注意: 雖然說以上配置有全封閉服務端配置的,有針對客戶端配置的,可是,真正使負載均衡起做用的是,客戶端在發起調用的時候,使用相應負載均衡算法進行選擇調用。(服務端不可能有這能力)
負載均衡器的初始化過程以下:
// 調用提供者服務入口 // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke @Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // binding attachments into invocation. Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); // 根據可用的提供者列表和要調用的方法,決定選取的負載均衡器 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } // 實例化一個負載均衡器,以備後續使用 // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#initLoadBalance /** * Init LoadBalance. * <p> * if invokers is not empty, init from the first invoke's url and invocation * if invokes is empty, init a default LoadBalance(RandomLoadBalance) * </p> * * @param invokers invokers * @param invocation invocation * @return LoadBalance instance. if not need init, return null. */ protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) { if (CollectionUtils.isNotEmpty(invokers)) { // 從provider 的 url 地址中取出 loadbalance=xxx 配置,若是沒有仍使用 random 策略 return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE)); } else { // 默認是 random 策略 return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE); } }
因此,事實上,到最終客戶端決定使用哪一個負載均衡策略時,只是從請求參數中取出 loadbalance=xxx 的參數,進而決定具體實例。前面全部的配置,也都是爲決定這個參數作出的努力。
前面咱們看到,dubbo中提供了4種負載均衡策略,功能也是很明瞭。那麼他們都是如何實現的呢?
先來看下其繼承圖:
很明顯,多個負載均衡器都有一些共同點,因此統一使用 AbstractLoadBalance 進行抽象模板方法,差別點由各子算法決定便可。
那麼抽象類中,到底有多少公用功能被抽取出來了呢?到底什麼是公用的呢?
// org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance#select @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 沒有可用的提供者,沒得選 if (CollectionUtils.isEmpty(invokers)) { return null; } // 只有一個提供者,沒得均衡的,直接用 if (invokers.size() == 1) { return invokers.get(0); } // 而後就是各自均衡算法的實現了 return doSelect(invokers, url, invocation); }
好吧,看起來是我想多了。抽象方法並無太多的職責,僅作普通判空操做而已。不過它卻是提供幾個公用方法被調用,如 getWeight();
事實上,模板方法更多地存在於集羣的抽象調用方法中。AbstractClusterInvoker 。
整個負載均衡的功能,都被統一放在 cluster 模塊下的 loadbalance 包下,一看即明瞭。
仍是來看具體的實現好玩些!
/** * This class select one provider from multiple providers randomly. * You can define weights for each provider: * If the weights are all the same then it will use random.nextInt(number of invokers). * If the weights are different then it will use random.nextInt(w1 + w2 + ... + wn) * Note that if the performance of the machine is better than others, you can set a larger weight. * If the performance is not so good, you can set a smaller weight. */ public class RandomLoadBalance extends AbstractLoadBalance { // 標識自身 public static final String NAME = "random"; /** * Select one invoker between a list using a random criteria * @param invokers List of possible invokers * @param url URL * @param invocation Invocation * @param <T> * @return The selected invoker */ @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // Every invoker has the same weight? boolean sameWeight = true; // the weight of every invokers int[] weights = new int[length]; // the first invoker's weight int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // The sum of weights int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // save for later use weights[i] = weight; // 計算出全部權重和,以便在進行隨機時設定範圍 totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 針對各提供供者權重不一的狀況,則找到第一個大於隨機數的提供者便可 if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. // 若是你們權重都同樣,則直接以個數進行隨機便可獲得提供者 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
稍微有點小技巧的就是,針對不同權重的隨機實現,以相減的方式找到第一個爲負的提供者便可。注意,此處計算各提供者權重的算法,倒成了難點了有木有。
/** * Round robin load balance. */ public class RoundRobinLoadBalance extends AbstractLoadBalance { // 自身標識 public static final String NAME = "roundrobin"; private static final int RECYCLE_PERIOD = 60000; protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0); private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); private AtomicBoolean updateLock = new AtomicBoolean(); /** * get invoker addr list cached for specified invocation * <p> * <b>for unit test only</b> * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); int weight = getWeight(invoker, invocation); if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } if (weight != weightedRoundRobin.getWeight()) { //weight changed weightedRoundRobin.setWeight(weight); } // 自增權重 long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); // 獲取最大權重項,並以對應的 invoker 做爲本次選擇的實例 if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } // 當invoker數量發生變化時,須要能感知到,以便清理 map, 避免內存泄露 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { // copy -> modify -> update reference // 超出計數週期,則清空原來的 WeightedRoundRobin ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map); newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } if (selectedInvoker != null) { // 將本次選中的invoker, 權重置爲最低, 以便下次不會再被選中 selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0); } }
依次從最大權重的invoker開始選擇,而後將選中的項放到最後,輪流選中。使用一個 ConcurrentHashMap 來保存每一個url的權重信息,且維護其活躍性。
/** * LeastActiveLoadBalance * <p> * Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers. * If there is only one invoker, use the invoker directly; * if there are multiple invokers and the weights are not the same, then random according to the total weight; * if there are multiple invokers and the same weight, then randomly called. */ public class LeastActiveLoadBalance extends AbstractLoadBalance { // 自身標識 public static final String NAME = "leastactive"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // The least active value of all invokers int leastActive = -1; // The number of invokers having the same least active value (leastActive) int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexes = new int[length]; // the weight of every invokers int[] weights = new int[length]; // The sum of the warmup weights of all the least active invokers int totalWeight = 0; // The weight of the first least active invoker int firstWeight = 0; // Every least active invoker has the same weight value? boolean sameWeight = true; // Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // Get the active number of the invoker int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoker's configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); // save for later use weights[i] = afterWarmup; // If it is the first invoker or the active number of the invoker is less than the current least active number if (leastActive == -1 || active < leastActive) { // Reset the active number of the current invoker to the least active number leastActive = active; // Reset the number of least active invokers leastCount = 1; // Put the first least active invoker first in leastIndexes leastIndexes[0] = i; // Reset totalWeight totalWeight = afterWarmup; // Record the weight the first least active invoker firstWeight = afterWarmup; // Each invoke has the same weight (only one invoker here) sameWeight = true; // If current invoker's active value equals with leaseActive, then accumulating. } else if (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // Choose an invoker from all the least active invokers if (leastCount == 1) { // 若是隻有一個最小則直接返回 // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexes[0]); } if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. // 若是權重不相同且權重大於0則按總權重數隨機 // 並肯定隨機值落在哪一個片段上(第一個相減爲負的值) int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
官方解釋:最少活躍調用數,相同活躍數的隨機,活躍數指調用先後計數差,使慢的機器收到更少。
額,有點難以理解的樣子。
/** * ConsistentHashLoadBalance */ public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * Hash nodes name * 經過 指定 hash.nodes=0,1,2... 能夠自定義參與一致性hash的參數列表 */ public static final String HASH_NODES = "hash.nodes"; /** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; // 使用selector 保存某個固定狀態時 invoker 的映射關係 private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); // 第一次進入或者 identityHashCode 不相等時(invoker環境發生了變化) if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { // 存放虛擬節點 this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // hash.nodes 默認是 160 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { // 取出參與一致性hash計算的參數信息 String key = toKey(invocation.getArguments()); byte[] digest = md5(key); // 根據hash值選取 invoker return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); // 按照指定的參與hash的參數,調用 toString() 方法,獲得參數標識信息 for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker<T> selectForKey(long hash) { // ceilingEntry Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); // 若是沒有找到,取第一個值 if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); } } }
主要就是取第多少個參數,參與hashCode的計算,而後按照一致性hash算法,定位invoker. 它使用一個 TreeMap 來保存一致性哈希虛擬節點,hashCode->invoker形式存儲,使用 ceilingEntry(hash) 的方式獲取最近的虛擬節點(自然的一致性hash應用)。
值得一提的是,一致性哈希負載均衡策略是惟一一個沒有使用到權重項的負載均衡算法。而前面幾種均衡算法,多少都與權重相關。該負載均衡的應用場景嘛,還得本身找了。
dubbo實現了4種負載均衡策略,是否就只能是這麼多呢?一個好的設計是不會的,對擴展開放。基於dubbo的SPI機制,能夠自行實現任意的負載均衡策略!
1. 實現 LoadBalance 接口;
2. 添加資源文件 添加文件:src/main/resource/META-INF/dubbo/com.alibaba.dubbo.rpc.cluster.LoadBalance;
demo=my=com.demo.dubbo.DemoLoadBalance
3. 設置負載均衡策略爲本身實現的demo;
固然不能。負載均衡只是保證了在發生調用的時候,能夠將流量按照既定規定均攤到各機器上。然而,均攤是否是最合理的方式倒是不必定的。另外,若是發生異常,這次負載均衡就失敗了,從而成功躲過了高可用。
事實上,dubbo用三種方式協同保證了高可用:
1. 負載均衡
2. 集羣容錯
3. 服務路由
如下故事描述摘自官網:
這3個概念容易混淆。他們都描述了怎麼從多個 Provider 中選擇一個來進行調用。那他們到底有什麼區別呢?下面我來舉一個簡單的例子,把這幾個概念闡述清楚吧。 有一個Dubbo的用戶服務,在北京部署了10個,在上海部署了20個。一個杭州的服務消費方發起了一次調用,而後發生瞭如下的事情: 根據配置的路由規則,若是杭州發起的調用,會路由到比較近的上海的20個 Provider。 根據配置的隨機負載均衡策略,在20個 Provider 中隨機選擇了一個來調用,假設隨機到了第7個 Provider。 結果調用第7個 Provider 失敗了。 根據配置的Failover集羣容錯模式,重試其餘服務器。 重試了第13個 Provider,調用成功。 上面的第1,2,4步驟就分別對應了路由,負載均衡和集羣容錯。 Dubbo中,先經過路由,從多個 Provider 中按照路由規則,選出一個子集。再根據負載均衡從子集中選出一個 Provider 進行本次調用。若是調用失敗了,根據集羣容錯策略,進行重試或定時重發或快速失敗等。 能夠看到Dubbo中的路由,負載均衡和集羣容錯發生在一次RPC調用的不一樣階段。最早是路由,而後是負載均衡,最後是集羣容錯。