注:Dubbo版本是2.6.2。java
圖1 Dubbo的ForkingClusterInvoker類繼承圖ide
並行調用多個服務,只要一個成功即返回,可是這要消耗更多的資源。spa
核心代碼在ForkingClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源碼以下。代碼看起來比較多,可是咱們分析主要邏輯的話,不復雜。線程
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); final List<Invoker<T>> selected; final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); for (int i = 0; i < forks; i++) { Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); for (final Invoker<T> invoker : selected) { executor.execute(new Runnable() { @Override public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } } }); } try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } }
} catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } }
能夠看到,咱們的請求是異常的。在最後,從ref隊列中取出第一個,假設爲r,並且是帶有超時的那種等待。若是從隊列中poll時,拋出InterruptedException異常,則將其封裝後拋出;若是r是一個Throwable類型,則說明全部的請求都失敗了,拋出異常;若是r不是Throwable則說明存在請求成功的狀況,返回r。code
try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); }
重點是:並行調用多個服務,只要有一個成功,則返回結果,不等其它的線程執行完成。假設並行請求A一、A二、A3,A1請求使用了3s,A2請求用了2s,A3請求用了5s,且A2請求的結果最早放入到隊列中,那麼主線程就返回A2請求的結果,主線程不等A1和A3線程執行完成。orm