Dubbo集羣容錯模式之Forking實現

注:Dubbo版本是2.6.2。java

                 

                                                        圖1 Dubbo的ForkingClusterInvoker類繼承圖ide

1.Forking容錯的含義

    並行調用多個服務,只要一個成功即返回,可是這要消耗更多的資源。spa

2.Forking的實現

    核心代碼在ForkingClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源碼以下。代碼看起來比較多,可是咱們分析主要邏輯的話,不復雜。線程

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    final List<Invoker<T>> selected;
    final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers;
    } else {
        selected = new ArrayList<Invoker<T>>();
        for (int i = 0; i < forks; i++) {
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List) selected);
    final AtomicInteger count = new AtomicInteger();
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Result result = invoker.invoke(invocation);
                    ref.offer(result);
                } catch (Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    }
}
  • 首先獲取咱們設定的forks,是個int類型值,爲了便於描述,假設這個forks的值是a。
  • 從invokers中選出a個Invoker放到selected中。
  • 遍歷selected,將每一個請求封裝在Runnable中,Runnable中將請求的結果放入到隊列ref中。注意Runnable中的實現細節以下,只有在最後一次請求失敗時,將異常exception放如到offer中。
} catch (Throwable e) {
    int value = count.incrementAndGet();
    if (value >= selected.size()) {
        ref.offer(e);
    }
}

    能夠看到,咱們的請求是異常的。在最後,從ref隊列中取出第一個,假設爲r,並且是帶有超時的那種等待。若是從隊列中poll時,拋出InterruptedException異常,則將其封裝後拋出;若是r是一個Throwable類型,則說明全部的請求都失敗了,拋出異常;若是r不是Throwable則說明存在請求成功的狀況,返回r。code

try {
    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
    if (ret instanceof Throwable) {
        Throwable e = (Throwable) ret;
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    }
    return (Result) ret;
} catch (InterruptedException e) {
    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}

 

    重點是:並行調用多個服務,只要有一個成功,則返回結果,不等其它的線程執行完成。假設並行請求A一、A二、A3,A1請求使用了3s,A2請求用了2s,A3請求用了5s,且A2請求的結果最早放入到隊列中,那麼主線程就返回A2請求的結果,主線程不等A1和A3線程執行完成。orm

相關文章
相關標籤/搜索