Dubbo服務調用——Cluster組件(服務降級,容錯)

在集羣調用失敗時,Dubbo 提供了多種容錯方案,缺省爲 failover 重試。html

Service代理對象初始化環節,涉及到Cluster的初始化,而且調用過程也涉及到Cluster組件的集羣容錯,接下來將詳細講解Dubbo是如何利用Cluster進行容錯處理 及Cluster的種類。java

首先,咱們看下代理對象初始化過程當中 Cluster的組裝過程。git

服務引用過程與發佈過程的調用鏈很是相似一樣也是api

 /**
 * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
 * =>
 * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
 */緩存

服務發佈 能夠參考:《Dubbo服務發佈之服務暴露&心跳機制&服務註冊》安全

咱們直接來看核心代碼:RegistryProtocol.refer 方法服務器

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 將param中的registry屬性,設置爲Protocol 刪除param中的registry
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 鏈接註冊中心,監聽reconnect狀態改變
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 得到服務引用配置參數集合 group="a,b" or group="*" refer中引用遠程服務的信息
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    // 分組處理方式
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // cluster , registry 註冊中心 , type 接口Class類型 , url 註冊中心信息 +refer 引用信息
    return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 建立 RegistryDirectory 對象,並設置註冊中心
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // 建立訂閱 URL
    // all attributes of REFER_KEY
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())  && url.getParameter(Constants.REGISTER_KEY, true)) {
        // 向註冊中心註冊本身(服務消費者)
        //  category = comsumers , side = consumer
        // registry註冊 /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F10.8.0.49%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6496%26side%3Dconsumer%26timestamp%3D1533729758117
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }

    // 向註冊中心訂閱服務提供者
    // 訂閱信息 category 設置爲providers,configurators,routers  進行訂閱
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 建立 Invoker 對象  基於 Directory ,建立 Invoker 對象,實現統1、透明的 Invoker 調用過程。
    return cluster.join(directory);
}
  1. refer方法中首先鏈接註冊中心並監聽reconnect狀態,同服務發佈
  2. 獲取遠程引用refer指定的group數據。若是有group參數,則獲取Cluster的mergeable擴展 , 並執行doRefer方法
  3. 若是未指定group數據,則執行doRefer方法時傳入Cluster$Adapive(Cluster的自適應對象)

doRefer方法網絡

  1. 組裝動態的Directory組件 RegistryDirectory,爲何是動態的還有什麼其餘的實現,咱們後面講解。
  2. 建立訂閱 URL
  3. 向註冊中心註冊服務消費者信息
  4. 訂閱服務 providers,configuragors,routers 信息服務治理相關。 這裏不作贅述(後面講解)。
  5. 最後經過Directory,建立Invoker對象,實現統1、透明的調用過程。

接下來咱們看下,RegistryProtocol 是如何實現 cluster的自適配對象(Cluster$Adaptive)和RegistryDirectory 生成Invoker對象的過程。app

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
	public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
		if (arg0 == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
		if (arg0.getUrl() == null)
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
		com.alibaba.dubbo.common.URL url = arg0.getUrl();
		String extName = url.getParameter("cluster", "failover");
		if (extName == null)
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
		com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
		return extension.join(arg0);
	}
}

默認獲取的是failover對應的通過MockClusterWrapper裝飾器裝飾的 FailoverCluster對象負載均衡

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
}
public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

實際上FailoverCluster.join方法實現了 Cluster 到 invoker的轉換過程。

FailoverCluster 與 FailoverClusterInvoker 之間是對應的。 其中 FailoverCluster 做用是: 失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲。

咱們來看Cluster 和 Invoker 的集成結構圖:

MockClusterInvoker

MockClusterInvoker mock用於非業務異常時的服務降級 ,非業務異常是指 網絡抖動,超時,或者沒有服務提供者時等異常條件下 ,服務的臨時返回方案。 業務異常並不會走mock 的替代返回。

例如:某商品詳情訪問接口服務端異常,並不會走咱們的mock邏輯 , 若是網絡抖動或者沒有服務提供者這種狀況會走 mock邏輯,例如返回"商品禁止訪問,請稍後重試。"

