在集羣調用失敗時,Dubbo 提供了多種容錯方案,缺省爲 failover 重試。html
Service代理對象初始化環節,涉及到Cluster的初始化,而且調用過程也涉及到Cluster組件的集羣容錯,接下來將詳細講解Dubbo是如何利用Cluster進行容錯處理 及Cluster的種類。java
首先,咱們看下代理對象初始化過程當中 Cluster的組裝過程。git
服務引用過程與發佈過程的調用鏈很是相似一樣也是api
/**
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
* =>
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
*/緩存
服務發佈 能夠參考:《Dubbo服務發佈之服務暴露&心跳機制&服務註冊》安全
咱們直接來看核心代碼:RegistryProtocol.refer 方法服務器
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 將param中的registry屬性,設置爲Protocol 刪除param中的registry url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 鏈接註冊中心,監聽reconnect狀態改變 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 得到服務引用配置參數集合 group="a,b" or group="*" refer中引用遠程服務的信息 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); // 分組處理方式 if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // cluster , registry 註冊中心 , type 接口Class類型 , url 註冊中心信息 +refer 引用信息 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 對象,並設置註冊中心 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // 建立訂閱 URL // all attributes of REFER_KEY URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 向註冊中心註冊本身(服務消費者) // category = comsumers , side = consumer // registry註冊 /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F10.8.0.49%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6496%26side%3Dconsumer%26timestamp%3D1533729758117 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 向註冊中心訂閱服務提供者 // 訂閱信息 category 設置爲providers,configurators,routers 進行訂閱 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 建立 Invoker 對象 基於 Directory ,建立 Invoker 對象,實現統1、透明的 Invoker 調用過程。 return cluster.join(directory); }
doRefer方法網絡
接下來咱們看下,RegistryProtocol 是如何實現 cluster的自適配對象(Cluster$Adaptive)和RegistryDirectory 生成Invoker對象的過程。app
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
默認獲取的是failover對應的通過MockClusterWrapper裝飾器裝飾的 FailoverCluster對象負載均衡
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
實際上FailoverCluster.join方法實現了 Cluster 到 invoker的轉換過程。
FailoverCluster 與 FailoverClusterInvoker 之間是對應的。 其中 FailoverCluster 做用是: 失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲。
咱們來看Cluster 和 Invoker 的集成結構圖:
MockClusterInvoker mock用於非業務異常時的服務降級 ,非業務異常是指 網絡抖動,超時,或者沒有服務提供者時等異常條件下 ,服務的臨時返回方案。 業務異常並不會走mock 的替代返回。
例如:某商品詳情訪問接口服務端異常,並不會走咱們的mock邏輯 , 若是網絡抖動或者沒有服務提供者這種狀況會走 mock邏輯,例如返回"商品禁止訪問,請稍後重試。"
直接上調用方法
public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //1. no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock 強制走moke邏輯 result = doMockInvoke(invocation, null); } else { //fail-mock 異常moke邏輯 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
mock配置方式有如下兩種:
1.遠程調用方配置mock參數。配置方式:
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="return zhangsan"/>
說明:配置了mock參數以後,好比在調用服務的時候出現網絡斷連,或者沒有服務啓動,那麼會把這個mock設置的值返回,也就是zhangsan
經過這種方式就能夠避免由於服務調用不到而帶來的程序不可用問題。
2. 經過制定業務處理類來進行返回。 配置方式:
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="true"/>
當服務調用過程當中,網絡異常了,它會去自定義的mock業務處理類進行業務處理。所以還須要建立自定義mock業務處理類:
規則:在接口目錄下建立自定義mock業務處理類 , 同時實現Service接口 。 命名規則符合:interfaceService + Mock ,而且有無參構造方法
如:
public class DemoServiceMock implements DemoService { @Override public String sayHello(String name) { return "張三李四"; } }
配置完成後,若是出現非業務異常,則會調用自定義降級業務。
因爲MockClusterInvoker 是 ClusterInvoker 的裝飾器,因此接下來還要執行 ClusterInvoker 的 invoke方法。接下來以 FailoverClusterInvoker 爲例進行ClusterInvoker講解。
可經過 retries="2"
來設置重試次數(不含第一次)。
重試次數配置以下:
<dubbo:service retries="2" /> 或 <dubbo:reference retries="2" /> 或 <dubbo:reference> <dubbo:method name="findFoo" retries="2" /> </dubbo:reference>
FailoverClusterInvoker 源碼以下:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; // 得到全部服務提供者 Invoker 集合 List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//異步的話,須要添加id return doInvoke(invocation, invokers, loadbalance); } public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
咱們看到若是僅是集羣容錯 它的實現原理比較簡單:
除了FailoverCluster 失敗轉移的集羣容錯功能外,還有其餘:
快速失敗,只發起一次調用,失敗當即報錯。一般用於非冪等性的寫操做,好比新增記錄。
失敗安全,出現異常時,直接忽略。一般用於寫入審計日誌等操做。
失敗自動恢復,後臺記錄失敗請求,定時重發。一般用於消息通知操做。
並行調用多個服務器,只要一個成功即返回。一般用於實時性要求較高的讀操做,但須要浪費更多服務資源。可經過 forks="2"
來設置最大並行數。
廣播調用全部提供者,逐個調用,任意一臺報錯則報錯。一般用於通知全部提供者更新緩存或日誌等本地資源信息。
按照如下示例在服務提供方和消費方配置集羣模式
<dubbo:service cluster="failsafe" />
或
<dubbo:reference cluster="failsafe" />