Dubbo Cluster Invocation Modes: The Mergeable Implementation

Note: the Dubbo version is 2.5.7.

                        

                                                        Figure 1. Class inheritance diagram of MergeableClusterInvoker

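1. What Mergeable means

    Mergeable means merging result sets: the results returned by several invokers are combined into a single result.

2. How Dubbo implements it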

    The core code is in MergeableClusterInvoker; the listing below shows its invoke(Invocation) method. It looks long, but the main logic is not complicated.

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }
    Object result = null;
    List<Result> resultList = new ArrayList<Result>(results.size());
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error(new StringBuilder(32).append("Invoke ")
                                .append(getGroupDescFromServiceKey(entry.getKey()))
                                .append(" failed: ")
                                .append(r.getException().getMessage()).toString(),
                        r.getException());
            } else {
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Failed to invoke service ")
                    .append(entry.getKey())
                    .append(": ")
                    .append(e.getMessage()).toString(),
                    e);
        }
    }
    if (resultList.size() == 0) {
        return new RpcResult((Object) null);
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;
        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32)
                    .append("Can not merge result because missing method [ ")
                    .append(merger)
                    .append(" ] in class [ ")
                    .append(returnType.getClass().getName())
                    .append(" ]")
                    .toString());
        }
        if (method != null) {
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else {
            throw new RpcException(
                    new StringBuilder(32)
                            .append("Can not merge result because missing method [ ")
                            .append(merger)
                            .append(" ] in class [ ")
                            .append(returnType.getClass().getName())
                            .append(" ]")
                            .toString());
        }
    } else {
        Merger resultMerger;
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    return new RpcResult(result);
}

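    1. First, obtain the list of service providers (invokers) and iterate over it, invoking the service on each one. Each invocation is wrapped in a Callable and submitted to a thread pool.

    2. Step 1 yields a collection of Futures, which is results in the code above.

    3. Call Future.get() with the configured timeout on each Future in results, blocking until the task finishes. A Result that carries an exception is logged and skipped; the remaining ones form resultList in the code above.

    4. To keep things simple, we focus on the excerpt below. Leaving aside where the Merger comes from, the code is straightforward: take the value out of each Result in resultList, collect the values into another list (rets in the excerpt), and finally use the Merger to merge rets.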

} else {
    Merger resultMerger;
    if (ConfigUtils.isDefault(merger)) {
        resultMerger = MergerFactory.getMerger(returnType);
    } else {
        resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
    }
    if (resultMerger != null) {
        List<Object> rets = new ArrayList<Object>(resultList.size());
        for (Result r : resultList) {
            rets.add(r.getValue());
        }
        result = resultMerger.merge(
                rets.toArray((Object[]) Array.newInstance(returnType, 0)));
    } else {
        throw new RpcException("There is no merger to merge result.");
    }
}
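
The merging step in the excerpt can be sketched outside Dubbo as follows. This is only a minimal illustration under stated assumptions: the nested Merger interface is a local stand-in with the same shape as the one used above, and ListMerger, mergeAll and MergerSketch are hypothetical names, not Dubbo classes. The sketch also shows why the values are copied into an array created with Array.newInstance(returnType, 0): the merger receives a typed array rather than a plain Object[].

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergerSketch {

    /** Local stand-in with the same shape as the Merger used in the excerpt (assumption). */
    interface Merger<T> {
        T merge(T... items);
    }

    /** Hypothetical merger that concatenates lists and skips null entries. */
    static class ListMerger implements Merger<List<?>> {
        @Override
        public List<?> merge(List<?>... items) {
            List<Object> merged = new ArrayList<Object>();
            for (List<?> item : items) {
                if (item != null) {
                    merged.addAll(item);
                }
            }
            return merged;
        }
    }

    /**
     * Copies the values into an array whose component type is returnType,
     * then hands that array to the merger, as the excerpt does with rets.
     */
    @SuppressWarnings("unchecked")
    static <T> T mergeAll(Merger<T> merger, Class<?> returnType, List<Object> values) {
        // The runtime type of this array is returnType[], not Object[].
        Object[] typed = values.toArray((Object[]) Array.newInstance(returnType, 0));
        return merger.merge((T[]) typed);
    }

    public static void main(String[] args) {
        List<Object> values = new ArrayList<Object>();
        values.add(Arrays.asList("a", "b")); // value returned by one provider
        values.add(Arrays.asList("c"));      // value returned by another provider
        List<?> merged = mergeAll(new ListMerger(), List.class, values);
        System.out.println(merged); // prints [a, b, c]
    }
}
```

If the array were a plain Object[], the call into ListMerger.merge(List<?>...) would fail with a ClassCastException at run time, which is a plausible reason for the reflective array creation in the excerpt.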

 

    The key point is that the service providers are invoked on multiple threads, and the results obtained are then merged with a Merger.
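
The overall pattern (fan out on a thread pool, wait on each Future with a timeout, then merge) can be sketched in plain Java without any Dubbo dependency. Everything named here is an assumption made for illustration: Provider stands in for an invoker, invokeAndMerge and FanOutMergeSketch are hypothetical names, and the merge is a simple list concatenation. A LinkedHashMap is used so the merged order is predictable, whereas the listing above uses a HashMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FanOutMergeSketch {

    /** Hypothetical stand-in for one service provider; not a Dubbo type. */
    interface Provider {
        List<String> call() throws Exception;
    }

    static List<String> invokeAndMerge(Map<String, Provider> providers, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Steps 1 and 2: submit one task per provider and keep the Futures.
            Map<String, Future<List<String>>> futures =
                    new LinkedHashMap<String, Future<List<String>>>();
            for (Map.Entry<String, Provider> entry : providers.entrySet()) {
                Callable<List<String>> task = entry.getValue()::call;
                futures.put(entry.getKey(), executor.submit(task));
            }
            // Step 3: block on each Future, bounded by the timeout.
            List<String> merged = new ArrayList<String>();
            for (Map.Entry<String, Future<List<String>>> entry : futures.entrySet()) {
                try {
                    // Step 4: merge by appending each provider's values.
                    merged.addAll(entry.getValue().get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (Exception e) {
                    // Wrap failures in an unchecked exception, similar in spirit to RpcException.
                    throw new IllegalStateException(
                            "Failed to invoke " + entry.getKey() + ": " + e.getMessage(), e);
                }
            }
            return merged;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Map<String, Provider> providers = new LinkedHashMap<String, Provider>();
        providers.put("groupA", () -> Arrays.asList("a1", "a2"));
        providers.put("groupB", () -> Arrays.asList("b1"));
        System.out.println(invokeAndMerge(providers, 1000L)); // prints [a1, a2, b1]
    }
}
```

Because all tasks are submitted before any Future.get() call, the providers run concurrently and the total wait is roughly bounded by the slowest one rather than by the sum of all calls.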
