Dubbo點滴(2)之集羣容錯

首先看來自官方文檔java

wKiom1hRRW2x2tdlAANArwZrnFI749.png

  • 這裏的Invoker是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息。算法

  • Directory表明多個Invoker,能夠把它當作List<Invoker>,但與List不一樣的是,它的值多是動態變化的,好比註冊中心推送變動。api

  • Cluster將Directory中的多個Invoker假裝成一個Invoker,對上層透明,假裝過程包含了容錯邏輯,調用失敗後,重試另外一個。安全

  • Router負責從多個Invoker中按路由規則選出子集,好比讀寫分離,應用隔離等。服務器

  • LoadBalance負責從多個Invoker中選出具體的一個用於本次調用,選的過程包含了負載均衡算法,調用失敗後,須要重選。
    負載均衡

之前本身看到上面的文字,理解不太深。隨着對分佈式系統的深刻接觸,以及DUBBO源碼的研究,纔有了更精確的理解。總結一句話:閱讀技術類文檔時,對一些語言的理解,受到閱讀者自身狀況的限制,每每理解有深有淺。閒話少說。本文,就是針對圖中的組件,一個個進行剖析。異步

1.Invoker
async

Invoker 是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息。分佈式

1.1 API
ide

public interface Invoker<T> extends Node {
    /**
     * get service interface.
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;
}

1.2  層次樹結構

wKiom1hRR03CvULSAABHw_wiGfU581.png

分析可知:主要有2個分支

1.AbstractInvoker:主要具體遠程實現,和RPC 協議有關,屬於dubbo-rpc-api範疇。

2.AbstractClusterInvoker:主要邏輯包括Invoker的選擇,和高可用有關,屬於dubbo-cluster範疇。

1.3 AbstractInvoker VS AbstractClusterInvoker

1.3.1 類圖比較

經過下圖能夠很容易發現:AbstractClusterInvoker比較AbstractInvoker,多了些select,doselect,reselect方法。這些方法的做用也能夠猜到。

wKiom1hRSOry9ZRBAACFwNBpopo819.png

1.3.2 以兩個實現類對比

選擇AbstractInvoker的子類DubboInvoker,

和AbstractClusterInvoker的子類FailoverClusterInvoker爲表明。

其中FailoverClusterInvoker是dubbo集羣容錯默認方案,Dubbo協議爲默認協議。

先看下AbstractInvoker分支

public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {

    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    Map<String, String> context = RpcContext.getContext().getAttachments();
    
    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
    } catch (RpcException e) {
    } catch (Throwable e) {    
    }
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
..
}

public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    logger.debug("doInvoke start -----------------------------");
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
           RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }finally {
        logger.debug("doInvoke end-----------------------------");
    }


}
...
}

這裏能夠很清晰地看到,遠程調用分三種狀況:
1. 不須要返回值的調用(所謂oneWay)
2. 異步(async)
3. 同步

對於第一種狀況,客戶端只管發請求就完了,不考慮返回結果。
對於第二種狀況,客戶端除了發請求,還須要將結果塞到一個ThreadLocal變量中,以便於客戶端get返回值
對於第三種狀況,客戶端除了發請求,還會同步等待返回結果

再看下AbstractClusterInvoker分支

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {

    checkWheatherDestoried();

    LoadBalance loadbalance;
    
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
   List<Invoker<T>> invokers = directory.list(invocation);
   return invokers;
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                   LoadBalance loadbalance) throws RpcException;
..
}

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
   List<Invoker<T>> copyinvokers = invokers;
   checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
       //重試時,進行從新選擇,避免重試時invoker列表已發生變化.
       //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變
       if (i > 0) {
          checkWheatherDestoried();
          copyinvokers = list(invocation);
          //從新檢查一下
          checkInvokers(copyinvokers, invocation);
       }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
 }
...
}

總結下AbstractClusterInvoker和AbstractClusterInvoker都須要實現Invoker接口,二者都聲明瞭doInvoke抽象方法。但方法定義有區別。

protected abstract Result doInvoke(Invocation invocation) throws Throwable;

protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                   LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker 須要一個Invoker列表,他們來自Directory,LoadBalance 能夠能夠理解爲負載均衡策略。

二者的聯繫:AbstractClusterInvoker 比AbstractInvoker多了一些選擇和負載均衡部分,到最後仍是會調用AbstractInvoker分支,負責具體RPC調用工做。

1.4 常見容錯方案對比

Feature
優勢
缺點
實現類
Failover Cluster
失敗自動切換,當出現失敗,重試其它服務器,一般用於讀操做(推薦使用)
重試會帶來更長延遲
FailoverClusterInvoker
Failfast Cluster
快速失敗,只發起一次調用,失敗當即報錯,一般用於非冪等性的寫操做
若是有機器正在重啓,可能會出現調用失敗
FailfastClusterInvoker
Failsafe Cluster
失敗安全,出現異常時,直接忽略,一般用於寫入審計日誌等操做
調用信息丟失

FailsafeClusterInvoker
Failback Cluster
失敗自動恢復,後臺記錄失敗請求,定時重發,一般用於消息通知操做
不可靠,重啓丟失

FailbackClusterInvoker

Forking Cluster

並行調用多個服務器,只要一個成功即返回,一般用於實時性要求較高的讀操做
須要浪費更多服務資源
ForkingClusterInvoker

Broadcast

Cluster

廣播調用全部提供者,逐個調用,任意一臺報錯則報錯,一般用於更新提供方本地狀態
速度慢,任意一臺報錯則報錯
BroadcastClusterInvoker


2. 總結

wKiom1hRWWTiPqlFAABh-mR8ack176.png

*ClusterInvoker 分別使用Router,和Directory獲取Invoker列表。

*ClusterInvoker而後再Invoker列表中,藉助LoadBalance提供的負載均衡策略,返回一個可用的Invoker.

*Directory表明多個Invoker,能夠理解爲Invoker的邏輯集合,負責Invoker的下線,上線。


Router和Directory二者都對我提供Invoker列表。經過提供的API,很容易找出區別來。

//Directory
List<Invoker<T>> list(Invocation invocation) throws RpcException;
//Route
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

Directory:返回的Invoker列表,表明當前正常對外提供服務的Invoker列表。重點在維護可用節點列表。

Route:返回的Invoker列表,是在現有可用的Invoker列表裏,按照規則進行再篩選,重點在規則匹配,過濾等。

相關文章
相關標籤/搜索