dubbo入門之服務消費

今天,咱們來看看dubbo消費的執行過程
首先,咱們都知道dubbo是一個基於netty實現的RPC框架,底層通訊是使用netty來實現的。在學習dubbo的時候,或許咱們都會有下面的這些疑惑:
一、服務消費者只持有服務接口,咱們的消費端在執行接口請求的時候獲取到的接口實現是什麼?
二、消費者是如何經過netty創建同服務端的通訊的?
三、服務是怎麼註冊到註冊中心的?
四、消費端怎麼拉取服務?
五、服務的負載均衡是如何體現的?
等等這些問題都會困擾着咱們,今天咱們先來聊聊dubbo消費端的實現原理
如今,你可能已經本身經過官網的教程搭建了本身的dubbo demo服務,你在執行demo的時候會發現,服務消費者只持有服務接口,你是經過@Reference註解去獲取的實現,你已經知道spring bean工廠會自動爲用戶建立代理實例,那麼dubbo爲咱們的消費者建立的代理實現是什麼呢?只要開啓idea的調試模式,你就能夠看到咱們獲得的實現實際上是:com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler。該Handler實現了InvocationHandler,InvocationHandler是JDK動態代理實現的核心接口,若是你不瞭解動態代理,那建議你本身去了解一下。
回到正題,咱們經過接口調用的方法都會被該Handler代理,該Handler源碼以下:spring

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

源碼很簡單,只有一個invoke方法,它是代理類和接口之間的橋樑。若是你再細心一點,會發現InvokerInvocationHandler中的Invoker實現類是com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker。它是dubbo invoker的默認實現,裏面封裝了服務降級等功能。到這裏,你基本已經知道消費者到底是怎麼去調用服務的了,後面你只要繼續跟着源碼調試,服務是如何和netty創建聯繫的app

消費者請求調用鏈:

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

在MockClusterInvoker是一個抽象類,它的默認實現是FailoverClusterInvoker,在MockClusterInvoker中,經過服務目錄Directory列舉服務列表,核心方法invoke以下:負載均衡

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;
    //列舉服務列表
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

服務目錄Directory已知的實現有RegistryDirectory和StaticDirectiry,默認使用的是RegistryDirectory,在進行服務調用的時候會從這裏面去獲取可用的服務列表,若是想要了解更多,推薦閱讀dubbo官網服務列表一章,裏面有很是詳細的介紹。
能夠看到,獲取服務列表以後會從系統擴展中加載默認的負載均衡實現,而後繼續往下執行到子類FailoverClusterInvoker的模板方法doInvoke,該方法會從新執行服務列舉並檢查服務的可用性,以後經過負載均衡策略選擇具體服務,dubbo默認負載均衡策略是隨機RandomLoadBalance。關鍵代碼以下:框架

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 服務檢查
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // i>0,說明第一次服務調用失敗,須要從新檢查服務列表
            checkWhetherDestroyed();    //若是服務已經銷燬,拋出異常
            copyinvokers = list(invocation);//從新列舉服務
            // check again
            checkInvokers(copyinvokers, invocation);//檢查服務是否爲空
        }
        //負載均衡獲取服務
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);//將獲取的服務添加到已執行列表
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //服務請求
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
           
        }
        // 異常處理部分省略。。。
    }
 
}

經過負載均衡獲取到具體服務後,執行服務調用,到AbstractInvoker的invoke方法,主要設置一些attachment的信息。重點來看看實現類DubboInvoke的doInvoke方法,以下:dom

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    //設置路徑和版本
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    //ExchangeClient
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //從配置中獲取是否同步執行
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        //是否單向執行
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        //請求超時時間
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            //若是單向執行,發起調用後當即返回一個空RpcResult
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            //若是異步,調用後當即返回一個空的RpcResult
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            //不然,同步等待
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

ExchangeClient接口的實現是HeaderExchangeClient,request方法很簡單,以下異步

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}

就是作了一次請求轉發,channel是ExchangeChannel,默認實現是HeaderExchangeChannel,HeaderExchangeChannel的request方法以下:ide

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");//版本號
    req.setTwoWay(true);//雙向通訊
    req.setData(request);//請求數據
    DefaultFuture future = new DefaultFuture(channel, req, timeout);//Future
    try {
        channel.send(req);//Dubbo封裝的通訊Channel
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

這裏就是構造Request請求體,而後傳遞給NettyChannel,NettyChannel封裝了netty的channel,經過該channel將數據請求寫入到TCP請求中傳遞給服務端。NettyChannel的send方法以下oop

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        //數據請求
        ChannelFuture future = channel.write(message);//此處的channel纔是org.jboss.netty.channel.Channel
        if (sent) {
            //等待請求結果
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

到這裏,關於dubbo消費請求的基本流程已經走完,繼續往下就是netty層面的東西了,有興趣的童鞋能夠自行尋找netty相關教程學習

相關文章
相關標籤/搜索