dubbo源碼學習(四):暴露服務的過程

dubbo採用的nio異步的通訊,通訊協議默認爲 netty,固然也能夠選擇 mina,grizzy。在服務端(provider)在啓動時主要是開啓netty監聽,在zookeeper上註冊服務節點,處理消費者請求,返回處理後的消息給消費者,消費者使用服務時主要是訂閱服務的節點,監聽zookeeper節點目錄,服務端的變化時zookeeper會推送給消費者,消費者從新緩存服務地址等。服務者、消費者、zookeeper三者之間都是長鏈接。vue

 

下面看dubbo源碼來看服務暴露的過程,服務暴露的入口爲:com.alibaba.dubbo.config.ServiceConfig#export 方法,代碼以下:redis

 

//是否延時暴露  spring

        if (delay != null && delay > 0) {  bootstrap

            Thread thread = new Thread(new Runnable() {  緩存

                public void run() {  服務器

                    try {  網絡

                        Thread.sleep(delay);  app

                    } catch (Throwable e) {  異步

                    }  tcp

                    doExport();  

                }  

            });  

            thread.setDaemon(true);  

            thread.setName("DelayExportServiceThread");  

            thread.start();  

        } else {  

            //不延時暴露,則直接暴露  

            doExport();  

        }  

 上在代碼不管是延時暴露或直接暴露調用的方法是:doExport(),doExport會對解析完的配置再作一次檢查,核心代碼你們能夠查看dubbo的源碼,下面列出一小部分

/* 

            檢查默認設置,若是xml中沒有配置<dubbo:provider 

            主要是從系統環境變量中尋找是否有相應的provider的配置 

*/  

        checkDefault();  

        //下面設置的內容若是沒有配置<dubbo:provider時基本上都是Null  

        if (provider != null) {  

            if (application == null) {  

                application = provider.getApplication();  

            }  

            if (module == null) {  

                module = provider.getModule();  

            }  

            if (registries == null) {  

                registries = provider.getRegistries();  

            }  

            if (monitor == null) {  

                monitor = provider.getMonitor();  

            }  

            if (protocols == null) {  

                protocols = provider.getProtocols();  

            }  

        }  

        if (module != null) {  

            //registries通常都會配置  

            if (registries == null) {  

                registries = module.getRegistries();  

            }  

            if (monitor == null) {  

                monitor = module.getMonitor();  

            }  

        }  

        if (application != null) {  

            //application通常也會配置  

            if (registries == null) {  

                registries = application.getRegistries();  

            }  

            if (monitor == null) {  

                monitor = application.getMonitor();  

            }  

        }  

        //是否泛化調用  

        if (ref instanceof GenericService) {  

            interfaceClass = GenericService.class;  

            if (StringUtils.isEmpty(generic)) {  

                generic = Boolean.TRUE.toString();  

            }  

        } else {  

            try {  

                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()  

                        .getContextClassLoader());  

            } catch (ClassNotFoundException e) {  

                throw new IllegalStateException(e.getMessage(), e);  

            }  

            /* 

                檢查即將暴露的接口的方法配置,檢查方法是否在接口中存在 

                通常不會配置因此通常狀況下methods爲null 

                <dubbo:service  > <dubbo:method /> </dubbo:serivce> 

             */  

            checkInterfaceAndMethods(interfaceClass, methods);  

            /* 

                檢查接口的引用不爲空,而且必須實現的是要暴露的接口 

             */  

            checkRef();  

            generic = Boolean.FALSE.toString();  

        }  

 

全部的檢查經過以後,會調用 :com.alibaba.dubbo.config.ServiceConfig#doExportUrls 

/* 

            將註冊協議轉化成url 

            registry://45.119.68.23:2181/com.alibaba.dubbo.registry.RegistryService? 

            application=test-dubbo&dubbo=2.5.3&pid=7648&registry=zookeeper×tamp=1462349748801 

         */  

        List<URL> registryURLs = loadRegistries(true);  

        //配置多通訊協議時,都進行暴露  

        for (ProtocolConfig protocolConfig : protocols) {  

            doExportUrlsFor1Protocol(protocolConfig, registryURLs);  

        }  

 doExportUrlsFor1Protocol中主要將全部的配置轉化成map,而後將map轉化成dubbo的統一URL,最終暴露的dubbo服務也就是這個統一的url,這個url也會註冊到zookeeper的節點上,部分代碼以下:

/* 

    將不爲null的配置對象中的屬性設置到 map 中 

    即將 xml 配置文件中的配置設置的值全轉化成爲map 

    {side=provider, application=alijk-dubbo, accepts=1000, 

        dubbo=2.5.3, threads=100, pid=7236, interface=cn.eoncloud.account.sdk.export.AccountService, 

        threadpool=fixed, version=1.0.0, timeout=500, anyhost=true, timestamp=1462347843960} 

 */  

