本篇主要對dubbo集羣容錯進行剖析,主要下面幾個模塊html
<dubbo:service cluster="failsafe" /> 服務提供方
<dubbo:reference cluster="failsafe" /> 服務消費方
接口類 com.alibaba.dubbo.rpc.cluster.Cluster
node
1.AvailableCluster
獲取可用的調用。遍歷全部Invokers判斷Invoker.isAvalible,只要一個有爲true直接調用返回,無論成不成功算法
2.BroadcastCluster
廣播調用。遍歷全部Invokers, 逐個調用每一個調用catch住異常不影響其餘invoker調用api
3.FailbackCluster
失敗自動恢復, 對於invoker調用失敗, 後臺記錄失敗請求,任務定時重發, 一般用於通知數組
//FailbackClusterInvoker //記錄失敗的調用 private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { //失敗後調用 addFailed addFailed(invocation, this); return new RpcResult(); // ignore } } private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { // 收集統計信息 try { retryFailed(); } catch (Throwable t) { // 防護性容錯 logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } failed.put(invocation, router); } //失敗的進行重試,重試成功後移除當前map void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { invoker.invoke(invocation); failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
4.FailfastCluster
快速失敗,只發起一次調用,失敗當即保錯,一般用於非冪等性操做安全
5.FailoverCluster default
失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲
(1) 目錄服務directory.list(invocation) 列出方法的全部可調用服務
獲取重試次數,默認重試兩次服務器
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
(2) 根據LoadBalance負載策略選擇一個Invoker
(3) 執行invoker.invoke(invocation)調用
(4) 調用成功返回
調用失敗小於重試次數,從新執行從3)步驟開始執行,調用次數大於等於重試次數拋出調用失敗異常數據結構
6.FailsafeCluster
失敗安全,出現異常時,直接忽略,一般用於寫入審計日誌等操做。app
7.ForkingCluster
並行調用,只要一個成功即返回,一般用於實時性要求較高的操做,但須要浪費更多服務資源。負載均衡
注:
還有 MergeableCluster 和 MockClusterWrapper策略,可是我的沒有用過因此就不說了
靜態目錄服務, 它的全部Invoker經過構造函數傳入, 服務消費方引用服務的時候, 服務對多註冊中心的引用,將Invokers集合直接傳入 StaticDirectory構造器
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) { super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers); if (invokers == null || invokers.size() == 0) throw new IllegalArgumentException("invokers == null"); this.invokers = invokers; }
StaticDirectory的list方法直接返回全部invoker集合
@Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { return invokers; }
註冊目錄服務, 它的Invoker集合是從註冊中心獲取的, 它實現了NotifyListener接口實現了回調接口notify(List<Url>)。
好比消費方要調用某遠程服務,會向註冊中心訂閱這個服務的全部服務提供方,訂閱時和服務提供方數據有變更時回調消費方的NotifyListener服務的notify方法NotifyListener.notify(List<Url>) 回調接口傳入全部服務的提供方的url地址而後將urls轉化爲invokers, 也就是refer應用遠程服務到此時引用某個遠程服務的RegistryDirectory中有對這個遠程服務調用的全部invokers。
RegistryDirectory.list(invocation)就是根據服務調用方法獲取全部的遠程服務引用的invoker執行對象
dubbo路由功能貌似用的很少,目的主要是對已註冊的服務進行過濾,好比只能調用某些配置的服務,或者禁用某些服務。
dubbo-admin 後臺進行配置。
路由代碼入口
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (invokers == null || invokers.size() == 0) { return invokers; } try { if (!matchWhen(url, invocation)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } .............................
按照dubbo腳本規則進行編寫,程序識別
default
隨機,按權重設置隨機機率。權重default=100
在一個截面上碰撞的機率高,但調用量越大分佈越均勻,並且按機率使用權重後也比較均勻,有利於動態調整提供者權重。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 總個數 int totalWeight = 0; // 總權重 boolean sameWeight = true; // 權重是否都同樣 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // 累計總權重 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; // 計算全部權重是否同樣 } } if (totalWeight > 0 && !sameWeight) { // 若是權重不相同且權重大於0則按總權重數隨機 int offset = random.nextInt(totalWeight); // 並肯定隨機值落在哪一個片段上 for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 若是權重相同或權重爲0則均等隨機 return invokers.get(random.nextInt(length)); }
算法含義
若是全部的服務權重都同樣,就採用總服務數進行隨機。若是權重不同,則按照權重出隨機數,而後用隨機數減去服務權重,結果爲負數則使用當前循環的服務。其實也就是一個機率性問題 每一個服務的機率就是 當前服務的權重/ 總服務權重
輪循,按公約後的權重設置輪循比率。
存在慢的提供者累積請求的問題,好比:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,長此以往,全部請求都卡在調到第二臺上。
該負載算法維護着一個方法調用順序計數
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
以方法名做爲key
輪循分爲 普通輪詢和加權輪詢。權重同樣時,採用取模運算普通輪詢,反之加權輪詢。
下面看下具體的實現
RoundRobinLoadBalance#doSelect
i.普通輪詢
AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } //獲取本次調用的服務器序號,並+1 int currentSequence = sequence.getAndIncrement(); //當前序號和服務總數取模 return invokers.get(currentSequence % length);
ii.加權輪詢
下面貼下核心實現代碼。注意幾個變量
weightSum
= 服務權重之和
invokerToWeightMap
= 權重>0的 invoker map
int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { // 權重不同 // mod < weightSum,下面for循環進行weight遞減,weight大的服務被調用的機率大 int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } }
能夠舉個例子
兩個服務 A 和 B,權重分別是1和2
那麼 mod=[0,1,2],通過上面的邏輯,調用機率是 A B B A B B A B B ..... 顯然B的機率更大一些
最少活躍調用數優先,活躍數指調用先後計數差。使慢的提供者收到更少請求,由於越慢的提供者的調用先後計數差會越大。
每一個服務有一個活躍計數器,咱們假若有A,B兩個提供者.計數均爲0.當A提供者開始處理請求,該計數+1,此時A還沒處理完,當處理完後則計數-1.而B請求接收到請求處理得很快.B處理完後A還沒處理完,因此此時A,B的計數爲1,0.那麼當有新的請求來的時候,就會選擇B提供者(B的活躍計數比A小).這就是文檔說的,使慢的提供者收到更少請求。
int leastCount = 0; // 相同最小活躍數的個數
int[] leastIndexs = new int[length]; // 相同最小活躍數的下標
i.最小活躍服務個數=1, 該服務優先
if (leastCount == 1) { // 若是隻有一個最小則直接返回 return invokers.get(leastIndexs[0]); }
ii.最小活躍服務個數>1, 最小活躍的服務按照權重隨機
if (!sameWeight && totalWeight > 0) { // 若是權重不相同且權重大於0則按總權重數隨機 int offsetWeight = random.nextInt(totalWeight); // 並肯定隨機值落在哪一個片段上 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; //權重越大,offsetWeight越快減成負數 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } }
iii. 最小活躍服務個數>1, 權重相同,服務個數隨機
// 若是權重相同或權重爲0則均等隨機 return invokers.get(leastIndexs[random.nextInt(leastCount)]);
<dubbo:parameter key="hash.arguments" value="0,1" />
<dubbo:parameter key="hash.nodes" value="320" />
<dubbo:reference id="demoService" interface="com.youzan.dubbo.api.DemoService" loadbalance="consistenthash"> <!--缺省只對第一個參數 Hash--> <dubbo:parameter key="hash.arguments" value="0,1" /> <!--缺省用 160 份虛擬節點,--> <dubbo:parameter key="hash.nodes" value="160" /> </dubbo:reference>
ConsistentHashLoadBalance爲使用該算法的服務維護了一個selectors
,
key=invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName()
eg: com.youzan.dubbo.api.DemoService.sayHello
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); //獲取該服務的ConsistentHashSelector,並跟進本次調用獲取對應invoker ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.getIdentityHashCode() != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }
ConsistentHashSelector做爲ConsistentHashLoadBalance的內部類, 就是具體的一致性hash實現。
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance.ConsistentHashSelector //該服務的全部hash節點 private final TreeMap<Long, Invoker<T>> virtualInvokers; //虛擬節點數量 private final int replicaNumber; //該服務的惟一hashcode,經過System.identityHashCode(invokers)獲取 private final int identityHashCode;
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { // 建立TreeMap 來保存結點 this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); // 生成調用結點HashCode this.identityHashCode = System.identityHashCode(invokers); // 獲取Url //dubbo://192.168.0.4:20880/com.youzan.dubbo.api.DemoService?anyhost=true&application=consumer-of-helloworld-app&check=false&class=com.youzan.dubbo.provider.DemoServiceImpl&dubbo=2.5.4&generic=false&hash.arguments=0,1&hash.nodes=160&interface=com.youzan.dubbo.api.DemoService&loadbalance=consistenthash&methods=sayHello&pid=32710&side=consumer×tamp=1527383363936 URL url = invokers.get(0).getUrl(); // 獲取所配置的結點數,如沒有設置則使用默認值160 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 獲取須要進行hash的參數數組索引,默認對第一個參數進行hash String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i ++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 建立虛擬結點 // 對每一個invoker生成replicaNumber個虛擬結點,並存放於TreeMap中 for (Invoker<T> invoker : invokers) { for (int i = 0; i < replicaNumber / 4; i++) { // 根據md5算法爲每4個結點生成一個消息摘要,摘要長爲16字節128位。 byte[] digest = md5(invoker.getUrl().toFullString() + i); // 隨後將128位分爲4部分,0-31,32-63,64-95,95-128,並生成4個32位數,存於long中,long的高32位都爲0 // 並做爲虛擬結點的key。 for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }
代碼若是看的不是很懂,也不用去深究了(我就沒看懂,瞻仰了網上大神的文章貼了帖註釋),你們能夠就粗略的認爲,這段代碼就是儘量的構建出散列均勻的服務hash表。
// 選擇invoker public Invoker<T> select(Invocation invocation) { // 根據調用參數來生成Key String key = toKey(invocation.getArguments()); // 根據這個參數生成消息摘要 byte[] digest = md5(key); //調用hash(digest, 0),將消息摘要轉換爲hashCode,這裏僅取0-31位來生成HashCode //調用sekectForKey方法選擇結點。 Invoker<T> invoker = sekectForKey(hash(digest, 0)); return invoker; } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); // 因爲hash.arguments沒有進行配置,由於只取方法的第1個參數做爲key for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } //根據hashCode選擇結點 private Invoker<T> sekectForKey(long hash) { Invoker<T> invoker; Long key = hash; // 若HashCode直接與某個虛擬結點的key同樣,則直接返回該結點 if (!virtualInvokers.containsKey(key)) { // 若不一致,找到一個比傳入的key大的第一個結點。 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); // 若不存在,那麼選擇treeMap中第一個結點 // 使用TreeMap的firstKey方法,來選擇最小上界。 if (tailMap.isEmpty()) { key = virtualInvokers.firstKey(); } else { // 若存在則返回 key = tailMap.firstKey(); } } invoker = virtualInvokers.get(key); return invoker; }
ConsistentHashSelector.virtualInvokers
這個東西就是咱們的服務hash節點,單純的從數據結構上的確看不到什麼環狀的存在,能夠先示意下,當前的數據結構
咱們的服務節點只是一個普通的 map數據存儲而已,如何造成環呢?其實所謂的環只是邏輯上的展示,ConsistentHashSelector.sekectForKey()
方法裏經過 TreeMap.tailMap()、TreeMap.tailMap().firstKey、TreeMap.tailMap().firstKey() 結合case實現了環狀邏輯。下面咱們畫圖說話。
第一步原始數據結構,咱們按照hash從小到大排列
A,B,C表示咱們提供的服務,改示意圖假設服務節點散列均勻
第二步選擇服務節點
i. 假設本地調用獲得的key=2120, 代碼邏輯(指ConsistentHashSelector.sekectForKey
)走到tailMap.firstKey()
那麼讀取到 3986
A服務
ii.假設本地調用獲得的key=9991, tailMap爲空,邏輯走到 virtualInvokers.firstKey()
回到起點
讀取到 1579 A服務
上述兩部狀況基本已經可以描述清楚節點的選擇邏輯,至於hash直接命中,那麼讀取對應的服務便可,無需多講。
最後環狀造成
上面兩部的介紹已經描述hash算法,那麼咱們所謂的環狀是怎麼一回事呢?其實也就是爲了方便更好的理解這個邏輯,咱們將線性的hash排列做爲環狀,而後hash的選擇按照順時針方向選擇節點(等價於上面hash比較大小)
節點選擇算法與上面等價,本圖主要用來示意,理想的hash環hash差距應該是等差,均勻的排列。
參考:
https://blog.csdn.net/column/details/learningdubbo.html?&page=1
https://blog.csdn.net/revivedsun/article/details/71022871
https://www.jianshu.com/p/53feb7f5f5d9