服務調用

強烈建議查看官方文檔html

@Autowired
@Qualifier("serviceImpl")
private TestService testService;

調用現場

這裏咱們能夠發現,testService是proxy0對象,也就是服務引用那篇裏返回的,java

@Autowired TestService testService:spring會去加載該Bean,調用到ReferenceBean.getObject獲取對象spring

-->InvokerInvocationHandler.invoke
  -->RpcInvocation  //全部請求都會轉爲RpcInvocation
  -->MockClusterInvoker.invoke //1.進入集羣
    -->result = this.invoker.invoke(invocation);
      -->AbstractClusterInvoker.invoke
        -->list(invocation)
          -->directory.list //2.進入目錄查找 從this.methodInvokerMap裏面查找一個Invoker
            -->AbstractDirectory.list
              -->doList(invocation)
                -->RegistryDirectory.doList //從this.methodInvokerMap裏面查找一個Invoker
              -->router.route //3.進入路由
                -->MockInvokersSelector.route
                  -->getNormalInvokers
        -->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random")
        -->doInvoke
          -->FailoverClusterInvoker.doInvoke
            -->select //4.進入負載均衡
              -->AbstractClusterInvoker.select
                -->doselect  //這裏,若是集羣中只有一個服務,直接返回
                  -->loadbalance.select
                    -->AbstractLoadBalance.select
                      -->doSelect
                        -->RoundRobinLoadBalance.doSelect
                          -->invokers.get(currentSequence % length)//取模輪循
            -->Result result = invoker.invoke(invocation)
            -------------擴展點----------------
              -->InvokerWrapper.invoke
                -->ListenerInvokerWrapper.invoke
                  -->ConsumerContextFilter.invoke
                    -->ProtocolFilterWrapper.invoke
                      -->MonitorFilter.invoke
                        -->ProtocolFilterWrapper.invoke
                          -->FutureFilter.invoke
                            -->ListenerInvokerWrapper.invoke
                              -->AbstractInvoker.invoke  
                             //將附加消息(attachment)添加到invocation, 將帶到服務端去
                              ---------------擴展點---------------
                                -->doInvoke(invocation)
-------------***------------      -->DubboInvoker.doInvoke 
                                  //這裏主線程會等待,直到被喚醒,並且有返回值(通常是這種)
                                  //爲何DubboInvoker是個protocol? 由於
                                  //registryDirectory.refreshInvoker.toInvokers:protocol.refer
        
                                   -------------------邏輯隔開線--------------------
                                    -->ReferenceCountExchangeClient.request
                                      -->HeaderExchangeClient.request
-----------------***------------------- -->HeaderExchangeChannel.request  //建立request(自帶ID)
                                          -->AbstractPeer.send
                                            -->AbstractClient.send
                                              -->NettyChannel.send
                                                //裏面就是netty客戶端向服務端發送消息的邏輯
                                                //channel.writeAndFlush(message)
                                                // 這裏會使用netty的worke線程池去調用
                                                //  NettyClientHandler#write

中間涉及到對數據的編碼操做apache

ExchangeCodec#encode-->encodeRequest-->DubboCodec #encodeRequestData(序列化請求參數)c#

解碼操做數組

ExchangeCodec#decode-->decodeBody-->DubboCodec -->decodeBody-->DecodeableRpcInvocation.decode併發

------------------------來到服務端----------
NettyClientHandler#write
-->NettyServerHandler.channelRead 
//(這裏dubbo官網說的是NettyHandler#messageReceived,可是個人2.6.6版本並無進入那個方法,而是這裏寫着的write,channelRead)
//服務端接收消息
 -->AbstractPeer.received
   -->MultiMessageHandler.received
     -->HeartbeatHandler.received
       -->AllChannelHandler.received
         -->ChannelEventRunnable.run
           -->DecodeHandler.received
             //中間插入解碼操做(主要是針對運行時解碼)
             -->HeaderExchangeHandler.received
---****---     -->handleRequest(這個方法執行目標對象的目標方法)
             //Object result = handler.reply(channel, msg); 
            //handle是DubboProtocol.requestHandler屬性
              -->DubboProtocol.ExchangeHandler.reply
                -->Invoker.invoke // 執行過濾連 省略過濾鏈步驟
                  -->InvokerWrapper.invoke
                    -->DelegateProviderMetaDataInvoker.invoke
----****----            -->AbstractProxyInvoker.invoke
                      //new RpcResult(doInvoke(proxy, invocation.getMethodName(), 
                      //invocation.getParameterTypes(), invocation.getArguments()));
                        -->wrapper.invokeMethod(proxy, methodName, 
                                                parameterTypes, arguments);
                      //JavassistProxyFactory.getInvoker.AbstractProxyInvoker.doInvoke
                          -->getData (目標方法)
