前面的文章,已經單獨對服務發現(Directory、RegistryDirectory)、路由機制(Router)、負載均衡機制( LoadBalance ),本節將重點分析集羣容錯機制 ( AbstractClusterInvoker), AbstractClusterInvoker 就是將上述機制融合在一塊兒,整個集羣容錯中,上述組件扮演的角色見下圖所示,本文將重點分析 AbstractClusterInvoker 是如何融合這些組件的。 AbstractClusterInvoker#invoke算法
@Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; List<invoker<t>> invokers = list(invocation); // @1 if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); // @2 } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); // @3 }
代碼@1:根據調用上下文,獲取服務提供者列表,服務提供者從Directory中獲取。緩存
protected List<invoker<t>> list(Invocation invocation) throws RpcException { List<invoker<t>> invokers = directory.list(invocation); return invokers; }
最終會調用RegistryDirecotry的list方法,該方法的服務提供者是當該消費者訂閱的服務的服務提供者列表發送變化後,會在註冊中心產生事件,而後通知消費者更新服務提供者列表(本地緩存)。須要注意的是RegistryDirecotry在返回Invoker以前,已經使用Router進行了一次篩選,具體實如今RegistryDirectory#notify方法時。服務器
代碼@2:根據SPI機制,獲取負載均衡算法的實現類,根據< dubbo:consumer loadbalance=""/>、< dubbo:reference loadbalance=""/>等標籤的配置值,默認爲random,加權隨機算法。架構
代碼@3:根據調用上下文,服務提供者列表,負載均衡算法選擇一服務提供者,具體代碼由AbstractClusterInvoker的各個子類實現。併發
Dubbo目前支持的集羣容錯策略在中/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster定義,具體內容以下:app
mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster
上述各類集羣策略,對應的執行器爲Cluser+Invoker,例如FailoverCluster對應的Invoker爲:FailoverClusterInvoker。負載均衡
在講解各類集羣容錯策略以前,咱們首先關注一下AbstractClusterInvoker具體從服務提供者中按照不一樣的負載均衡算法選取服務提供者的算法。dom
AbstractClusterInvoker#select異步
protected Invoker<t> select(LoadBalance loadbalance, Invocation invocation, List<invoker<t>> invokers, List<invoker<t>> selected) throws RpcException { // @1 if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); // @2 { //ignore overloaded method if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } Invoker<t> invoker = doSelect(loadbalance, invocation, invokers, selected); // @3 if (sticky) { stickyInvoker = invoker; } return invoker; }
代碼@1:參數說明分佈式
代碼@3:執行doSelect選擇。
private Invoker<t> doSelect(LoadBalance loadbalance, Invocation invocation, List<invoker<t>> invokers, List<invoker<t>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) // @1 return invokers.get(0); // If we only have two invokers, use round-robin instead. if (invokers.size() == 2 && selected != null && !selected.isEmpty()) { // @2 return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } Invoker<t> invoker = loadbalance.select(invokers, getUrl(), invocation); // @3 //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<t> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); // @4 if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. int index = invokers.indexOf(invoker); try { //Avoid collision invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker; } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
代碼@1:若是可選Invoker只有一個的話,直接返回該Invoker。
代碼@2:若是隻有兩個Invoker,而且其中一個已被選擇,返回另一個未選擇的Invoker。
代碼@3:調用loadBalance負載均衡算法,選擇一個服務提供者。
代碼@4:若是選擇的Invoker已被選擇,則從新選擇,這裏有一個疑問,爲何不在選以前,先過濾掉已被選的Invoker。
從服務提供者列表中選擇一個服務提供者算法就介紹到這裏,接下來將一一分析Dubbo提供的集羣容錯方式。
策略:失敗後自動選擇其餘服務提供者進行重試,重試次數由retries屬性設置,< dubbo:reference retries = "2"/>設置,默認爲2,表明重試2次,最多執行3次。
FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { List<invoker<t>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // @1 if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<invoker<t>> invoked = new ArrayList<invoker<t>>(copyinvokers.size()); // invoked invokers. Set<string> providers = new HashSet<string>(len); // @2 for (int i = 0; i < len; i++) { // @3 //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { // @4 checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } Invoker<t> invoker = select(loadbalance, invocation, copyinvokers, invoked); // @5 invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); // @6 if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); // @7 } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
代碼@1:首先校驗服務提供者列表,若是爲空,則拋出RpcException,提示沒有可用的服務提供者。
代碼@2:構建Set< Stirng> providers,主要用來已調用服務提供者的地址,若是本次調用失敗,將在日誌信息中打印已調用的服務提供者信息。
代碼@3,循環執行次數,等於retries + 1 次。
代碼@4:若是i>0,表示服務調用,在重試,此時須要從新調用Directory#list方法,獲取最小的服務提供者列表。
代碼@5:根據負載均衡算法,選擇Invoker,後續詳細分析。
代碼@6:根據負載算法,路由算法從服務提供者列表選一個服務提供者,發起RPC調用。
代碼@7:將本次服務提供者的地址添加到providers集合中,若是屢次重試後,沒法完成正常的調用,將在錯誤日誌中包含這些信息。
策略:選擇集羣第一個可用的服務提供者。 缺點:至關於服務的主備,但同時只有一個服務提供者承載流量,並無使用集羣的負載均衡機制。 AvailableClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker<t> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); }
遍歷服務提供者列表,選擇第一個可用服務提供者,而後執行RPC服務調用,若是調用失敗,則失敗。
策略:廣播調用,將調用全部服務提供者,一個服務調用者失敗,並不會熔斷,而且一個服務提供者調用失敗,整個調用認爲失敗。 場景:刷新緩存。
public Result doInvoke(final Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; for (Invoker<t> invoker : invokers) { // @2 try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { // @3 throw exception; } return result; }
代碼@1:檢測服務提供者列表,若是爲空,則拋出沒有服務提供的異常。
代碼@2:遍歷服務提供者列表,依次調用服務提供者的invoker,每一個服務調用用try catch語句包裹,當服務調用發生異常時,記錄異常信息,但並不當即返回,廣播模式,每一個服務提供者調用是異步仍是同步,取決服務調用的配置,默認是同步調用。
代碼@3:只要其中一個服務調用發送一次,將拋出異常 信息,異常信息被封裝爲RpcException。
策略:調用失敗後,返回成功,但會在後臺定時重試,重試次數(反覆) 場景:一般用於消息通知,但消費者重啓後,重試任務丟失。
FailbackClusterInvoker#doInvoke
protected Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 return invoker.invoke(invocation); // @3 } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); addFailed(invocation, this); // @4 return new RpcResult(); // ignore } }
代碼@1:校驗服務提供者列表,若是爲空,則拋出沒有服務提供者錯誤。
代碼@2:根據負載均衡機制,選擇一個服務提供者。
代碼@3:發起遠程服務調用,若是出現異常,調用addFailed方法,添加劇試任務,而後返回給調用方成功。
接下來看一下addFailed方法。
FailbackClusterInvoker#addFailed
private void addFailed(Invocation invocation, AbstractClusterInvoker<!--?--> router) { // @1 if (retryFuture == null) { // @2 synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // @3 @Override public void run() { // collect retry statistics try { retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } failed.put(invocation, router); // @4 }
代碼@1:Invocation invocation:調用上下文;AbstractClusterInvoker< ?> router:調用集羣策略。
代碼@2:若是retryFuture(ScheduledFuture< ?> retryFuture)爲空,則加鎖建立一個定時調度任務,任務以每隔5s的頻率調用retryFailed方法。
代碼@3:添加劇試任務(ConcurrentMap< Invocation, AbstractClusterInvoker< ?>> failed)。想必retryFailed方法就是遍歷failed,一個一個重複調用,若是調用成功則移除,調用不成功,繼續放入。
FailbackClusterInvoker#retryFailed
void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<invocation, abstractclusterinvoker<?>> entry : new HashMap<invocation, abstractclusterinvoker<?>>( // @1 failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<!--?--> invoker = entry.getValue(); try { invoker.invoke(invocation); // @2 failed.remove(invocation); // @3 } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
代碼@1:遍歷待重試列表,而後發起遠程調用,若是調用成功,則從集合中移除,若是隻選失敗,並不會從待重試列表中移除,也就是在消費端不重啓的狀況下,會一直重複調用,直到成功。
策略:快速失敗,服務調用失敗後立馬拋出異常,不進行重試。 場景:是否修改類服務(未實行冪等的服務調用)
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 try { return invoker.invoke(invocation); // @3 } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); // @4 } }
代碼@1:檢查服務提供者,若是服務提供者列表爲空,拋出沒有服務提供者錯誤。
代碼@2:根據負載算法選擇一個服務提供者。
代碼@3:發起RPC服務調用。
代碼@4:若是服務調用異常,拋出異常,打印服務消費者,服務提供者信息。
策略:服務調用失敗後,只打印錯誤日誌,而後返回服務調用成功。 場景:調用審計,日誌類服務接口。
FailsafeClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 return invoker.invoke(invocation); // @3 } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); // ignore } }
代碼@1:檢查服務提供者,若是服務提供者列表爲空,拋出沒有服務提供者錯誤。
代碼@2:根據負載算法選擇一個服務提供者。
代碼@3:發起RPC服務調用,若是出現異常,記錄錯誤堆棧信息,並返回成功。
策略:並行調用多個服務提供者,當一個服務提供者返回成功,則返回成功。 場景:實時性要求比較高的場景,但浪費服務器資源,一般能夠經過forks參數設置併發調用度。
ForkingClusterInvoker#doInvoke
public Result doInvoke(final Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 final List<invoker<t>> selected; final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); // @2 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<invoker<t>>(); for (int i = 0; i < forks; i++) { // TODO. Add some comment here, refer chinese version for more details. Invoker<t> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<object> ref = new LinkedBlockingQueue<object>(); for (final Invoker<t> invoker : selected) { // @3 executor.execute(new Runnable() { @Override public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } } }); } try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); // @4 if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } }
代碼@1:檢查服務提供者,若是服務提供者列表爲空,拋出沒有服務提供者錯誤。
代碼@2:獲取forks屬性,貌似只能經過在< dubbo:reference />用< dubbo:parameter key="forks" value=""/>來設置forks,其默認值爲2,若是forks值大於服務提供者的數量,則將調用全部服務提供者,若是forks值小於服務提供者的數量,則使用負載均衡算法,選擇forks個服務提供者。
代碼@3:依次異步向服務提供者發起RPC調用,並將結果添加到BlockingQueue< Object> ref,若是服務調用發送錯誤,而且發生錯誤的個數大於等於本次調用的個數,則將錯誤信息放入BlockingQueue< Object> ref,不然,將錯誤數增長1。
代碼@4:Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS),從該隊列中獲取結果,若是隊列未空,則會阻塞等待,直到超時,當有一個調用成功後,將返回,忽略其餘調用結果。
本文重點分析了Dubbo集羣容錯機制,路由發現、路由算法、負載均衡等是如何共同協做完成Dubbo的服務調用,並詳細分析了Dubbo各類集羣策略,例如failover、failfast、failsafe、failback、forking、available等實現細節。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。 </t></object></object></t></invoker<t></invoker<t></invoker<t></t></invoker<t></t></invoker<t></invocation,></invocation,></t></invoker<t></t></invoker<t></t></invoker<t></t></string></string></invoker<t></invoker<t></invoker<t></invoker<t></t></t></invoker<t></invoker<t></t></t></invoker<t></invoker<t></t></invoker<t></invoker<t></invoker<t>