appendParameters(map, application);  

appendParameters(map, module);  

appendParameters(map, provider, Constants.DEFAULT_KEY);  

appendParameters(map, protocolConfig);  

appendParameters(map, this);  

...... 

/* 

    將配置信息轉化成 url ,主要根據以前map裏的數據組裝成url 

    調用 URL#buildString方法 

    dubbo://10.6.13.137:9998/cn.eoncloud.account.sdk.export.AccountService 

    ?accepts=1000&anyhost=true&application=test-dubbo&dubbo=2.5.3 

    &interface=cn.eoncloud.account.sdk.export.AccountService 

    &methods=getAccountName,getAllTest&pid=7236&revision=1.0.0&side=provider 

    &threadpool=fixed&threads=100&timeout=500×tamp=1462347843960&version=1.0.0 

 */  

URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);  

  

if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)  

        .hasExtension(url.getProtocol())) {  

    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)  

            .getExtension(url.getProtocol()).getConfigurator(url).configure(url);  

}  

......  

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));  

//com.alibaba.dubbo.registry.integration.RegistryProtocol#export 即將進行暴露  

Exporter<?> exporter = protocol.export(invoker);  

上面的代碼核心暴露的一行代碼爲:protocol.export(invoker); 這個protocol的值爲:RegistryProtocol,也就是暴露會跳到:RegistryProtocol.exprot中去處理,RegistryProtocol.exprot主要作兩件事情:

一、開啓netty服務端 。

二、建立zookeeper服務節點。

下面來看RegistryProtocol.export方法,代碼以下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {  

        //export invoker doLocalExport調用dubboProtocol.export開啓netty服務監聽  

        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  

        //registry provider  

        final Registry registry = getRegistry(originInvoker);  

        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  

        //調用zodoRegister的doRegister 建立zookeeper的服務節點  

        registry.register(registedProviderUrl);  

        // 訂閱override數據  

        // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。  

        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  

        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);  

        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  

        //訂閱  

        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  

        //保證每次export都返回一個新的exporter實例  

        return new Exporter<T>() {  

            public Invoker<T> getInvoker() {  

                return exporter.getInvoker();  

            }  

            public void unexport() {  

                try {  

                    exporter.unexport();  

                } catch (Throwable t) {  

                    logger.warn(t.getMessage(), t);  

                }  

                try {  

                    registry.unregister(registedProviderUrl);  

                } catch (Throwable t) {  

                    logger.warn(t.getMessage(), t);  

                }  

                try {  

                    overrideListeners.remove(overrideSubscribeUrl);  

                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);  

                } catch (Throwable t) {  

                    logger.warn(t.getMessage(), t);  

                }  

            }  

        };  

    }  

上面的代碼裏有一段特別重要,關鍵性的代碼在doLocalExport中:

final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));  

//此處protol爲dubboProtocol  

exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);  

從上面的代碼中能夠看到會調用dubboProtocol的export對服務進行暴露,這個export最終目的就是開啓netty的監聽,下面來看dubbo是如何一步一步開啓netty的

private void openServer(URL url) {  

       // find server. ip:port  

       String key = url.getAddress();  

       //client 也能夠暴露一個只有server能夠調用的服務。  

       boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);  

       if (isServer) {  

        ExchangeServer server = serverMap.get(key);  

        if (server == null) {  

               //建立 Server  

            serverMap.put(key, createServer(url));  

        } else {  

            //server支持reset,配合override功能使用  

            server.reset(url);  

        }  

       }  

   }  

     

   private ExchangeServer createServer(URL url) {  

       //默認開啓server關閉時發送readonly事件  

       url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());  

       //默認開啓heartbeat  

       url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));  

       //默認使用netty  

       String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  

  

       if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))  

           throw new RpcException("Unsupported server type: " + str + ", url: " + url);  

       //默認使用dubbo協議編碼  

       url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);  

       ExchangeServer server;  

       try {  

           //HeaderExchangeServer 在此處已經開啓了Netty Server 進行監聽  

           server = Exchangers.bind(url, requestHandler);  

       } catch (RemotingException e) {  

           throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);  

       }  

       str = url.getParameter(Constants.CLIENT_KEY);  

       if (str != null && str.length() > 0) {  

           Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();  

           if (!supportedTypes.contains(str)) {  

               throw new RpcException("Unsupported client type: " + str);  

           }  

       }  

       return server;  

   }  