直接上調用方法

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //1. no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock  強制走moke邏輯
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock 異常moke邏輯
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}
  1. 未指定mock ,則直接執行接下來步驟,異常直接拋出。
  2. 不走正常業務邏輯, 強制返回mock 調用結果。
  3. 進行正常調用,出現異常走mock邏輯

mock配置方式有如下兩種:

1.遠程調用方配置mock參數。配置方式:

<dubbo:reference id="demoService"  interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="return zhangsan"/>

說明:配置了mock參數以後,好比在調用服務的時候出現網絡斷連,或者沒有服務啓動,那麼會把這個mock設置的值返回,也就是zhangsan
經過這種方式就能夠避免由於服務調用不到而帶來的程序不可用問題。

2. 經過制定業務處理類來進行返回。 配置方式:

<dubbo:reference id="demoService"  interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="true"/>

當服務調用過程當中,網絡異常了,它會去自定義的mock業務處理類進行業務處理。所以還須要建立自定義mock業務處理類:

規則:在接口目錄下建立自定義mock業務處理類 , 同時實現Service接口 。 命名規則符合:interfaceService + Mock ,而且有無參構造方法
如:

public class DemoServiceMock implements DemoService {

	@Override
	public String sayHello(String name) {
		return "張三李四";
	}
}

配置完成後,若是出現非業務異常,則會調用自定義降級業務。

因爲MockClusterInvoker 是 ClusterInvoker 的裝飾器,因此接下來還要執行 ClusterInvoker 的 invoke方法。接下來以 FailoverClusterInvoker 爲例進行ClusterInvoker講解。

FailoverCluster

可經過 retries="2" 來設置重試次數(不含第一次)。

重試次數配置以下:

<dubbo:service retries="2" />
或
<dubbo:reference retries="2" />
或
<dubbo:reference>
    <dubbo:method name="findFoo" retries="2" />
</dubbo:reference>

FailoverClusterInvoker 源碼以下:

public Result invoke(final Invocation invocation) throws RpcException {

    checkWhetherDestroyed();

    LoadBalance loadbalance;
    // 得到全部服務提供者 Invoker 集合
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//異步的話,須要添加id
    return doInvoke(invocation, invokers, loadbalance);
}

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //重試時,進行從新選擇,避免重試時invoker列表已發生變化.
            //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                //從新檢查一下
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

咱們看到若是僅是集羣容錯 它的實現原理比較簡單:

  1. 獲取遠程服務提供者代理對象集合列表
  2. 從獲取對應的LoadBanlance(負載均衡實體)
  3. select(loadbalance, invocation, copyinvokers, invoked) 根據負載均衡,調用實體,服務列表,已經調用過的服務列表(失敗的服務列表) 來獲取合適的remote Invoker 。
  4. 遠程服務調用
  5. 若是失敗,則從RegistryDirectory中從新獲取服務列表  ,進行從新選擇,避免重試時invoker列表已發生變化.
  6. 從新進行服務調用

除了FailoverCluster 失敗轉移的集羣容錯功能外,還有其餘:

Failfast Cluster

快速失敗,只發起一次調用,失敗當即報錯。一般用於非冪等性的寫操做,好比新增記錄。

Failsafe Cluster

失敗安全,出現異常時,直接忽略。一般用於寫入審計日誌等操做。

Failback Cluster

失敗自動恢復,後臺記錄失敗請求,定時重發。一般用於消息通知操做。

Forking Cluster

並行調用多個服務器,只要一個成功即返回。一般用於實時性要求較高的讀操做,但須要浪費更多服務資源。可經過 forks="2" 來設置最大並行數。

Broadcast Cluster

廣播調用全部提供者,逐個調用,任意一臺報錯則報錯。一般用於通知全部提供者更新緩存或日誌等本地資源信息。

集羣模式配置

按照如下示例在服務提供方和消費方配置集羣模式

<dubbo:service cluster="failsafe" />

<dubbo:reference cluster="failsafe" />

讚揚支持

相關文章
相關標籤/搜索