在集羣調用失敗時,Dubbo 提供了多種容錯方案,缺省爲 failover 重試。html
Service代理對象初始化環節,涉及到Cluster的初始化,而且調用過程也涉及到Cluster組件的集羣容錯,接下來將詳細講解Dubbo是如何利用Cluster進行容錯處理 及Cluster的種類。java
首先,咱們看下代理對象初始化過程當中 Cluster的組裝過程。git
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
服務發佈 能夠參考:《Dubbo服務發佈之服務暴露&心跳機制&服務註冊》安全
咱們直接來看核心代碼:RegistryProtocol.refer 方法服務器
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 將param中的registry屬性,設置爲Protocol 刪除param中的registry url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 鏈接註冊中心,監聽reconnect狀態改變 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 得到服務引用配置參數集合 group="a,b" or group="*" refer中引用遠程服務的信息 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); // 分組處理方式 if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // cluster , registry 註冊中心 , type 接口Class類型 , url 註冊中心信息 +refer 引用信息 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 對象,並設置註冊中心 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // 建立訂閱 URL // all attributes of REFER_KEY URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 向註冊中心註冊本身(服務消費者) // category = comsumers , side = consumer // registry註冊 /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F10.8.0.49%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6496%26side%3Dconsumer%26timestamp%3D1533729758117 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 向註冊中心訂閱服務提供者 // 訂閱信息 category 設置爲providers,configurators,routers 進行訂閱 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 建立 Invoker 對象 基於 Directory ,建立 Invoker 對象,實現統1、透明的 Invoker 調用過程。 return cluster.join(directory); }
接下來咱們看下,RegistryProtocol 是如何實現 cluster的自適配對象(Cluster$Adaptive)和RegistryDirectory 生成Invoker對象的過程。app
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
默認獲取的是failover對應的通過MockClusterWrapper裝飾器裝飾的 FailoverCluster對象負載均衡
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
實際上FailoverCluster.join方法實現了 Cluster 到 invoker的轉換過程。
FailoverCluster 與 FailoverClusterInvoker 之間是對應的。 其中 FailoverCluster 做用是: 失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲。
咱們來看Cluster 和 Invoker 的集成結構圖:
MockClusterInvoker mock用於非業務異常時的服務降級 ,非業務異常是指 網絡抖動,超時,或者沒有服務提供者時等異常條件下 ,服務的臨時返回方案。 業務異常並不會走mock 的替代返回。
例如:某商品詳情訪問接口服務端異常,並不會走咱們的mock邏輯 , 若是網絡抖動或者沒有服務提供者這種狀況會走 mock邏輯,例如返回"商品禁止訪問,請稍後重試。"
public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //1. no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock 強制走moke邏輯 result = doMockInvoke(invocation, null); } else { //fail-mock 異常moke邏輯 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="return zhangsan"/>
2. 經過制定業務處理類來進行返回。 配置方式:
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="true"/>
規則:在接口目錄下建立自定義mock業務處理類 , 同時實現Service接口 。 命名規則符合:interfaceService + Mock ,而且有無參構造方法
public class DemoServiceMock implements DemoService { @Override public String sayHello(String name) { return "張三李四"; } }
因爲MockClusterInvoker 是 ClusterInvoker 的裝飾器,因此接下來還要執行 ClusterInvoker 的 invoke方法。接下來以 FailoverClusterInvoker 爲例進行ClusterInvoker講解。
可經過 retries="2"
<dubbo:service retries="2" /> 或 <dubbo:reference retries="2" /> 或 <dubbo:reference> <dubbo:method name="findFoo" retries="2" /> </dubbo:reference>
FailoverClusterInvoker 源碼以下:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; // 得到全部服務提供者 Invoker 集合 List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//異步的話,須要添加id return doInvoke(invocation, invokers, loadbalance); } public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
咱們看到若是僅是集羣容錯 它的實現原理比較簡單:
除了FailoverCluster 失敗轉移的集羣容錯功能外,還有其餘:
並行調用多個服務器,只要一個成功即返回。一般用於實時性要求較高的讀操做,但須要浪費更多服務資源。可經過 forks="2"
<dubbo:service cluster="failsafe" />
<dubbo:reference cluster="failsafe" />