dubbo集羣容錯機制代碼分析1

dubbo版本2.5.3java

咱們這裏以zookeeper做爲註冊中心爲例說明。spring

這裏說的集羣,能夠理解爲,一個接口服務對應有多個提供者。
在dubbo的調用方(reference)看來,每一個提供方(service)對應一個invoker。
關於一個調用方對應多個提供方的場景大概包括三大類:
1,者調者訂閱一個註冊中心,此註冊中心,同一個服務有多個提供者(以不一樣機器,端口,版本等發佈的服務)
2,者調者訂閱多個註冊中心的服務,每一個註冊中心都有引用的服務的提供者(一個或者多個)。
3,調用方,經過url配置,提供多個提供者地址,多個地址以分號隔開。
1,2是同一類場景,3是直連場景,這兩中場景是互斥,也就是用戶配置了reference的url屬性,dubbo就不會再訂閱註冊中心。負載均衡

下面經過代碼分析下,這三種場景的集羣容錯
客戶端訂閱能夠看ReferenceConfig類的createProxy方法裏如下代碼jvm

if (isJvmRefer) {//引用本地服務,只返回一個exporter不會有集羣。
			    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
			    invoker = refprotocol.refer(interfaceClass, url);
			    if (logger.isInfoEnabled()) {
				logger.info("Using injvm service " + interfaceClass.getName());
			    }
		} else {//應用遠程服務
		    if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL多是對點對直連地址,也多是註冊中心URL
			String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
			if (us != null && us.length > 0) {//用戶自定多個直連服務
			    for (String u : us) {
				URL url = URL.valueOf(u);
				if (url.getPath() == null || url.getPath().length() == 0) {
				    url = url.setPath(interfaceName);
				}
				if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
				    urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
				} else {
				    urls.add(ClusterUtils.mergeUrl(url, map));
				}
			    }
			}
		    } else { // 經過註冊中心配置拼裝URL
			List<URL> us = loadRegistries(false);
			if (us != null && us.size() > 0) {//用戶自定多個註冊中心
				for (URL u : us) {
				    URL monitorUrl = loadMonitor(u);
				if (monitorUrl != null) {
				    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
				}
				    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
			    }
			}
			if (urls == null || urls.size() == 0) {
			    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
			}
		    }

		    if (urls.size() == 1) {//調用方訂閱一個註冊中心,或者自定一個直連服務(直連的這種狀況不考慮集羣,只有一個提供者)
                        //一個註冊中心時,這個refprotocol自適應後是RegistryProtocol
                        //一個直連者時,這個refprotocol自適應後是DubboProtocol(若是是duboo協議)
			invoker = refprotocol.refer(interfaceClass, urls.get(0));
		    } else {//調用方訂閱多個註冊中心,或者多個直連地址
			List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
			URL registryURL = null;
			for (URL url : urls) {
			    invokers.add(refprotocol.refer(interfaceClass, url));
			    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
				registryURL = url; // 用了最後一個registry url
			    }
			}
			if (registryURL != null) { // 有 註冊中心協議的URL
			    // 對有註冊中心的Cluster 只用 AvailableCluster 容錯策略
			    // 對於訂閱多個註冊中心的,這裏其實有兩層的容錯機制,只是第一層,被強制設置爲AvailableCluster 容錯策略
			    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
			    invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是經過spi機制注入的自適應adaptive實現,場景2執行邏輯
			}  else { // 不是 註冊中心的URL
			    invoker = cluster.join(new StaticDirectory(invokers));//cluster是經過spi機制注入的自適應adaptive實現,場景3執行邏輯
			}
		    }
        }

經過代碼咱們看到,對於場景1,引用一個註冊中心的場景,會執行
invoker = refprotocol.refer(interfaceClass, urls.get(0));代碼ide

經過代碼調試,能夠發現,refprotocol.refer會調用RegistryProtocol的refer方法最終進入doRefer方法,
會執行以下代碼oop

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);//cluster是經過spi機制注入的自適應adaptive實現。此時的directory是RegistryDirectory類型
    }

三種場景實際上,都執行了dubbo SPI機制生成的adaptive的Cluster實現代碼
經過dubbo打印日誌,能夠看到adaptive的Cluster實現代碼以下url

package com.alibaba.dubbo.rpc.cluster;

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");//能夠看到,經過url裏的cluster鍵值獲取容錯機制,url中沒有指定cluster鍵值,dubbo默認是用failover集羣容錯策略
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

到此,咱們能夠看到對於多註冊中心的,第一層容錯機制被強制設置爲available,
而後第二層,就和單個註冊中心多服務提供者集羣容錯機制同樣了,即默認爲failover容錯機制。這裏看下這兩種容錯機制的代碼實現
1,failover容錯機制
經過spi機制咱們找到Cluster failover擴展FailoverCluster類是這樣實現的.net

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

接着看FailoverClusterInvoker類,先看它的父類AbstractClusterInvoker,這個類實現了Invoker接口:調試

public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;
        //這裏是獲取負載均衡策略
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);//回調子類的doInvoke方法
}

而後再回到子類看doInvoke方法:日誌

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	List<Invoker<T>> copyinvokers = invokers;
    	checkInvokers(copyinvokers, invocation);
	//獲取重試次數  +1是由於第一次調用不算重試次數
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
        	//重試時,進行從新選擇,避免重試時invoker列表已發生變化.
        	//注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變
        	if (i > 0) {
        		checkWhetherDestroyed();
        		copyinvokers = list(invocation);
        		//從新檢查一下
        		checkInvokers(copyinvokers, invocation);
        	}
		//這裏是經過負載均衡策略獲取下一個服務提供者
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.若是是業務異常,則再也不重試,直接拋出異常
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());//記錄調用失敗信息
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}

經過代碼能夠看到,
failvoer集羣容錯機制,總的邏輯是,以方法重複次數爲限制,每次調用若是失敗,
就利用負責均衡策略獲取下一個提供者(invoker),直到調用成功,或者最後方法超限,拋出異常,
其中中間若是有業務異常,則再也不重試,直接拋出異常。

2,available集羣容錯機制,咱們找到AvailableCluster類,它只有一個方法

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //它沒經過擴展AbstractClusterInvoker抽象類,而是直接實現它,它沒用負載均衡策略,而是簡單選擇一個可達的服務
        return new AbstractClusterInvoker<T>(directory) {
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {//獲取第一個,可達的服務提供方,
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
        
    }

經過代碼能夠看到,
available集羣容錯機制,則是簡單的調用第一個可到達的服務。都不可達是,拋出異常

最後
dubbo自己還有其餘集羣容錯的擴展實現,這裏http://www.javashuo.com/article/p-awxzbqol-ce.html

相關文章
相關標籤/搜索