在上面的代碼中:Exchangers.bind(url, requestHandler)  默認爲:HeaderExchanger.bind()

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {  

        //Transporters默認爲NettyTransporter  

        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));  

    }  

代碼運行到這裏能夠看到傳輸方式了,dubbo默認採用的通訊方式爲 NettyTransporter ,再來看NettyTransporter.bind方法

public static final String NAME = "netty";  

      

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {  

        return new NettyServer(url, listener);  

    }  

 

已經能看到NettyServer了,dubbo在暴露服務最終開啓的netty服務監聽,監聽消費者發送的請求,經過反射調用方法獲得結果經過 tcp/ip 網絡傳輸返回給消費者。再進入到NettyServer中咱們就能看到很是傳統的開啓Netty服務的代碼了

protected void doOpen() throws Throwable {  

        NettyHelper.setNettyLoggerFactory();  

        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));  

        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));  

        //最後一個參數爲 NIO 最大工做線程數  

        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));  

        //netty server 啓動器  

        bootstrap = new ServerBootstrap(channelFactory);  

          

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);  

        channels = nettyHandler.getChannels();  

        // https://issues.jboss.org/browse/NETTY-365  

        // https://issues.jboss.org/browse/NETTY-379  

        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));  

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  

            public ChannelPipeline getPipeline() {  

                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);  

                ChannelPipeline pipeline = Channels.pipeline();  

                /*int idleTimeout = getIdleTimeout();  

                if (idleTimeout > 10000) {  

                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));  

                }*/  

                pipeline.addLast("decoder", adapter.getDecoder());  

                pipeline.addLast("encoder", adapter.getEncoder());  

                pipeline.addLast("handler", nettyHandler);  

                return pipeline;  

            }  

        });  

        // 建立一個綁定到指定地址的新通道,也就是綁定IP、端口供客戶端鏈接  

        channel = bootstrap.bind(getBindAddress());  

    }  

上面的代碼執行完成後,netty的服務端就已經開啓了,能夠接收客戶端的鏈接了,但客戶端鏈接上來要怎麼處理呢?消息接收、發送怎麼處理呢?全部的處理都在上面代碼的 NettyHandler類中,Nettyhandler繼承了Netty包中的的SimpleChannelHandler

 

NettyHandler extends SimpleChannelHandler   

重寫了 channelConnected、channelDisconnected、messageReceived等方法,而咱們比較關注的多是messagereceived方法,在收到消息時如何處理,但今天暫時先不看dubbo若是處理消息,只看暴露,消息處理如何實現異步通訊下一節再講。

 

/**  

     * 收到消息時觸發  

     * @param ctx  

     * @param e  

     * @throws Exception  

     */  

    @Override  

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {  

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);  

        try {  

            handler.received(channel, e.getMessage());  

        } finally {  

            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());  

        }  

    }  

從前面知道,開啓netty服務是在RegistryProtocol.export 的 doLocalExport 中,在開啓了netty服務後,就是在zookeeper上註冊服務節點了,消費者在消費服務時會根據消費的接口名找到對應的zookeeper節點目錄,對目錄進行監聽,接收推送

 

//registry provider  

final Registry registry = getRegistry(originInvoker);  

final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  

//調用zodoRegister的doRegister 建立zookeeper的服務節點  

registry.register(registedProviderUrl);  

// 訂閱override數據  

// FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。  

final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  

final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);  

overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  

//訂閱  

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  

dubbo服務在zookeeper上的節點註冊是:com.alibaba.dubbo.registry.support.FailbackRegistry#register

 

@Override  

    public void register(URL url) {  

        super.register(url);  

        failedRegistered.remove(url);  

        failedUnregistered.remove(url);  

        try {  

            // 向服務器端發送註冊請求  

            doRegister(url);  

由於doRegister是一個抽象的方法,查看他的實現能夠看到:

 

從上圖能夠看到doRegister實現有 dubbo、redis、zookeeper,這也是在咱們配置時常常看到的 註冊協議的配置 ,最爲經常使用的就是 zookeeper了,因此再看ZookeeperRegistry的代碼,看他的doRegistry幹什麼了以下

protected void doRegister(URL url) {  

       try {  

        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));  

       } catch (Throwable e) {  

           throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);  

       }  

   }  

其實從上面已經能夠看到 在zookeeper上面建立 節點了,默認不分組的狀況下,服務結構以下:/dubbo/XXXXservice/consumers、providers

至此,dubbo的暴露基本上已經完成,開啓了netty服務,註冊了zookeeper的節點,就等着消費者鏈接上來使用了。下一節將介紹dubbo的消息發送和接收,NIO異步通信的實現。

 

關注獲取 spring 視頻  vue視頻

相關文章
相關標籤/搜索