上一篇文章分析了暴露服務到本地,Dubbo的服務導出1之導出到本地。接下來分析暴露服務到遠程。bootstrap
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { // 添加動態參數,此動態參數是決定Zookeeper建立臨時節點仍是持久節點 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 步驟1)建立Invoker,這裏建立Invoker邏輯和上面同樣 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded( Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 步驟2)暴露服務 Exporter<?> exporter = ServiceConfig.protocol.export(wrapperInvoker); exporters.add(exporter); } } }
/** * 下面分析步驟2,該方法兩大核心邏輯,導出服務和註冊服務,服務註冊下篇文章分析 */ @Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 1. 導出服務,export invoker,本篇文章僅分析第一步導出服務到遠程 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// zookeeper://10.101.99.127:2181/com.alibaba.dubbo.registry.RegistryService // ?application=demo-provider&dubbo=2.0.2 URL registryUrl = getRegistryUrl(originInvoker); // registry provider,默認返回ZookeeperRegistry實例 final Registry registry = getRegistry(originInvoker); // dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&default.server=netty4&dubbo=2.0.2&generic=false // &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8140&side=provider final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 不配置的話默認返回true boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 2.註冊服務,這篇文章已經比較長了,決定將步驟2和步驟3新起一篇文章分析,服務暴露以後須要註冊到註冊中心 if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 3.數據更新訂閱 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 調用protocol的export方法導出服務,默認是採用Dubbo協議,對應DubboProtocol的export方法 // 可是這裏protocol.export()並非先走DubboProtocol的export方法,而是先走 // ProtocolListenerWrapper的wrapper方法 // 由於ProtocolListenerWrapper對DubboProtocol作了一層包裝,具體參考 // https://segmentfault.com/a/1190000020387196,核心方法protocal.export() exporter = new ExporterChangeableWrapper<T>( (Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
/** * 上述核心方法protocol.export()會先走到ProtocolListenerWrapper的export方法,該方法是在服務暴露上作了 監聽器功能的加強,也就是加上了監聽器 */ @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是是註冊中心,則暴露該invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 建立一個暴露者監聽器包裝類對象,暴露服務時這裏的protocol是ProtocolFilterWrapper,這裏用到了 // Wrapper包裝原有的DubboProtocol,能夠參考https://segmentfault.com/a/1190000020387196 return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); }
/** * ProtocolFilterWrapper的export方法,該方法是在服務暴露上作了過濾器鏈的加強,也就是加上了過濾器 */ @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是是註冊中心,則直接暴露服務 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服務提供側暴露服務,這裏經過buildInvokerChain造成了過濾器鏈 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
/** * 該方法就是建立帶Filter鏈的Invoker對象,倒序的把每個過濾器串連起來,造成一個invoker */ private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 得到過濾器的全部擴展實現類實例集合 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class). getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 從最後一個過濾器開始循環,建立一個帶有過濾器鏈的invoker對象 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); }
@Override public boolean isAvailable() { return invoker.isAvailable(); } // 關鍵在這裏,調用下一個filter表明的invoker,把每個過濾器串起來 @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
// 通過兩個Wrapper的export方法包裝以後,走到DubboProtocol的export方法,這裏是核心方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // url形如dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&dubbo=2.0.2&generic=false // /&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=648&qos.port=22222 // &side=provider×tamp=1569585915258 URL url = invoker.getUrl(); // 獲取服務標識,理解成服務座標也行,由服務組名,服務名,服務版本號以及端口組成,key形如 // com.alibaba.dubbo.demo.DemoService:20880 String key = serviceKey(url); // 建立DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 將<key, exporter>鍵值對放入緩存中 exporterMap.put(key, exporter); // 本地存根相關代碼, export an stub service for dispatching event // 刪除,暫時尚未分析本地存根相關 // 啓動服務器,重點關注這裏 openServer(url); optimizeSerialization(url); return exporter; }
// 根據URL值能夠猜想,openServer方法就是啓動Netty服務器,在172.22.213.93:20880端口上監聽調用請求 openServer(url);
/** * 在同一臺機器上(單網卡),同一個端口上僅容許啓動一個服務器實例,若某個端口上已有服務器實例,此時則調用reset方法 重置服務器的一些配置 */ private void openServer(URL url) { // 獲取host:port,並將其做爲服務器實例的key,用於標識當前的服務器實例,key形如172.22.213.93:20880 String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { // 建立服務器實例,put以後serverMap形如<172.22.213.93:20880, HeaderExchangeServer> serverMap.put(key, createServer(url)); } else { // 服務器已建立,則根據url中的配置重置服務器 server.reset(url); } } }
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // 添加心跳檢測配置到URL中,enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 獲取server參數,默認爲netty,這裏配置成了netty4,str就爲netty4 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 經過SPI檢測是否存在server參數所表明的Transporter拓展,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader. getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加編碼解碼器參數 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server; try { // 建立 ExchangeServer,核心方法 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader. getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger(URL url) { // 默認type就是header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); // 建立HeadExchanger return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 用傳輸層綁定返回的server建立對應的信息交換服務端 // 這裏也是分紅兩步,下面先分析bind方法,該方法就是開啓Netty4服務器監聽請求 // 1) bind方法 // 2) new HeaderExchangeServer(Server server) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
// 步驟1)bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 獲取自適應Transporter實例,並調用實例方法. getTransporter()方法獲取的Transporter是在運行時動態建立的, // 類名爲TransporterAdaptive,也就是自適應拓展類.TransporterAdaptive會在運行時根據傳入的URL參數決定加載 // 什麼類型的Transporter,這裏咱們配置成了netty4的NettyTransporter // String string = url.getParameter("server", url.getParameter("transporter", "netty")); // transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string); return getTransporter().bind(url, handler); }
public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 建立一個NettyServer return new NettyServer(url, listener); } public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // 調用父類構造方法,這裏的wrap方法返回的是 // MultiMessageHandler->HeartbeatHandler->AllDispatcherHandler->DecodeHandler->HeaderExchangeHandler // -> 表示前一個handler裏面包裝了下一個handler super(url, ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } // 包裝了MultiMessageHandler功能,增長了多消息處理的功能,以及對心跳消息作了功能加強 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 調用了多消息處理器,對心跳消息進行了功能增強 return new MultiMessageHandler( new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { // url形如dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&channel.readonly.sent=true // &codec=dubbo&default.server=netty4&dubbo=2.0.2&generic=false&heartbeat=60000 // &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=6900&qos.port=22222 // &side=provider×tamp=1569633535398 // handler是MultiMessageHandler實例 super(url, handler); // 從url中得到本地地址,/172.22.213.93:20880 localAddress = getUrl().toInetSocketAddress(); // 從url配置中得到綁定的ip,本機IP地址172.22.213.93 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); // 從url配置中得到綁定的端口號,20880 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); // 判斷url中配置anyhost是否爲true或者判斷host是否爲不可用的本地Host,url中配置了anyhost爲true if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; }
// /0.0.0.0:20880 bindAddress = new InetSocketAddress(bindIp, bindPort); // 從url中獲取配置,默認值爲0 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); // 從url中獲取配置,默認600s this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 開啓服務器 doOpen(); } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
/** * 該類是端點的抽象類,其中封裝了編解碼器以及兩個超時時間. * 基於dubbo 的SPI機制,得到相應的編解碼器實現對象,編解碼器優先從Codec2的擴展類中尋找 */ public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); this.codec = getChannelCodec(url); // 優先從url配置中取,若是沒有,默認爲1s this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 優先從url配置中取,若是沒有,默認爲3s this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); }
// 該方法是建立服務器,而且開啓 @Override protected void doOpen() throws Throwable { // 建立服務引導類 bootstrap = new ServerBootstrap(); // 建立boss線程組 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // 建立worker線程組 workerGroup = new NioEventLoopGroup( getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 建立服務器處理器 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // 得到通道集合 channels = nettyServerHandler.getChannels(); // 設置eventLoopGroup還有可選項 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 編解碼器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 增長責任鏈 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO)) .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) // 加入了NettyServerHandler,後續消息處理應該就是經過這個來處理,猜想,TODO .addLast("handler", nettyServerHandler); } }); // bind綁定,這裏bind完成以後Netty服務器就啓動了,監聽20880端口上的請求,有興趣能夠研究下Netty的源碼 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // 等待綁定完成 channelFuture.syncUninterruptibly(); // 設置通道 channel = channelFuture.channel(); }
// 步驟2)new HeaderExchangeServer(Server server) // 構造函數就是對屬性的設置,心跳的機制以及默認值都跟HeaderExchangeClient中的如出一轍 public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; //得到心跳週期配置,若是沒有配置,默認設置爲0 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); // 得到心跳超時配置,默認是心跳週期的三倍 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); // 若是心跳超時時間小於心跳週期的兩倍,則拋出異常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } // 開始心跳 startHeartbeatTimer(); }
/** * 該方法是開始心跳,跟HeaderExchangeClient類中的開始心跳方法惟一區別是得到的通道不同, * 客戶端跟通道是一一對應的,全部只要對一個通道進行心跳檢測,而服務端跟通道是一對多的關係, * 全部須要對該服務器鏈接的全部通道進行心跳檢測 */ private void startHeartbeatTimer() { // 先中止現有的心跳檢測 stopHeartbeatTimer(); if (heartbeat > 0) { // 建立心跳定時器 heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { // 返回一個不可修改的鏈接該服務器的信息交換通道集合 return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
/** * 該類實現了Runnable接口,實現的是心跳任務,裏面包含了核心的心跳策略 */ final class HeartBeatTask implements Runnable { // 通道管理 private ChannelProvider channelProvider; // 心跳間隔,單位:ms private int heartbeat; // 心跳超時時間,單位:ms private int heartbeatTimeout; HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { this.channelProvider = provider; this.heartbeat = heartbeat; this.heartbeatTimeout = heartbeatTimeout; }
/** * 該方法中是心跳機制的核心邏輯,注意如下幾個點: * * 若是須要心跳的通道自己若是關閉了,那麼跳過,不添加心跳機制. * 不管是接收消息仍是發送消息,只要超過了設置的心跳間隔,就發送心跳消息來測試是否斷開 * 若是最後一次接收到消息到到如今已經超過了心跳超時時間,那就認定對方的確斷開,分兩種狀況來處理對方斷開的狀況. * 分別是服務端斷開,客戶端重連以及客戶端斷開,服務端斷開這個客戶端的鏈接.這裏要好好品味一下誰是發送方, * 誰在等誰的響應,苦苦沒有等到. */ @Override public void run() { try { long now = System.currentTimeMillis(); // 遍歷全部通道 for (Channel channel : channelProvider.getChannels()) { // 若是通道關閉了,則跳過 if (channel.isClosed()) { continue; }
try { // 最後一次接收到消息的時間戳 Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 最後一次發送消息的時間戳 Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 若是最後一次接收或者發送消息到時間到如今的時間間隔超過了心跳間隔時間 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { // 建立一個request,設置版本號,設置須要獲得響應 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); // 設置事件類型,爲心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); // 發送心跳請求 channel.send(req); }
// 若是最後一次接收消息的時間到如今已經超過了超時時間 if (lastRead != null && now - lastRead > heartbeatTimeout) { // 若是該通道是客戶端,也就是請求的服務器掛掉了,客戶端嘗試重連服務器 if (channel instanceof Client) { try { // 從新鏈接服務器 ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } } else { // 若是不是客戶端,也就是是服務端返回響應給客戶端,可是客戶端掛掉了, // 則服務端關閉客戶端鏈接 channel.close(); } } } } } }
interface ChannelProvider { // 得到全部的通道集合,須要心跳的通道數組 Collection<Channel> getChannels(); }
}segmentfault