Dubbo解析(六)-服務調用

當dubbo消費方和提供方都發布和引用完成後,第四步就是消費方調用提供方。java

image

仍是以dubbo的DemoService舉例數組

-- 提供方
<dubbo:application name="demo-provider"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="20880"/>
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

-- 消費方
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>

整個服務的調用過程都是以Invoker接口爲核心,經過不一樣的Invoker實現對象層層調用,完成整個RPC的調用。在調用過程當中,消費方構建包含調用信息的RpcInvocation,並封裝在Request中,傳輸到調用方,調用方解析出Request中的RpcInvocation完成調用,並將調用結果RpcResult封裝到Response中返回,而後由消費方接收後再從Response中解析出RpcResult中攜帶的調用結果完成整個調用。網絡

服務消費方發起請求

消費方獲取的demoService實例對象實際是代理工廠ProxyFactory.getProxy建立的代理對象app

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

於是調用demoService.sayHello方法時,實際調用的是javassist生成的代理對象。消費方的調用堆棧以下負載均衡

image

1. 代理對象執行sayHello方法

代理對象是由javassist生成,生成的sayHello方法以下異步

public String sayHello(String paramString){
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
    return ((String)localObject);
}

2. 將方法名和方法參數傳入InvokerInvocationHandler.invoke執行

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

判斷非Object的方法,將方法和參數組裝成RpcInvocation,做爲Invoker.invoke的參數。ide

3. 執行MockClusterInvoker.invoke方法,MockClusterInvoker是Cluster服務接口的Wrapper包裝類MockClusterWrapper調用join方法返回的Invoker對象。

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
}

根據url獲取方法參數上的MOCK_KEY的值,判斷是否要執行mock調用:ui

a. 若是mock爲null或false,直接調用FailoverClusterInvoker b. 若是mock以force開頭,強制執行mock調用 c. 以上都不是,則先調用FailoverClusterInvoker,調用失敗再執行mock調用this

4. 執行FailoverClusterInvoker.invoke,Failover是Cluster集羣的默認策略。invoke方法由AbstractClusterInvoker執行,而後調用FailoverClusterInvoker的doInvoke實現。

// 執行Directory.list 和 router.route
List<Invoker<T>> invokers = list(invocation);
// 執行LoadBalance.select
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);

a. 經過Directory目錄服務的list方法獲取訂閱返回的服務提供者的Invoker對象集合,若是存在路由,路由服務根據策略過濾Invoker對象集合 b. 根據負載均衡策略LoadBalance來選擇一個Invokerurl

5. 執行Filter過濾器鏈和Listener監聽器鏈的invoke方法

Filter的Invoker執行器鏈由Protocol的Wrapper類ProtocolFilterWrapper.refer方法再調用buildInvokerChain構建。

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
    	// 倒序循環,順序執行
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                public URL getUrl() {
                    return invoker.getUrl();
                }

                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

而Listener的Invoker執行器則是Protocol的Wrapper類ProtocolListenerWrapper.refer方法返回的ListenerInvokerWrapper。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

6. 最終由DubboInvoker執行對遠程服務的調用

服務消費方調用遠程服務時,傳遞的參數是Request對象,封裝了RpcInvocation。

public class RpcInvocation implements Invocation, Serializable {

    private static final long serialVersionUID = -4355285085441097045L;

    // 方法名稱
    private String methodName;

    // 參數類型
    private Class<?>[] parameterTypes;

    // 參數值
    private Object[] arguments;

    // 附件
    private Map<String, String> attachments;

    private transient Invoker<?> invoker;
}

RpcInvocation包含了服務調用執行的方法名和參數,以及在附件中封裝了服務的path、interface、group和version。

在DubboInvoker.invoke中,先獲取交互層客戶端ExchangeClient,其中包含了和服務提供者的長鏈接,傳入RpcInvocation參數,由通訊層HeaderExchangeChannel將RpcInvocation封裝成Request,而後傳輸出去。

HeaderExchangeChannel

public void send(Object message, boolean sent) throws RemotingException {
    if (message instanceof Request
            || message instanceof Response
            || message instanceof String) {
        channel.send(message, sent);
    } else {
        Request request = new Request();
        request.setVersion("2.0.0");
        request.setTwoWay(false);
        request.setData(message);
        channel.send(request, sent);
    }
}

對於DemoService.sayHello方法,消費方傳輸的RpcInvocation的內容以下

RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]

而後由netty將數據傳輸到服務提供者,遠程調用的類型分爲同步,異步或oneway模式,對應的調用結果是:

1.oneway返回空RpcResult,不接收返回結果

