Dubbo系列之 (七)網絡層那些事(2)

輔助連接

Dubbo系列之 (一)SPI擴展

Dubbo系列之 (二)Registry註冊中心-註冊(1)

Dubbo系列之 (三)Registry註冊中心-註冊(2)

Dubbo系列之 (四)服務訂閱(1)

Dubbo系列之 (五)服務訂閱(2)

Dubbo系列之 (六)服務訂閱(3)

Dubbo系列之 (七)鏈路層那些事(1)

Dubbo系列之 (七)鏈路層那些事(2)

讓咱們以本身編寫的TCP的思想,來看dubbo的網絡層。

一、網絡層結構圖

Netty,讓咱們的編寫TCP變的很是簡單,而且它在業界運用極其普遍。Dubbo底層默認實現也是經過Netty。
org.apache.dubbo.remoting.transport.netty4.NettyServer就是其默認實現方式。它的類關係以下:
html

從上面的類圖,咱們能夠知道其設計的理念。apache

  1. 網絡通訊,基本都是點對點通訊,每一個通訊端能夠稱爲一個終端,記名爲Endpont。它具備獲取本地地址getLocalAddress(),發送消息send(),關閉服務close(),而且能夠獲取獲得處理消息的句柄getChannelHandler() 等基本功能。
    2)通常進行網絡通訊,都認爲是一個遠程服務器,它具備不少個客戶端與其通訊,因此持有獲取全部通訊通道的getChannels()方法,判斷雙端是否能夠通訊isBound()方法等。而且它實現Resetable,IdleSensible 接口,說明遠程服務端能夠有重置功能,也能夠處於空閒。它一定是一個通訊終端,因此繼承Endpoint,記名爲RemotingServer。
  2. 從網絡通訊的雙端的角度來說,客戶端和服務端是一對對等端,記名爲AbstractPeer,它具備通道信息處理能力,因此實現ChannelHandler。ChannelHandler是用來處理通訊鏈路上的消息處理器。
    4)爲了讓程序更具備通用性,抽取爲AbstractServer,子類實現其doOpen()和doClose()方法。
    5)NettyServer 就是其網絡通訊客戶端的一個具體的實現。
    6)相應的客戶端的示意圖以下:

二、NettyServer 內部持有對象

1)Map<String, Channel> channels 持有與該服務端通訊的TCP通訊Channel,key爲host:port。它是NettyServerHandler內部channels的引用,NettyServerHandler就是Netty處理消息的句柄,由咱們本身編寫。
2)io.netty.channel.Channel channel netty監聽網絡鏈接的channel,由bind()方法返回
3)內部監聽線程爲1個,線程名爲NettyServerBoss。
4)內部IO線程爲可用處理器+1,最多爲32個線程,線程名爲NettyServerWorker。
5)NettyServerHandler 爲dubbo的消息處理器,編解碼器對消息編解碼後,就是扔給它處理。它內部持有一個ChannelHandler,便是NettyServer這個類,從類圖上可知NettyServer實現了ChannelHandler 這個接口。
六、NettyServer 持有一個外部傳進來的ChannelHandler。並會對其進行封裝,截圖以下(截圖比較清晰):

6)NettyCodecAdapter,爲dubbo內部的編解碼器,內部持有編碼器InternalEncoder,解碼器爲InternalDecoder。
7)Dubbo 的業務線程名爲DubboServerHandler。以後再日誌上看到這個名字不要在陌生了。api

三、直接new NettyServer?

答案,確定不是,直接在框架上new,以後確定沒有擴展性可言。沒事情,咱們包裹一層便可,命名爲傳輸層Transporter,它就有bind,connect功能。
NettyTransporter就是其具體的實現之一。服務器

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }
}

四、Transporter 是一個SPI擴展,它的工具類Transporters

既然是一個SPI擴展,必然須要經過ExtensionLoader.getExtensionLoader()進行加載,Transporters其工具類的靜態方法getTransporter()就是經過ExtensionLoader進行加載的,而後其靜態方法Transporters.bind()來獲取RemotingServer,即NettyServer。網絡

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

值得注意的是,能夠爲其添加多個ChannelHandler,若是爲多個,則對其封裝爲ChannelHandlerDispatcher,而後傳遞給NettyServer。框架

五、Transporters#bind()傳入的ChannelHandler究竟是誰?

