源碼分析Dubbo服務提供者啓動流程-下篇

本文繼續上文Dubbo服務提供者啓動流程,在上篇文章中詳細梳理了基於dubbo spring文件的配置方式,Dubbo是如何加載配置文件,服務提供者dubbo:service標籤服務暴露全流程,本節重點關注RegistryProtocol#export中調用doLocalExport方法,其實主要是根據各自協議,服務提供者創建網絡服務器,在特定端口創建監聽,監聽來自消息消費端服務的請求。spring

RegistryProtocol#doLocalExport:bootstrap

private <t> ExporterChangeableWrapper<t> doLocalExport(final Invoker<t> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<t> exporter = (ExporterChangeableWrapper<t>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<t>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<!--?--> invokerDelegete = new InvokerDelegete<t>(originInvoker, getProviderUrl(originInvoker));   // @1
                    exporter = new ExporterChangeableWrapper<t>((Exporter<t>) protocol.export(invokerDelegete), originInvoker);    // @2
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

代碼@1:若是服務提供者以dubbo協議暴露服務,getProviderUrl(originInvoker)返回的URL將以dubbo://開頭。 代碼@2:根據Dubbo內置的SPI機制,將調用DubboProtocol#export方法。緩存

一、源碼分析DubboProtocol#export安全

public <t> Exporter<t> export(Invoker<t> invoker) throws RpcException {
        URL url = invoker.getUrl();     // @1
        // export service.
        String key = serviceKey(url);      // @2
        DubboExporter<t> exporter = new DubboExporter<t>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);    //@3  start
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);                                                  
        if (isStubSupportEvent &amp;&amp; !isCallbackservice) {                                                                                                                        
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);                                                                      
            }
        }   // @3 end

        openServer(url);   // @4
        optimizeSerialization(url);  // @5
        return exporter;                
    }

代碼@1:獲取服務提供者URL,以協議名稱,這裏是dubbo://開頭。 代碼@2:從服務提供者URL中獲取服務名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。 代碼@3:是否將轉發事件導出成stub。 代碼@4:根據url打開服務,下面將詳細分析其實現。 代碼@5:根據url優化器序列化方式。服務器

二、源碼分析DubboProtocol#openServer網絡

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();    // @1
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);           // @2
            if (server == null) {
                serverMap.put(key, createServer(url));                    //@3
            } else {
                // server supports reset, use together with override
                server.reset(url);                                                       //@4
            }
        }
    }

代碼@1:根據url獲取網絡地址:ip:port,例如:192.168.56.1:20880,服務提供者IP與暴露服務端口號。 代碼@2:根據key從服務器緩存中獲取,若是存在,則執行代碼@4,若是不存在,則執行代碼@3. 代碼@3:根據URL建立一服務器,Dubbo服務提供者服務器實現類爲ExchangeServer。 代碼@4:若是服務器已經存在,用當前URL重置服務器,這個不難理解,由於一個Dubbo服務中,會存在多個dubbo:service標籤,這些標籤都會在服務檯提供者的同一個IP地址、端口號上暴露服務。架構

2.1 源碼分析DubboProtocol#createServer併發

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());    // @1
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));     // @2
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  // @3

        if (str != null &amp;&amp; str.length() &gt; 0 &amp;&amp; !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))    // @4
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);       // @5
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);    // @6
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);     //@7
        if (str != null &amp;&amp; str.length() &gt; 0) {
            Set<string> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

代碼@1:爲服務提供者url增長channel.readonly.sent屬性,默認爲true,表示在發送請求時,是否等待將字節寫入socket後再返回,默認爲true。 代碼@2:爲服務提供者url增長heartbeat屬性,表示心跳間隔時間,默認爲60*1000,表示60s。 代碼@3:爲服務提供者url增長server屬性,可選值爲netty,mina等等,默認爲netty。 代碼@4:根據SPI機制,判斷server屬性是否支持。 代碼@5:爲服務提供者url增長codec屬性,默認值爲dubbo,協議編碼方式。 代碼@6:根據服務提供者URI,服務提供者命令請求處理器requestHandler構建ExchangeServer實例。requestHandler的實現具體在之後詳細分析Dubbo服務調用時再詳細分析。 代碼@7:驗證客戶端類型是否可用。app

2.1.1 源碼分析Exchangers.bind 根據URL、ExchangeHandler構建服務器socket

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

上述代碼不難看出,首先根據url獲取Exchanger實例,而後調用bind方法構建ExchangeServer,Exchanger接口以下 這裏寫圖片描述

  • ExchangeServer bind(URL url, ExchangeHandler handler) : 服務提供者調用。
  • ExchangeClient connect(URL url, ExchangeHandler handler):服務消費者調用。

dubbo提供的實現類爲:HeaderExchanger,其bind方法以下:

HeaderExchanger#bind

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

從這裏能夠看出,端口的綁定由Transporters的bind方法實現。

2.1.2 源碼分析Transporters.bind方法

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

從這裏得知,Dubbo網絡傳輸的接口有Transporter接口實現,其繼承類圖所示: 這裏寫圖片描述 本文以netty版原本查看一下Transporter實現。

NettyTransporter源碼以下:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

NettyServer創建網絡鏈接的實現方法爲:

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);      // @1
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout &gt; 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);     // @2
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

熟悉本方法須要具有Netty的知識,有關源碼:閱讀Netty系列文章,這裏不對每一行代碼進行解讀,對於與網絡相關的參數,將在後續文章中詳細講解,本方法@一、@2引發了個人注意,首先建立NettyServer必須傳入一個服務提供者URL,但從DubboProtocol#createServer中能夠看出,Server是基於網絡套接字(ip:port)緩存的,一個JVM應用中,必然會存在多個dubbo:server標籤,就會有多個URL,這裏爲何能夠這樣作呢?從DubboProtocol#createServer中能夠看出,在解析第二個dubbo:service標籤時並不會調用createServer,而是會調用Server#reset方法,是否是這個方法有什麼魔法,在reset方法時能將URL也註冊到Server上,那接下來分析NettyServer#reset方法是如何實現的。

2.2源碼分析DdubboProtocol#reset reset方法最終將用Server的reset方法,一樣仍是以netty版本的NettyServer爲例,查看reset方法的實現原理。NettyServer#reset--->父類(AbstractServer)

AbstractServer#reset

public void reset(URL url) {
        if (url == null) {
            return;
        }
        try {                                                                                                       // @1 start
            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                if (a &gt; 0) {
                    this.accepts = a;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                if (t &gt; 0) {
                    this.idleTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.THREADS_KEY)
                    &amp;&amp; executor instanceof ThreadPoolExecutor &amp;&amp; !executor.isShutdown()) {
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(Constants.THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads &gt; 0 &amp;&amp; (threads != max || threads != core)) {
                    if (threads &lt; core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }              // @1 end
        super.setUrl(getUrl().addParameters(url.getParameters()));    // @2
    }

代碼@1:首先是調整線程池的相關線程數量,這個好理解。、 代碼@2:而後設置調用setUrl覆蓋原先NettyServer的private volatile URL url的屬性,那爲何不會影響原先註冊的dubbo:server呢? 原來NettyHandler上加了註解:@Sharable,由該註解去實現線程安全。

Dubbo服務提供者啓動流程將分析到這裏了,本文並未對網絡細節進行詳細分析,旨在梳理出啓動流程,有關Dubbo服務網絡實現原理將在後續章節中詳細分析,敬請期待。


做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。

</string></t></t></t></t></t></t></t></t></t></t></t></t></t></t>

相關文章
相關標籤/搜索