dubbo的啓動過程(五)--消費方

配置初始化

ReferenceBeanServiceBean同樣,都實現了InitializingBean的接口,天然也會調用afterPropertiesSet方法在,這個方法和dubbo的啓動過程(二)--服務方屬性配置雷同,這邊就不繼續了。除了InitializingBean這個接口,他還繼承了FactoryBean接口,實例化的時候,就會調用getObject方法,例子看spring學習之FactoryBean,源碼見doGetObjectFromFactoryBeanspring

public Object getObject() {  
    return get();  
}
public synchronized T get() {
    // 各類配置的更新
    checkAndUpdateSubConfigs();

    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

init

// 以上代碼跟服務方雷同,省略
ref = createProxy(map);
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;

建立invoke

createProxy方法中,會調用RegistryProtocolrefer方法,這裏先略過ProtocolFilterWrapperProtocolListenerWrapperQosProtocolWrappersegmentfault

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 獲取註冊中心
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 關聯引用
    return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 把消費方信息註冊到註冊中心
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // 獲取invoker並緩存
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

cluster.join(directory)這個代碼,會執行包裝類MockClusterWrapper的join方法:緩存

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
}

因此invoke是MockClusterInvoker。app

建立代理

createProxy方法中,invoker建立完了,就開始建立代理,代理類中,經過newInstance的構造參數,把InvokerInvocationHandler傳入到成員變量中,後面調用的方法,就是經過這個類的invoke方法進行反射的。負載均衡

// 調用下面的JavassistProxyFactory的getProxy方法
return (T) PROXY_FACTORY.getProxy(invoker);

// 注意傳入的是InvokerInvocationHandler
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

執行方法

當執行方法的時候,就會執行InvokerInvocationHandler的invoke方法:異步

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // Object直接調用返回
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // toString、hashCode、equals返回響應的invoke方法
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // 這個invoke就是MockClusterInvoker,RpcInvocation傳入方法參數信息
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

MockClusterInvoker#invokeasync

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // 是否配置了mock
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    
    // 。。。省略部分代碼。若是強制調用mock,調用本地的mock
    result = doMockInvoke(invocation, null);
    // 沒有mock,調用FailoverClusterInvoker的方法
    result = this.invoker.invoke(invocation);
    // 。。。省略
    return result;
}

這邊會調用FailoverClusterInvoker的父類AbstractClusterInvoker#invokeide

public Result invoke(final Invocation invocation) throws RpcException {
    // 檢查是否被Destroyed
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 獲取Invoker列表
    List<Invoker<T>> invokers = list(invocation);
    // 負載均衡
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // FailoverClusterInvoker#doInvoke
    return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker#doInvoke
失敗重試、負載均衡這裏設置oop

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    // 檢查invokers是否爲空
    checkInvokers(copyInvokers, invocation);
    // 獲取方法名
    String methodName = RpcUtils.getMethodName(invocation);
    // 失敗重試次數
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // 最後一次exception
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // 檢查是否Destroyed
            checkWhetherDestroyed();
            // 獲取Invokers 
            copyInvokers = list(invocation);
            // check again 檢查invokers是否爲空
            checkInvokers(copyInvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // InvokerWrapper#invoke
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                // 。。。。
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 。。。。
}

InvokerWrapper#invoke:
這個invoker是ProtocolFilterWrapper,在這個invoke中,會調用ProtocolFilterWrapper&CallbackRegistrationInvoker#invoke-->ConsumerContextFilter#invoke-->FutureFilter#invoke-->MonitorFilter#invoke-->ListenerInvokerWrapper#invoke-->AsyncToSyncInvoker#invoke-->DubboInvoker#invoke學習

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

ConsumerContextFilter#invoke:
在上下文設置invoker、invocation、LocalAddress、RemoteAddress、RemoteApplicationName、Attachment,再把invoker設置到invocation中,執行完下一個invoke後,再清空上下文信息。

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
            .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
            .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        RpcContext.removeServerContext();
        return invoker.invoke(invocation);
    } finally {
        RpcContext.removeContext();
    }
}

FutureFilter#invoke:
主要是設置回調的

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    return invoker.invoke(invocation);
}

MonitorFilter#invoke:
接口對應的方法調用的次數。
key爲:接口全限定命.方法
value是AtomicInteger類型,記錄調用次數

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}

// concurrent counter
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
    String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
    AtomicInteger concurrent = concurrents.get(key);
    if (concurrent == null) {
        concurrents.putIfAbsent(key, new AtomicInteger());
        concurrent = concurrents.get(key);
    }
    return concurrent;
}

ListenerInvokerWrapper#invoke:
直接調用AsyncToSyncInvoker#invoke

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

AsyncToSyncInvoker#invoke:
若是是同步,就阻塞等待返回,若是是異步,就直接返回

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        // 同步就阻塞等待返回
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

AbstractInvoker#invoke

public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // 傳入當前的dubboInvoker
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        invocation.addAttachments(contextAttachments);
    }
    // 包括FUTURE、ASYNC、SYNC
    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    // Async時設置ID
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
}

DubboInvoker#doInvoke
遠程調用接口,獲取數據

protected Result doInvoke(final Invocation invocation) throws Throwable {
    // 給inv設置路徑和版本信息
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    // 獲取一個ExchangeClient
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否爲Oneway
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            asyncRpcResult.subscribeTo(responseFuture);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
相關文章
相關標籤/搜索