【Dubbo 源碼解析】04_Dubbo 服務註冊&暴露

Dubbo 服務註冊&暴露

Dubbo 服務暴露過程是經過 com.alibaba.dubbo.config.spring.ServiceBean 來實現的。Spring 容器 refresh() 完成後,會發送 ContextRefreshedEvent,ServiceBean 會接收到這個 event 而後調用 export()。html

Dubbo 服務暴露過程:spring

  1. 獲取 Invoker Invoker<?> invoker = proxyFactory.getInvoker(T proxy, Class<T> type, com.alibaba.dubbo.common.URL url); 這個 Invoker 就是 Provider 接收到服務調用的 Request 以後,最終要調用的 invoker。apache

  2. 經過 Protocol 調用 export() Exporter<?> exporter = protocol.export(com.alibaba.dubbo.rpc.Invoker<T> invoker); 默認會使用 DubboProtocol 作服務暴露,過程當中會啓動 Netty Server 監聽端口。bootstrap

// JavassistProxyFactory.class
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 經過 wrapper 類去調用服務提供者的真實方法。(避免使用反射,提升效率)
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

 

 

服務註冊

經過以前 Dubbo Protocol & Filter 的學習,咱們知道這裏的 protocol 是一個 Wrappered Protocol,因此 protocol.export() 方法會先調用 Protocol SPI 擴展中的 wrapper 類的 export() 。 其中,ProtocolFilterWrapper#export(Invoker<T> invoker) 代碼以下:緩存

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 若是 URL 的 protocol 是註冊協議的話,就執行服務註冊流程
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 服務暴露過程當中,會添加 group="provider" 的 Filter
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

 

經過在上述 export() 方法上打斷點跟蹤,咱們能夠發現,Dubbo 首先執行的是服務註冊,在服務註冊過程當中,會再次調用 protocol.export(invokerDelegete) 來作服務本地暴露。app

RegistryProtocol#export(Invoker<T> originInvoker) 代碼以下:socket

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    // 會再次調用 protocol.export(invokerDelegete) 來作服務本地暴露
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    if (register) {
        // 服務註冊
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

 

附: com.alibaba.dubbo.registry.support.AbstractRegistry#AbstractRegistry(URL url) 服務註冊緩存文件tcp

服務暴露

服務本地暴露,最終會調用 DubboProtocol#export(Invoker<T> invoker)ide

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    // 1. 建立 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    ......
    // 2. 開啓服務監聽
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

 

Dubbo 服務暴露時,首先會建立一個 DubboExporter,而後再經過 netty 開啓服務端口監聽。 DubboExporter 的做用是緩存 Invoker,方便後續操做獲取 Invoker。其中最重要的操做就是: Provider 接收到 Request 請求後,獲取到對應的 Invoker,而後執行 Invoker。學習

openServer(url) 最終會調用 createServer(URL url) 來建立 tcp server:

private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    ......
    // 設置 codec = "dubbo"
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 建立 server,並傳遞 requestHandler
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    ......
    return server;
}

 

這裏的 requestHandler 很是重要,它是用來處理 consumer 端的 Request,將 Request 轉化成 Invoker 調用的。

建立 tcp server 的過程:

Exchangers.bind(url, requestHandler) --> Exchanger$Adaptive#bind(URL url, ExchangeHandler handler) --> HeaderExchanger#bind(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeServer(Server server)

// HeaderExchanger.class
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

 

Transporters.bind(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#bind(URL url, ChannelHandler handler) --> NettyTransporter#bind(URL url, ChannelHandler listener) --> 返回 new NettyServer(url, listener)

編解碼最終使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec

NettyServer 開啓服務監聽的代碼:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    // 這裏的 this 所指的 handler 就是 Exchangers.bind(url, requestHandler) 傳遞的 handler
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            // 編解碼的 handler
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            // 業務處理的 handler
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

 

 

 

官方如是說:

暴露服務

1. 只暴露服務端口:

在沒有註冊中心,直接暴露提供者的狀況下 [1]ServiceConfig 解析出的 URL 的格式爲:dubbo://service-host/com.foo.FooService?version=1.0.0

基於擴展點自適應機制,經過 URL 的 dubbo:// 協議頭識別,直接調用 DubboProtocolexport() 方法,打開服務端口。

2. 向註冊中心暴露服務:

在有註冊中心,須要註冊提供者地址的狀況下 [2]ServiceConfig 解析出的 URL 的格式爲: registry://registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")

基於擴展點自適應機制,經過 URL 的 registry:// 協議頭識別,就會調用 RegistryProtocolexport()方法,將 export 參數中的提供者 URL,先註冊到註冊中心。

再從新傳給 Protocol 擴展點進行暴露: dubbo://service-host/com.foo.FooService?version=1.0.0,而後基於擴展點自適應機制,經過提供者 URL 的 dubbo:// 協議頭識別,就會調用 DubboProtocolexport()方法,打開服務端口。

服務提供者暴露一個服務的詳細過程


上圖是服務提供者暴露服務的主過程:

首先 ServiceConfig 類拿到對外提供服務的實際類 ref(如:HelloWorldImpl),而後經過 ProxyFactory 類的 getInvoker 方法使用 ref 生成一個 AbstractProxyInvoker 實例,到這一步就完成具體服務到 Invoker 的轉化。接下來就是 Invoker 轉換到 Exporter 的過程。

Dubbo 處理服務暴露的關鍵就在 Invoker 轉換到 Exporter 的過程,上圖中的紅色部分。下面咱們以 Dubbo 和 RMI 這兩種典型協議的實現來進行說明:

Dubbo 的實現

Dubbo 協議的 Invoker 轉爲 Exporter 發生在 DubboProtocol 類的 export 方法,它主要是打開 socket 偵聽服務,並接收客戶端發來的各類請求,通信細節由 Dubbo 本身實現。

RMI 的實現

RMI 協議的 Invoker 轉爲 Exporter 發生在 RmiProtocol類的 export 方法,它經過 Spring 或 Dubbo 或 JDK 來實現 RMI 服務,通信細節這一塊由 JDK 底層來實現,這就省了很多工做量。

相關文章
相關標籤/搜索