dubbo源碼:provider發佈service服務三

// 代理工廠建立invoker,服務代理,經過該代理進行遠程調用
Invoker<?> invoker = proxyFactory.getInvoker(ref,
		(Class) interfaceClass, registryURL
				.addParameterAndEncoded(
						Constants.EXPORT_KEY,
						url.toFullString()));
// 暴露遠程服務
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);

   接着上一篇,咱們繼續看發佈過程,前面咱們已經看過了經過代理工程建立invoker,接着咱們就看一下遠程服務的暴露,這裏能夠看到經過protocol建立一個exporter而後將其存入list中。遠程服務暴露的實現主要在dubbo-rpc模塊下,根據不一樣的協議有分爲不一樣的子工程。不一樣的協議對Protocol接口都有其對應實現類。其中有一些服務暴露接口並無具體的實現。java

Protocol接口中有4個方法定義getDefaultPort()獲取端口、export()暴露服務、refer()引用服務、destroy()釋放協議,從建立到銷燬可見Protocol維護了遠程調用Invoker的整個生命週期。bootstrap

@SPI("dubbo")
public interface Protocol {
    
    /**
     * 獲取缺省端口,當用戶沒有配置端口時使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露遠程服務:<br>
     * 1. 協議在接收請求時,應記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別。<br>
     * 3. export()傳入的Invoker由框架實現並傳入,協議不須要關心。<br>
     * 
     * @param <T> 服務的類型
     * @param invoker 服務的執行體
     * @return exporter 暴露服務的引用,用於取消暴露
     * @throws RpcException 當暴露服務出錯時拋出,好比端口已佔用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用遠程服務:<br>
     * 1. 當用戶調用refer()所返回的Invoker對象的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker對象的invoke()方法。<br>
     * 2. refer()返回的Invoker由協議實現,協議一般須要在此Invoker中發送遠程請求。<br>
     * 3. 當url中有設置check=false時,鏈接失敗不能拋出異常,並內部自動恢復。<br>
     * 
     * @param <T> 服務的類型
     * @param type 服務的類型
     * @param url 遠程服務的URL地址
     * @return invoker 服務的本地代理
     * @throws RpcException 當鏈接服務提供方失敗時拋出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 釋放協議:<br>
     * 1. 取消該協議全部已經暴露和引用的服務。<br>
     * 2. 釋放協議所佔用的全部資源,好比鏈接和端口。<br>
     * 3. 協議在釋放後,依然能暴露和引用新的服務。<br>
     */
    void destroy();

}

Protocol在執行時會先執行wrapper的監聽器和過濾器,這裏執行順序是這樣的緩存

ProtocolListenerWrapper.export()-->ProtocolFilterWrapper.export()-->RegistryProtocol.export()在這裏完成服務註冊到zookeeper上面。app

這裏咱們重點看一下dubboProtocol中export()方法的實現,在該方法中主要是建立了DubboExporter(包含一個invoker)而後把exporter存入map中,而後是打開server的過程。框架

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        // export service.
        //拼接key:group1/com.alibaba.dubbo.demo.DemoService:1.0(version):2880(port)
        String key = serviceKey(url);
        //建立exporter,父類中定義的map<String,Exporter> exporterMap
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);//exporter存入map中
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){//是否作本地緩存,這裏將stubServiceMethods單獨存儲
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        //打開服務鏈接
        openServer(url);
        
        return exporter;
    }
    
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也能夠暴露一個只有server能夠調用的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
        	ExchangeServer server = serverMap.get(key);//ExchangeServer存儲在map中,Map<key,交換層服務server>。
        	if (server == null) {
        		serverMap.put(key, createServer(url));
        	} else {
        		//server支持reset,配合override功能使用
        		server.reset(url);
        	}
        }
    }
    //建立信息交換層服務server
    private ExchangeServer createServer(URL url) {
        //默認開啓server關閉時發送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());//url添加channel.readonly.sent屬性true,返回的是一個新url對象
        //默認開啓heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));//url添加heartbeat屬性60000,返回一個新url
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//獲取server屬性,默認爲netty

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        //設置編碼解碼協議,默認dubbo
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
        	//綁定url和處理器,返回一個信息交換層HeaderExchangeServer,該信息交換層server裏面有一個NettyServer,
            server = Exchangers.bind(url, requestHandler);
            
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

打開server過程:HeaderExchangeServer(交換層服務) --> NettyServer(傳輸層服務) ,最終跟蹤到NettyServer中咱們能夠看到調用netty打開鏈接的方法。ide

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        //ChannelFactory 是一個建立和管理Channel通道及其相關資源的工廠接口
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

至此爲止,咱們已經將發佈暴露服務的過程看完再看就要去看netty中具體的NIO實現了,有興趣能夠看看netty的源碼。this

相關文章
相關標籤/搜索