dubbo版本2.5.3java
咱們這裏以zookeeper做爲註冊中心爲例說明。spring
這裏說的集羣,能夠理解爲,一個接口服務對應有多個提供者。
在dubbo的調用方(reference)看來,每一個提供方(service)對應一個invoker。
關於一個調用方對應多個提供方的場景大概包括三大類:
1,者調者訂閱一個註冊中心,此註冊中心,同一個服務有多個提供者(以不一樣機器,端口,版本等發佈的服務)
2,者調者訂閱多個註冊中心的服務,每一個註冊中心都有引用的服務的提供者(一個或者多個)。
3,調用方,經過url配置,提供多個提供者地址,多個地址以分號隔開。
1,2是同一類場景,3是直連場景,這兩中場景是互斥,也就是用戶配置了reference的url屬性,dubbo就不會再訂閱註冊中心。負載均衡
下面經過代碼分析下,這三種場景的集羣容錯
客戶端訂閱能夠看ReferenceConfig類的createProxy方法裏如下代碼jvm
if (isJvmRefer) {//引用本地服務,只返回一個exporter不會有集羣。 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else {//應用遠程服務 if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL多是對點對直連地址,也多是註冊中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) {//用戶自定多個直連服務 for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 經過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) {//用戶自定多個註冊中心 for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) {//調用方訂閱一個註冊中心,或者自定一個直連服務(直連的這種狀況不考慮集羣,只有一個提供者) //一個註冊中心時,這個refprotocol自適應後是RegistryProtocol //一個直連者時,這個refprotocol自適應後是DubboProtocol(若是是duboo協議) invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else {//調用方訂閱多個註冊中心,或者多個直連地址 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } if (registryURL != null) { // 有 註冊中心協議的URL // 對有註冊中心的Cluster 只用 AvailableCluster 容錯策略 // 對於訂閱多個註冊中心的,這裏其實有兩層的容錯機制,只是第一層,被強制設置爲AvailableCluster 容錯策略 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是經過spi機制注入的自適應adaptive實現,場景2執行邏輯 } else { // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers));//cluster是經過spi機制注入的自適應adaptive實現,場景3執行邏輯 } } }
經過代碼咱們看到,對於場景1,引用一個註冊中心的場景,會執行
invoker = refprotocol.refer(interfaceClass, urls.get(0));代碼ide
經過代碼調試,能夠發現,refprotocol.refer會調用RegistryProtocol的refer方法最終進入doRefer方法,
會執行以下代碼oop
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory);//cluster是經過spi機制注入的自適應adaptive實現。此時的directory是RegistryDirectory類型 }
三種場景實際上,都執行了dubbo SPI機制生成的adaptive的Cluster實現代碼
經過dubbo打印日誌,能夠看到adaptive的Cluster實現代碼以下url
package com.alibaba.dubbo.rpc.cluster; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover");//能夠看到,經過url裏的cluster鍵值獲取容錯機制,url中沒有指定cluster鍵值,dubbo默認是用failover集羣容錯策略 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
到此,咱們能夠看到對於多註冊中心的,第一層容錯機制被強制設置爲available,
而後第二層,就和單個註冊中心多服務提供者集羣容錯機制同樣了,即默認爲failover容錯機制。這裏看下這兩種容錯機制的代碼實現
1,failover容錯機制
經過spi機制咱們找到Cluster failover擴展FailoverCluster類是這樣實現的.net
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
接着看FailoverClusterInvoker類,先看它的父類AbstractClusterInvoker,這個類實現了Invoker接口:調試
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; //這裏是獲取負載均衡策略 List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance);//回調子類的doInvoke方法 }
而後再回到子類看doInvoke方法:日誌
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); //獲取重試次數 +1是由於第一次調用不算重試次數 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行從新選擇,避免重試時invoker列表已發生變化. //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //從新檢查一下 checkInvokers(copyinvokers, invocation); } //這裏是經過負載均衡策略獲取下一個服務提供者 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception.若是是業務異常,則再也不重試,直接拋出異常 throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress());//記錄調用失敗信息 } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
經過代碼能夠看到,
failvoer集羣容錯機制,總的邏輯是,以方法重複次數爲限制,每次調用若是失敗,
就利用負責均衡策略獲取下一個提供者(invoker),直到調用成功,或者最後方法超限,拋出異常,
其中中間若是有業務異常,則再也不重試,直接拋出異常。
2,available集羣容錯機制,咱們找到AvailableCluster類,它只有一個方法
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //它沒經過擴展AbstractClusterInvoker抽象類,而是直接實現它,它沒用負載均衡策略,而是簡單選擇一個可達的服務 return new AbstractClusterInvoker<T>(directory) { public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) {//獲取第一個,可達的服務提供方, return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; }
經過代碼能夠看到,
available集羣容錯機制,則是簡單的調用第一個可到達的服務。都不可達是,拋出異常
最後
dubbo自己還有其餘集羣容錯的擴展實現,這裏http://www.javashuo.com/article/p-awxzbqol-ce.html