Note: the Dubbo version is 2.5.7.
Figure 1: Class inheritance diagram of MergeableClusterInvoker
"Mergeable" means merging the result sets returned by several providers into a single result.
The core code is in the invoke(Invocation) method of MergeableClusterInvoker; the source is shown below. It looks long, but the main logic is not complicated once you trace it.
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);

        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);

        Class<?> returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for (final Invoker<T> invoker : invokers) {
            Future<Result> future = executor.submit(new Callable<Result>() {
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        List<Result> resultList = new ArrayList<Result>(results.size());

        int timeout = getUrl().getMethodParameter(invocation.getMethodName(),
                Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
            Future<Result> future = entry.getValue();
            try {
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error(new StringBuilder(32).append("Invoke ")
                            .append(getGroupDescFromServiceKey(entry.getKey()))
                            .append(" failed: ")
                            .append(r.getException().getMessage()).toString(),
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException(new StringBuilder(32)
                        .append("Failed to invoke service ")
                        .append(entry.getKey())
                        .append(": ")
                        .append(e.getMessage()).toString(),
                        e);
            }
        }

        if (resultList.size() == 0) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        if (merger.startsWith(".")) {
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException(new StringBuilder(32)
                        .append("Can not merge result because missing method [ ")
                        .append(merger)
                        .append(" ] in class [ ")
                        .append(returnType.getClass().getName())
                        .append(" ]")
                        .toString());
            }
            if (method != null) {
                if (!Modifier.isPublic(method.getModifiers())) {
                    method.setAccessible(true);
                }
                result = resultList.remove(0).getValue();
                try {
                    if (method.getReturnType() != void.class
                            && method.getReturnType().isAssignableFrom(result.getClass())) {
                        for (Result r : resultList) {
                            result = method.invoke(result, r.getValue());
                        }
                    } else {
                        for (Result r : resultList) {
                            method.invoke(result, r.getValue());
                        }
                    }
                } catch (Exception e) {
                    throw new RpcException(
                            new StringBuilder(32)
                                    .append("Can not merge result: ")
                                    .append(e.getMessage()).toString(),
                            e);
                }
            } else {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result because missing method [ ")
                                .append(merger)
                                .append(" ] in class [ ")
                                .append(returnType.getClass().getName())
                                .append(" ]")
                                .toString());
            }
        } else {
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<Object>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }
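1. First, obtain the list of service providers and iterate over it, invoking the service on each provider. Each invocation is wrapped in a Callable and submitted to a thread pool for execution.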
2. Step 1 produces a collection of Futures, which is `results` in the code above.
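3. For each Future in `results`, call Future.get() with the configured timeout, which blocks until the corresponding task finishes. Results that carry an exception are logged and skipped; the remaining ones are collected into `resultList` in the code above.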
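4. To keep things simple, we focus on the snippet below. Setting aside how the Merger is obtained, the code is straightforward: take the value out of each Result in `resultList`, which gives another collection (`rets` in the snippet below), and finally use the Merger to merge `rets`.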
    } else {
        Merger resultMerger;
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
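To make the last step of that snippet more concrete, here is a minimal, self-contained sketch of what a list merger could look like and of why the typed array is built with Array.newInstance. It does not use Dubbo classes: `SimpleMerger`, `ListConcatMerger`, and `mergeValues` are hypothetical names for illustration, and the varargs `merge` shape is an assumption modelled on the call site above, not a copy of Dubbo's Merger.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListMergerSketch {

    // Hypothetical stand-in for a merger interface (assumed shape: varargs merge).
    public interface SimpleMerger<T> {
        T merge(T... items);
    }

    // Concatenates the lists returned by each provider group, skipping nulls.
    public static class ListConcatMerger implements SimpleMerger<List<?>> {
        @Override
        public List<Object> merge(List<?>... items) {
            List<Object> merged = new ArrayList<>();
            for (List<?> item : items) {
                if (item != null) {
                    merged.addAll(item);
                }
            }
            return merged;
        }
    }

    // Mirrors the snippet: copy the values into an array whose component type
    // is the method's return type, then hand that array to the merger.
    public static Object mergeValues(Class<?> returnType, List<Object> rets) {
        Object[] typed = rets.toArray((Object[]) Array.newInstance(returnType, 0));
        // The runtime type of 'typed' is List[] when returnType is List.class,
        // so this cast succeeds; a plain Object[] would fail here.
        return new ListConcatMerger().merge((List<?>[]) typed);
    }

    public static void main(String[] args) {
        List<Object> rets = new ArrayList<>();
        rets.add(Arrays.asList(1, 2));
        rets.add(Arrays.asList(3));
        System.out.println(mergeValues(List.class, rets)); // prints [1, 2, 3]
    }
}
```

The typed array matters because the merger receives its arguments as an array of the return type; converting `rets` with a plain `toArray()` would produce an `Object[]` that cannot be cast to the more specific array type.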
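The key point is that the providers are invoked concurrently on multiple threads, and the results collected from them are then combined by a Merger.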
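The overall pattern can be sketched outside Dubbo with plain java.util.concurrent. This is only an illustration under stated assumptions: `FanOutMergeSketch` and `invokeAllAndMerge` are hypothetical names, each provider is modelled as a Callable that returns a list of strings, and the merge is a simple concatenation rather than a pluggable Merger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FanOutMergeSketch {

    // Hypothetical helper: calls every provider concurrently and concatenates the results.
    public static List<String> invokeAllAndMerge(Map<String, Callable<List<String>>> providers,
                                                 long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Steps 1 and 2: submit one task per provider and keep the Futures.
            Map<String, Future<List<String>>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<List<String>>> entry : providers.entrySet()) {
                futures.put(entry.getKey(), executor.submit(entry.getValue()));
            }

            // Step 3: block on each Future; the timeout applies to each get() separately.
            List<List<String>> resultList = new ArrayList<>();
            for (Map.Entry<String, Future<List<String>>> entry : futures.entrySet()) {
                resultList.add(entry.getValue().get(timeoutMillis, TimeUnit.MILLISECONDS));
            }

            // Step 4: merge the per-provider results into one list.
            List<String> merged = new ArrayList<>();
            for (List<String> partial : resultList) {
                merged.addAll(partial);
            }
            return merged;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Callable<List<String>>> providers = new LinkedHashMap<>();
        providers.put("groupA", () -> Arrays.asList("a1", "a2"));
        providers.put("groupB", () -> Arrays.asList("b1"));
        System.out.println(invokeAllAndMerge(providers, 1000)); // prints [a1, a2, b1]
    }
}
```

Unlike this sketch, the Dubbo code above logs and skips results that carry an exception instead of failing the whole call, and it short-circuits when zero or one result remains.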