本文繼續上文Dubbo服務提供者啓動流程,在上篇文章中詳細梳理了基於dubbo spring文件的配置方式,Dubbo是如何加載配置文件,服務提供者dubbo:service標籤服務暴露全流程,本節重點關注RegistryProtocol#export中調用doLocalExport方法,其實主要是根據各自協議,服務提供者創建網絡服務器,在特定端口創建監聽,監聽來自消息消費端服務的請求。spring
RegistryProtocol#doLocalExport:bootstrap
private <t> ExporterChangeableWrapper<t> doLocalExport(final Invoker<t> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<t> exporter = (ExporterChangeableWrapper<t>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<t>) bounds.get(key); if (exporter == null) { final Invoker<!--?--> invokerDelegete = new InvokerDelegete<t>(originInvoker, getProviderUrl(originInvoker)); // @1 exporter = new ExporterChangeableWrapper<t>((Exporter<t>) protocol.export(invokerDelegete), originInvoker); // @2 bounds.put(key, exporter); } } } return exporter; }
代碼@1:若是服務提供者以dubbo協議暴露服務,getProviderUrl(originInvoker)返回的URL將以dubbo://開頭。 代碼@2:根據Dubbo內置的SPI機制,將調用DubboProtocol#export方法。緩存
一、源碼分析DubboProtocol#export安全
public <t> Exporter<t> export(Invoker<t> invoker) throws RpcException { URL url = invoker.getUrl(); // @1 // export service. String key = serviceKey(url); // @2 DubboExporter<t> exporter = new DubboExporter<t>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); //@3 start Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // @3 end openServer(url); // @4 optimizeSerialization(url); // @5 return exporter; }
代碼@1:獲取服務提供者URL,以協議名稱,這裏是dubbo://開頭。 代碼@2:從服務提供者URL中獲取服務名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。 代碼@3:是否將轉發事件導出成stub。 代碼@4:根據url打開服務,下面將詳細分析其實現。 代碼@5:根據url優化器序列化方式。服務器
二、源碼分析DubboProtocol#openServer網絡
private void openServer(URL url) { // find server. String key = url.getAddress(); // @1 //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); // @2 if (server == null) { serverMap.put(key, createServer(url)); //@3 } else { // server supports reset, use together with override server.reset(url); //@4 } } }
代碼@1:根據url獲取網絡地址:ip:port,例如:192.168.56.1:20880,服務提供者IP與暴露服務端口號。 代碼@2:根據key從服務器緩存中獲取,若是存在,則執行代碼@4,若是不存在,則執行代碼@3. 代碼@3:根據URL建立一服務器,Dubbo服務提供者服務器實現類爲ExchangeServer。 代碼@4:若是服務器已經存在,用當前URL重置服務器,這個不難理解,由於一個Dubbo服務中,會存在多個dubbo:service標籤,這些標籤都會在服務檯提供者的同一個IP地址、端口號上暴露服務。架構
2.1 源碼分析DubboProtocol#createServer併發
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @1 // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @2 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @3 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @4 throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @5 ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); // @6 } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); //@7 if (str != null && str.length() > 0) { Set<string> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
代碼@1:爲服務提供者url增長channel.readonly.sent屬性,默認爲true,表示在發送請求時,是否等待將字節寫入socket後再返回,默認爲true。 代碼@2:爲服務提供者url增長heartbeat屬性,表示心跳間隔時間,默認爲60*1000,表示60s。 代碼@3:爲服務提供者url增長server屬性,可選值爲netty,mina等等,默認爲netty。 代碼@4:根據SPI機制,判斷server屬性是否支持。 代碼@5:爲服務提供者url增長codec屬性,默認值爲dubbo,協議編碼方式。 代碼@6:根據服務提供者URI,服務提供者命令請求處理器requestHandler構建ExchangeServer實例。requestHandler的實現具體在之後詳細分析Dubbo服務調用時再詳細分析。 代碼@7:驗證客戶端類型是否可用。app
2.1.1 源碼分析Exchangers.bind 根據URL、ExchangeHandler構建服務器socket
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }
上述代碼不難看出,首先根據url獲取Exchanger實例,而後調用bind方法構建ExchangeServer,Exchanger接口以下
dubbo提供的實現類爲:HeaderExchanger,其bind方法以下:
HeaderExchanger#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
從這裏能夠看出,端口的綁定由Transporters的bind方法實現。
2.1.2 源碼分析Transporters.bind方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
從這裏得知,Dubbo網絡傳輸的接口有Transporter接口實現,其繼承類圖所示: 本文以netty版原本查看一下Transporter實現。
NettyTransporter源碼以下:
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } @Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
NettyServer創建網絡鏈接的實現方法爲:
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @1 channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); // @2 return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
熟悉本方法須要具有Netty的知識,有關源碼:閱讀Netty系列文章,這裏不對每一行代碼進行解讀,對於與網絡相關的參數,將在後續文章中詳細講解,本方法@一、@2引發了個人注意,首先建立NettyServer必須傳入一個服務提供者URL,但從DubboProtocol#createServer中能夠看出,Server是基於網絡套接字(ip:port)緩存的,一個JVM應用中,必然會存在多個dubbo:server標籤,就會有多個URL,這裏爲何能夠這樣作呢?從DubboProtocol#createServer中能夠看出,在解析第二個dubbo:service標籤時並不會調用createServer,而是會調用Server#reset方法,是否是這個方法有什麼魔法,在reset方法時能將URL也註冊到Server上,那接下來分析NettyServer#reset方法是如何實現的。
2.2源碼分析DdubboProtocol#reset reset方法最終將用Server的reset方法,一樣仍是以netty版本的NettyServer爲例,查看reset方法的實現原理。NettyServer#reset--->父類(AbstractServer)
AbstractServer#reset
public void reset(URL url) { if (url == null) { return; } try { // @1 start if (url.hasParameter(Constants.ACCEPTS_KEY)) { int a = url.getParameter(Constants.ACCEPTS_KEY, 0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) { int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; int threads = url.getParameter(Constants.THREADS_KEY, 0); int max = threadPoolExecutor.getMaximumPoolSize(); int core = threadPoolExecutor.getCorePoolSize(); if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { threadPoolExecutor.setCorePoolSize(threads); if (core == max) { threadPoolExecutor.setMaximumPoolSize(threads); } } else { threadPoolExecutor.setMaximumPoolSize(threads); if (core == max) { threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(), t); } // @1 end super.setUrl(getUrl().addParameters(url.getParameters())); // @2 }
代碼@1:首先是調整線程池的相關線程數量,這個好理解。、 代碼@2:而後設置調用setUrl覆蓋原先NettyServer的private volatile URL url的屬性,那爲何不會影響原先註冊的dubbo:server呢? 原來NettyHandler上加了註解:@Sharable,由該註解去實現線程安全。
Dubbo服務提供者啓動流程將分析到這裏了,本文並未對網絡細節進行詳細分析,旨在梳理出啓動流程,有關Dubbo服務網絡實現原理將在後續章節中詳細分析,敬請期待。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</string></t></t></t></t></t></t></t></t></t></t></t></t></t></t>