Dubbo does not shut down gracefully out of the box: when a provider stops, the consumer immediately closes the long-lived connection, so responses to in-flight requests never make it back.
Setting -Ddubbo.service.shutdown.wait=30000 (the shutdown wait time, in milliseconds) on both provider and consumer has no effect; the process still stops immediately.
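For reference, that flag travels as an ordinary JVM system property. The sketch below shows how such a shutdown-wait value can be resolved; the property name is the real dubbo key, but the parsing logic and default are a simplified stand-in for what ConfigUtils does, not dubbo's actual code:

```java
public class ShutdownWaitConfig {
    static final int DEFAULT_WAIT_MILLIS = 10000;

    // resolve the wait time from a raw property value, falling back on bad input
    static int resolveWait(String raw) {
        if (raw == null || raw.isEmpty()) {
            return DEFAULT_WAIT_MILLIS;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return DEFAULT_WAIT_MILLIS;
        }
    }

    public static void main(String[] args) {
        // -Ddubbo.service.shutdown.wait=30000 arrives as a system property
        String raw = System.getProperty("dubbo.service.shutdown.wait");
        System.out.println("shutdown wait = " + resolveWait(raw) + " ms");
    }
}
```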
When the provider shuts down, the consumer detects the disconnect, immediately closes the connection to that provider, and does not retry reconnecting to it. If that was the only provider, no further remote invocations are made.
Sequence diagram of the consumer closing the connection:
A zookeeper node-change event triggers notify: the consumer refreshes its provider list and removes the departed provider (the destroyUnusedInvokers method); if that was the only provider, all Invokers are destroyed (the destroyAllInvokers method).
When a DubboInvoker is destroyed, it calls client.close() directly. The source:
```java
public void destroy() {
    if (super.isDestroyed()) {
        return;
    } else {
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            super.destroy();
            if (invokers != null) {
                invokers.remove(this);
            }
            for (ExchangeClient client : clients) {
                try {
                    // closes immediately, with no grace period
                    client.close();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        } finally {
            destroyLock.unlock();
        }
    }
}
```
Change client.close() to client.close(timeout). The modified code:
```java
public void destroy() {
    if (super.isDestroyed()) {
        return;
    } else {
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            super.destroy();
            if (invokers != null) {
                invokers.remove(this);
            }
            for (ExchangeClient client : clients) {
                try {
                    // changed: close with the configured shutdown-wait timeout
                    this.close(client);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        } finally {
            destroyLock.unlock();
        }
    }
}

/**
 * If neither dubbo.service.shutdown.wait nor
 * dubbo.service.shutdown.wait.seconds is set, close immediately.
 * @param client
 * @author 夏志強
 */
@SuppressWarnings("deprecation")
private void close(ExchangeClient client) {
    String timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
    if (timeout != null && timeout.length() > 0) {
        try {
            client.close(Integer.parseInt(timeout));
        } catch (Exception e) {
        }
    } else {
        timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
        if (timeout != null && timeout.length() > 0) {
            try {
                client.close(Integer.parseInt(timeout));
            } catch (Exception e) {
            }
        } else {
            client.close();
        }
    }
}
```
Even with this change, the consumer still closed the connection immediately. Debugging led to HeaderExchangeChannel's close(int timeout) method, which loops while HeaderExchangeChannel.this is still registered in DefaultFuture and the graceful-shutdown timeout has not elapsed; once the timeout passes, it closes outright. The source:
```java
public void close(int timeout) {
    if (closed) {
        return;
    }
    closed = true;
    if (timeout > 0) {
        long start = System.currentTimeMillis();
        // CHANNELS in DefaultFuture never contains a HeaderExchangeChannel,
        // so this loop exits immediately
        while (DefaultFuture.hasFuture(HeaderExchangeChannel.this)
                && System.currentTimeMillis() - start < timeout) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    close();
}
```
Looking at HeaderExchangeChannel's request method:

```java
DefaultFuture future = new DefaultFuture(channel, req, timeout);
```
Here channel is actually the NettyClient, so the key registered in DefaultFuture is never of type HeaderExchangeChannel, and hasFuture(HeaderExchangeChannel.this) always returns false.
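The mismatch is easy to reproduce in isolation: DefaultFuture keys its pending-request map by the channel passed to its constructor, so probing it with a different wrapper object never matches. A minimal sketch, where the map and hasFuture are simplified stand-ins rather than dubbo's actual classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FutureKeyDemo {
    // simplified stand-in for DefaultFuture's CHANNELS map
    static final ConcurrentMap<Object, String> CHANNELS = new ConcurrentHashMap<>();

    static boolean hasFuture(Object channel) {
        return CHANNELS.containsKey(channel);
    }

    public static void main(String[] args) {
        Object nettyClient = new Object();           // key used when the request was sent
        Object headerExchangeChannel = new Object(); // wrapper probed during close(timeout)

        CHANNELS.put(nettyClient, "pending-request");

        System.out.println(hasFuture(headerExchangeChannel)); // false: wrong key
        System.out.println(hasFuture(nettyClient));           // true: matches the stored key
    }
}
```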
Modified code:
```java
// graceful close
public void close(int timeout) {
    if (closed) {
        return;
    }
    closed = true;
    if (timeout > 0) {
        long start = System.currentTimeMillis();
        while (DefaultFuture.hasFuture(channel)
                && System.currentTimeMillis() - start < timeout) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    close();
}
```
Running again, the result was still wrong: the consumer kept waiting to close while the provider had already shut down.
The relevant code is ProtocolConfig's destroyAll() method, which does two things: first it removes the provider nodes from zookeeper and closes the zookeeper listeners; then it destroys the dubbo protocol and closes the connections.
Sequence diagram of dubbo protocol destruction:
The sequence diagram shows that the provider's graceful close happens in AbstractServer's close(timeout) method. The source:
```java
public void close(int timeout) {
    ExecutorUtil.gracefulShutdown(executor, timeout);
    close();
}

// ExecutorUtil
public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        es.shutdown(); // Disable new tasks from being submitted
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    if (!isShutdown(es)) {
        newThreadToCloseExecutor(es);
    }
}
```
Here executor is the thread pool from dubbo's dispatch model; only when executor is non-null does the server wait for in-flight tasks before closing. There is a pitfall: es.shutdown() must be called before es.awaitTermination(), otherwise the pool never reaches the terminated state, and awaitTermination waits out its full timeout and returns false even after every task has finished (see the JDK documentation). Debugging showed that executor was null. It is initialized in the AbstractServer constructor:
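The awaitTermination pitfall can be demonstrated with the plain JDK, no dubbo classes involved: without shutdown(), the pool keeps its worker threads alive and never terminates, so awaitTermination times out even though the submitted task finished long ago.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationDemo {

    // returns whether the pool terminated within the timeout
    static boolean drain(ExecutorService es, boolean callShutdown, long timeoutMillis)
            throws InterruptedException {
        es.submit(() -> { /* short task that finishes immediately */ });
        if (callShutdown) {
            es.shutdown(); // reject new tasks, let queued ones drain
        }
        boolean terminated = es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        es.shutdownNow(); // clean up in either case
        return terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        // without shutdown(): times out even though the task is done
        System.out.println(drain(Executors.newFixedThreadPool(1), false, 200));
        // with shutdown(): terminates almost immediately
        System.out.println(drain(Executors.newFixedThreadPool(1), true, 1000));
    }
}
```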
```java
ExecutorService executor;

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false)
            || NetUtils.isInvalidLocalHost(getUrl().getHost())
            ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                    + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                        + ", cause: " + t.getMessage(), t);
    }
    // set executor
    if (handler instanceof WrappedChannelHandler) {
        executor = ((WrappedChannelHandler) handler).getExecutor();
    }
}
```
Dubbo's default transport is netty, so look at the NettyServer code:
```java
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler,
            ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

// ChannelHandlers
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(
            ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
}
```
The constructor wraps the handler, so by the time it reaches AbstractServer its type is MultiMessageHandler, not WrappedChannelHandler. The handler inheritance hierarchy:
Because dubbo's default thread model is AllChannelHandler (see the dubbo documentation), whose superclass is WrappedChannelHandler, we need reflection to unwrap the delegating handlers and reach the executor.
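The unwrapping idea can be sketched with plain reflection. The two nested classes below are hypothetical stand-ins for dubbo's AbstractChannelHandlerDelegate and WrappedChannelHandler, not the real classes; only the technique (reading a private field and recursing down the chain) carries over.

```java
import java.lang.reflect.Field;

public class HandlerUnwrapDemo {
    // hypothetical stand-in for AbstractChannelHandlerDelegate
    static class Delegate {
        private final Object handler;
        Delegate(Object handler) { this.handler = handler; }
    }

    // hypothetical stand-in for WrappedChannelHandler
    static class Wrapped {
    }

    // walk the delegate chain via the private "handler" field
    static Object unwrap(Object h) throws Exception {
        while (h instanceof Delegate) {
            Field f = Delegate.class.getDeclaredField("handler");
            f.setAccessible(true);
            h = f.get(h);
        }
        return h;
    }

    public static void main(String[] args) throws Exception {
        // MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler, modeled as:
        Object chain = new Delegate(new Delegate(new Wrapped()));
        Object inner = unwrap(chain);
        System.out.println(inner instanceof Wrapped); // true
    }
}
```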
Modify the AbstractServer constructor to assign executor:
```java
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false)
            || NetUtils.isInvalidLocalHost(getUrl().getHost())
            ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                    + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                        + ", cause: " + t.getMessage(), t);
    }
    // changed: resolve executor through the handler chain
    this.setExecutor(handler);
}

/**
 * Set the executor, unwrapping delegate handlers via reflection.
 * @param handler
 * @author 夏志強
 */
private void setExecutor(ChannelHandler handler) {
    if (handler != null) {
        if (handler instanceof WrappedChannelHandler) {
            executor = ((WrappedChannelHandler) handler).getExecutor();
        } else if (handler instanceof AbstractChannelHandlerDelegate) {
            try {
                Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");
                field.setAccessible(true);
                setExecutor((ChannelHandler) field.get(handler));
            } catch (Exception e) {
            }
        }
    }
}
```
Running the test again, the result is now correct.