Dubbo 服務暴露過程是經過 com.alibaba.dubbo.config.spring.ServiceBean
來實現的。Spring 容器 refresh() 完成後,會發送 ContextRefreshedEvent,ServiceBean 會接收到這個 event 而後調用 export()。html
Dubbo 服務暴露過程:spring
apache
經過 Protocol 調用 export() Exporter<?> exporter = protocol.export(com.alibaba.dubbo.rpc.Invoker<T> invoker); 默認會使用 DubboProtocol 作服務暴露,過程當中會啓動 Netty Server 監聽端口。bootstrap
// JavassistProxyFactory.class public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 經過 wrapper 類去調用服務提供者的真實方法。(避免使用反射,提升效率) return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
經過以前 Dubbo Protocol & Filter 的學習,咱們知道這裏的 protocol 是一個 Wrappered Protocol,因此 protocol.export() 方法會先調用 Protocol SPI 擴展中的 wrapper 類的 export() 。 其中,ProtocolFilterWrapper#export(Invoker<T> invoker) 代碼以下:緩存
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是 URL 的 protocol 是註冊協議的話,就執行服務註冊流程 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服務暴露過程當中,會添加 group="provider" 的 Filter return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
經過在上述 export() 方法上打斷點跟蹤,咱們能夠發現,Dubbo 首先執行的是服務註冊,在服務註冊過程當中,會再次調用 protocol.export(invokerDelegete) 來作服務本地暴露。app
RegistryProtocol#export(Invoker<T> originInvoker) 代碼以下:socket
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker // 會再次調用 protocol.export(invokerDelegete) 來作服務本地暴露 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { // 服務註冊 register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
附: com.alibaba.dubbo.registry.support.AbstractRegistry#AbstractRegistry(URL url) 服務註冊緩存文件tcp
服務本地暴露,最終會調用 DubboProtocol#export(Invoker<T> invoker)ide
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); // 1. 建立 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); ...... // 2. 開啓服務監聽 openServer(url); optimizeSerialization(url); return exporter; }
Dubbo 服務暴露時,首先會建立一個 DubboExporter,而後再經過 netty 開啓服務端口監聽。 DubboExporter 的做用是緩存 Invoker,方便後續操做獲取 Invoker。其中最重要的操做就是: Provider 接收到 Request 請求後,獲取到對應的 Invoker,而後執行 Invoker。學習
openServer(url) 最終會調用 createServer(URL url) 來建立 tcp server:
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); ...... // 設置 codec = "dubbo" url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 建立 server,並傳遞 requestHandler server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ...... return server; }
這裏的 requestHandler
很是重要,它是用來處理 consumer 端的 Request,將 Request 轉化成 Invoker 調用的。
建立 tcp server 的過程:
Exchangers.bind(url, requestHandler) --> Exchanger$Adaptive#bind(URL url, ExchangeHandler handler) --> HeaderExchanger#bind(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeServer(Server server)
// HeaderExchanger.class public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
Transporters.bind(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#bind(URL url, ChannelHandler handler) --> NettyTransporter#bind(URL url, ChannelHandler listener) --> 返回 new NettyServer(url, listener)
編解碼最終使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec
NettyServer 開啓服務監聽的代碼:
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); // 這裏的 this 所指的 handler 就是 Exchangers.bind(url, requestHandler) 傳遞的 handler final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); // 編解碼的 handler pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); // 業務處理的 handler pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
在沒有註冊中心,直接暴露提供者的狀況下 [1],ServiceConfig
解析出的 URL 的格式爲:dubbo://service-host/com.foo.FooService?version=1.0.0
。
基於擴展點自適應機制,經過 URL 的 dubbo://
協議頭識別,直接調用 DubboProtocol
的 export()
方法,打開服務端口。
在有註冊中心,須要註冊提供者地址的狀況下 [2],ServiceConfig
解析出的 URL 的格式爲: registry://registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")
,
基於擴展點自適應機制,經過 URL 的 registry://
協議頭識別,就會調用 RegistryProtocol
的 export()
方法,將 export
參數中的提供者 URL,先註冊到註冊中心。
再從新傳給 Protocol
擴展點進行暴露: dubbo://service-host/com.foo.FooService?version=1.0.0
,而後基於擴展點自適應機制,經過提供者 URL 的 dubbo://
協議頭識別,就會調用 DubboProtocol
的 export()
方法,打開服務端口。
上圖是服務提供者暴露服務的主過程:
首先 ServiceConfig
類拿到對外提供服務的實際類 ref(如:HelloWorldImpl),而後經過 ProxyFactory
類的 getInvoker
方法使用 ref 生成一個 AbstractProxyInvoker
實例,到這一步就完成具體服務到 Invoker
的轉化。接下來就是 Invoker
轉換到 Exporter
的過程。
Dubbo 處理服務暴露的關鍵就在 Invoker
轉換到 Exporter
的過程,上圖中的紅色部分。下面咱們以 Dubbo 和 RMI 這兩種典型協議的實現來進行說明:
Dubbo 協議的 Invoker
轉爲 Exporter
發生在 DubboProtocol
類的 export
方法,它主要是打開 socket 偵聽服務,並接收客戶端發來的各類請求,通信細節由 Dubbo 本身實現。
RMI 協議的 Invoker
轉爲 Exporter
發生在 RmiProtocol
類的 export