先引入一個名詞Exchanger,我稱爲信息交換回執器,它實際上是對Transporter的封裝。Exchanger 一樣具備bind()和connect()方法。
Exchanger的bind()和connect()接收一個ExchangeHandler的消息處理句柄,ExchangeHandler 也是一個ChannelHandler,也是用來處理通道消息的,但它對其進行的加強,有一個回執的方法reply()。即接收一個消息後,能夠進行回執,經過reply()。dom

六、Exchanger 是一個SPI擴展,它的工具類Exchangers

和Transporters同樣,工具類Exchangers一樣的方式經過ExtensionLoader.getExtensionLoader()來獲取特定的Exchanger,默認爲HeaderExchanger。HeaderExchanger#bind()和connect()。

HeaderExchanger是默認的信息交換回執器。能夠看到HeaderExchangeServer 和HeaderExchangeClient分別接收RemoteServer 和Client。而且經過Transporters調用傳入。ExchangeHandler會被包裹成HeaderExchangeHandler,接着在被包裹爲DecodeHandler。
從這裏咱們就能夠解答第五個問題,Transporters#bind()傳入的ChannelHandler 實際上是DecodeHandler。爲啥這裏須要DecodeHandler,實際上是服務端或者客戶端在收到對等端的消息字節數據後,首先解析的是頭部信息,body信息是沒有在netty的編解碼中進行解析的,是到了真正處理消息的時候,經過Decodehandler#received()內部進行解碼的。tcp

七、Exchangers的bind()和connect()重載方法太多?

實際上,框架真正用到的是以下:

ide

其餘最終都是會調到這個方法,有時間的同窗能夠本身調用下其餘的重載方法。工具

八、Exchangers#bind()和connect()的ExchangeHandler 最終是什麼?

這個問題請查看DubboProtocol的成員變量ExchangeHandler。接下來,咱們來分析這個ExchangeHandler 到底作了什麼。
DubboProtocol內部new 了一個ExchangeHandlerAdapter 對象,也就是ExchangeHandler。該handler主要處理已經Invocation類型的消息。
首先,看下received()方法。

@Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
        }

若是消息的類型是Invocation,那麼調用reply方法進行消息應答,若是不是調用超類,也就是
ExchangeHandlerAdapter的received方法,該方法是空的,因此即會丟棄該消息內容。
那咱們來看下,relpy方法主要乾了什麼?註釋以下代碼塊。

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            /**
             *
             * 若是消息類型不是Invocation,那麼會拋出異常,通常狀況下不會,在received方法上已經進行的消息類型判斷。
             */
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;

            //這裏獲得的就是服務Invoker根據inv。
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            // 看是否是dubbo 回調,以後看下回調內容
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //調用,這裏就是服務調用,這裏會牽扯到dubbo filter 相關內容,在服務調用時具體堅定
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

從上面的可知道,reply主要就是進行服務的調用,核心語句就是 invoker.invoke(inv)。
那麼是如何找到這個服務提供者的服務呢。來看下getInvoker()。

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = (String) inv.getObjectAttachments().get(PATH_KEY);

        // if it's callback service on client side
        //若是該調用是在客服端,可能會有配置Stub類,經過isStubServiceInvoke標註
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        //callback
        // 查看是不是本地回調
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
            inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        
        // 構建serviceKey,經過端口port,路徑(通常是接口的全限定名)path,版本號version,分組group
        String serviceKey = serviceKey(
                port,
                path,
                (String) inv.getObjectAttachments().get(VERSION_KEY),
                (String) inv.getObjectAttachments().get(GROUP_KEY)
        );
        
        //全部的服務導出都會放在exporterMap對象裏,而後根據key獲取獲得DubboExporter
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
        }
        // 接着返回Invoker。
        return exporter.getInvoker();
    }

當服務調用方與服務提供方創建鏈接和斷開鏈接時,代碼以下:

@Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isDebugEnabled()) {
                logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, ON_DISCONNECT_KEY);
        }

都是進行調用invoke方法。那麼invoke方法主要乾了啥?

private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        /**
         * FIXME channel.getUrl() always binds to a fixed service, and this service is random.
         * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
         * service this connection is binding to.
         * @param channel
         * @param url
         * @param methodKey
         * @return
         */
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }

            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }

九、最後

這一篇文章距離上一次已經好久了,主要是遇到了國慶,本身想放鬆一下,接下來還會繼續努力的分析dubbo的一些內容。

相關文章
相關標籤/搜索