2.異步方式直接返回空RpcResult,而異步獲取ResponseFuture回調

3.同步方式,調用方式仍是異步,經過等待ResponseFuture.get()返回結果

整個服務消費方的活動圖以下

image

服務提供方接收調用請求

服務提供方在export服務後,就打開端口等待服務消費方的調用。當服務消費方調用發送調用時,服務提供方netty接收到MessageEvent,調用NettyHandler的messageReceived方法,而後向上一直調用到DubboProtocol的requestHandler的reply方法,獲取到對應的Invoker,執行invoke方法調用服務的實際實現。

服務提供者的調用堆棧,調用從下到上,分爲兩個部分,由兩個線程來執行。

1. 從netty接收請求交予NettyHandler處理,到DubboProtocol的requestHandler,中間通過不少ChannelHandler,對請求的消息進行不一樣的處理。

在AllChannelHandler中,會把請求放入事件處理線程池中,由ChannelEventRunnable線程執行。這是一個分發機制(Dispatch服務),用於指定事件執行的線程池的模型。

AllChannelHandler

cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

在HeaderExchangeHandler中,handleRequest方法從Request中獲取RpcInvocation,而後調用DubboProtocol的requestHandler。

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

2. DubboProtocol的requestHandler是一個ExchangeHandlerAdapter的內部類,received方法又調用reply方法,判斷message是Invocation類型,根據Invocation獲取服務調用Invoker。

方法參數Invocation,就是服務消費方發送的消息

RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=201, dubbo=2.0.0, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]

從中獲取path、group和version,組裝成serviceKey,從exporterMap中獲取DubboExporter,從而獲得Invoker。

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        // 根據invocation匹配Invoker
        Invoker<?> invoker = getInvoker(channel, inv);
        // 部分省略
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        return invoker.invoke(inv);
    }
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);
    // 組裝serviceKey
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

    // 根據serviceKey從exporterMap中獲取DubboExporter,再得到Invoker
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    return exporter.getInvoker();
}

3. 通過提供方Filter Invoker鏈,執行前置攔截,由Protocol的包裝類ProtocolFilterWrapper.export再調用buildInvokerChain構建。

Filter分爲消費方Filter和提供方Filter,是經過Filter實現類上@Activate的group設置。如消費方的ConsumerContextFilter的group=consumer,而提供方的ContextFilter的group=provider

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {}

@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {}

4.調用JavassistProxyFactory.getInvoker生成的AbstractProxyInvoker.invoke

JavassistProxyFactory.getInvoker建立實際對象的Wrapper類,並返回AbstractProxyInvoker內部類對象。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

AbstractProxyInvoker.invoke執行時,調用上面內部類的doInvoke方法,並將執行結果封裝成RpcResult。

public Result invoke(Invocation invocation) throws RpcException {
    try {
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

5.經過Wrapper包裝類,執行真正的demoService的方法

public class Wrapper0 extends Wrapper {
    public Object invokeMethod(Object o, String n, Class[] p, Object[] v)
    throws java.lang.reflect.InvocationTargetException {
        com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
        
        try {
            w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        
        try {
            if ("sayHello".equals($2) && ($3.length == 1)) {
                return ($w) w.sayHello((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException(
            "Not found method \"" + $2 +
            "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
        }
    }
}

服務提供方執行的活動圖

image

服務調用結果返回

當提供方執行完真正服務實現的方法後,須要將返回值傳輸給消費方

  1. 提供方在AbstractProxyInvoker中組裝返回結果成RpcResult
  2. 提供方部分Filter鏈還有後置攔截操做,如處理異常的ExceptionFilter
  3. 在HeaderExchangeHandler.handleRequest方法中,將RpcResult封裝到Response返回
  4. 通過網絡傳輸,回到消費方,DefaultFuture.returnFromResponse方法從Response中解析出RpcResult
  5. 通過層層返回,來到InvokerInvocationHandler,調用RpcResult.recreate方法返回調用結果

Request和Response的組裝和解析

  1. 消費方執行HeaderExchangeChannel.request時將RpcInvocation組裝成Request
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
  1. 提供方執行HeaderExchangeHandler.handleRequest時,解析出Request的RpcInvocation,並將返回結果RpcResult封裝到Response中
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}
  1. 消費方等待提供方返回而後在DefaultFuture.get方法中執行returnFromResponse方法,從Response中獲取RpcResult
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}

整個Dubbo服務調用的活動圖

image

參考:https://blog.csdn.net/quhongwei_zhanqiu/article/details/41701979

相關文章
相關標籤/搜索