-----------------邏輯分割線-------------------
               -->AbstractPeer.send
                 -->NettyChannel.send
                   //channel.writeAndFlush(message) 返回消息到客戶端
                   -->NettyServerHandler.write
                   ------------------服務端執行完畢----------------------
                     -->NettyClientHandler.channelRead(客戶端接收消息)
                       -->AbstractPeer.received
                         -->MultiMessageHandler.received
                           -->HeartbeatHandler.received
                             -->AllChannelHandler.received
                               -->ChannelEventRunnable.run
                                 -->DecodeHandler.received
                                   -->HeaderExchangeHandler.received
                                     //中間插入解碼操做(對返回的信息)
                                     -->handleResponse
                                       -->DefaultFuture.received
                                         -->DefaultFuture.doReceived
                                           //response = res 將返回的信息保存在DefaultFuture中
                                           //done.signal() 喚醒DubboInvoker.doInvoke中暫停的主線程

DubboInvoker#doInvokeapp

該方法在請求時經過get方法處於while(true)等待中負載均衡

當被喚醒並且有返回值後(一般使用的這一種)繼續執行主線程dom

return (Result) currentClient.request(inv, timeout).get();
  -->currentClient.request //這裏就是執行的以上全部過程(請求與相應)
  -->DefaultFuture.get(timeout)
    -->returnFromResponse
      // retrun response.getResult()

DubboInvoker

public class DubboInvoker<T> extends AbstractInvoker<T> {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 爲 true,表示「單向」通訊
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發送請求
                currentClient.send(inv, isSent);
                // 設置上下文中的 future 字段爲 null
                RpcContext.getContext().setFuture(null);
                // 返回一個空的 RpcResult
                return new RpcResult();
            } 

            // 異步有返回值
            else if (isAsync) {
                // 發送請求,並獲得一個 ResponseFuture 實例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暫時返回一個空結果
                return new RpcResult();
            } 

            // 同步調用
            else {
                RpcContext.getContext().setFuture(null);
                // 發送請求,獲得一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其餘方法
}

問題: 通常狀況下,服務消費方會併發調用多個服務,每一個用戶線程發送請求後,會調用不一樣 DefaultFuture 對象的 get 方法進行等待。 一段時間後,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每一個響應對象傳遞給相應的 DefaultFuture 對象,且不出錯??

看到DubboInvoker.doInvoke中的同步有返回值的一段代碼

return (Result) currentClient.request(inv, timeout).get();

get方法會等待被喚醒同時有返回結果

看到HeaderExchangeChannel#request方法

@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException("is closed!");
    }
    // 建立一個request對象,默認賦值了一個ID
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    //直接返回DefaultFuture
    return future;
}

這裏就直接返回了DefaultFuture ,對應是上面的currentClient.request, get方法就是調用的DefaultFuture .get

channel.send後面的調用會爲每次調用開啓不一樣的線程

在請求時會將請求參數序列化到服務端

服務端接到請求後,會還原Request對象

ExchangeCodec.ExchangeCodec 服務端解碼

// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
    req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
    Object data;
    if (req.isHeartbeat()) {
        data = decodeHeartbeatData(channel, in);
    } else if (req.isEvent()) {
        data = decodeEventData(channel, in);
    } else {
        data = decodeRequestData(channel, in);
    }
    req.setData(data);
} catch (Throwable t) {
    // bad request
    req.setBroken(true);
    req.setData(t);
}
return req;

在服務端調用目標方法完畢後會將請求返回的結果和id設置給Reponse對象

HeaderExchangeHandler.handleRequest

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    //給response設置id(該ID是請求時Request中的ID)
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        return res;
    }
    // 請求方法的參數
    Object msg = req.getData();
    try {
        // result: 調用目標方法返回的結果
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

最後來到服務端調用完後回到客戶端調用DefaultFuture.received

public static void received(Channel channel, Response response) {
    try {
        // FUTURES保存着每次調用後返回的DefaultFuture對象,key是生成Request生成時的ID
        // 這裏用response.getId()去獲取,由於Request對應的Response有相同的ID
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // 喚醒對應線程的DefaultFuture對象
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

這裏就喚醒了Response對應的DefaultFuture對象,一個請求的響應就完成了,過程頗爲複雜。

在dubbo官網對服務調用用很是詳細的講解!!

相關文章
相關標籤/搜索