Solving graceful shutdown in Dubbo

  • Current situation

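Dubbo currently does not support graceful shutdown. When the server shuts down, the client actively closes the long-lived connection, so the server's response messages can no longer be delivered.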

Setting -Ddubbo.service.shutdown.wait=30000 (the shutdown wait time, in milliseconds) on both the server and the client has no effect either: the process still stops immediately.
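Two properties can carry this wait time, and they use different units, so it helps to resolve them to one value in milliseconds. The sketch below is a hypothetical helper, not Dubbo's own code. It assumes dubbo.service.shutdown.wait is in milliseconds, that the deprecated dubbo.service.shutdown.wait.seconds is in seconds, and that the caller supplies the default; the class and method names are made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper (not part of Dubbo): resolves the shutdown wait in milliseconds.
final class ShutdownTimeouts {
    static final String WAIT_MILLIS_KEY = "dubbo.service.shutdown.wait";          // assumed: milliseconds
    static final String WAIT_SECONDS_KEY = "dubbo.service.shutdown.wait.seconds"; // assumed: seconds (deprecated)

    private ShutdownTimeouts() {}

    /** The millisecond key wins, then the seconds key (converted to ms), then the default. */
    static int resolve(Properties props, int defaultMillis) {
        Integer millis = parse(props.getProperty(WAIT_MILLIS_KEY));
        if (millis != null) {
            return millis;
        }
        Integer seconds = parse(props.getProperty(WAIT_SECONDS_KEY));
        if (seconds != null) {
            return seconds * 1000; // convert seconds to milliseconds
        }
        return defaultMillis;
    }

    /** Parses a non-negative integer; returns null for missing, malformed or negative values. */
    private static Integer parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        try {
            int n = Integer.parseInt(value.trim());
            return n >= 0 ? n : null;
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

In a real process one would pass System.getProperties(), so that -Ddubbo.service.shutdown.wait=30000 resolves to 30000 ms. Falling through on a malformed value, rather than throwing, keeps shutdown from failing because of a typo in a flag.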

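  • Expected behavior
  1. Messages that are being read or written must still be processed;
  2. The thread pool must close only after all of its tasks have finished.
  • What the client does when the server shuts down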

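When the server shuts down, the client detects the disconnect event, immediately closes its connection to that server, and keeps trying to reconnect to the provider indefinitely. If there is only one provider, no further remote calls to the service are made.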

[Figure: client disconnect sequence diagram]

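A ZooKeeper node-change event triggers notify; the client refreshes its provider list and removes the provider that went away (destroyUnusedInvokers). If there was only one provider, all Invokers are destroyed (destroyAllInvokers).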
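The refresh step can be pictured as a diff between the invokers the client already holds and the provider URLs in the latest notification. The following is a simplified, hypothetical model of that diff, not Dubbo's RegistryDirectory code: invokers are reduced to a map from URL string to a Runnable that stands in for invoker.destroy(), and ProviderRefresh is a made-up name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the client-side provider refresh (not Dubbo's code).
final class ProviderRefresh {
    private ProviderRefresh() {}

    /**
     * Destroys and removes every invoker whose URL is absent from the new provider list.
     * An empty list destroys everything, like the single-provider case above.
     * Returns the URLs that were destroyed, in map iteration order.
     */
    static List<String> refresh(Map<String, Runnable> invokers, List<String> newProviderUrls) {
        Set<String> alive = new HashSet<>(newProviderUrls);
        List<String> destroyed = new ArrayList<>();
        Iterator<Map.Entry<String, Runnable>> it = invokers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Runnable> e = it.next();
            if (!alive.contains(e.getKey())) {
                e.getValue().run(); // stand-in for invoker.destroy(), which closes its clients
                it.remove();
                destroyed.add(e.getKey());
            }
        }
        return destroyed;
    }
}
```

The destroy callback is exactly where the client connection gets closed, which is why the next section changes what DubboInvoker.destroy() does.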

  • Client-side code changes

When a DubboInvoker is destroyed, it calls client.close() directly. The source is:

public void destroy() {
        if (super.isDestroyed()){
            return ;
        } else {
            destroyLock.lock();
            try{
                if (super.isDestroyed()){
                    return ;
                }
                super.destroy();
                if (invokers != null){
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        // close immediately, without waiting for pending responses
                        client.close();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
                
            }finally {
                destroyLock.unlock();
            }
        }
    }

Change client.close() to client.close(timeout), as follows:

public void destroy() {
        if (super.isDestroyed()){
            return ;
        } else {
            destroyLock.lock();
            try{
                if (super.isDestroyed()){
                    return ;
                }
                super.destroy();
                if (invokers != null){
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        // modified: close through the helper below, which honors the shutdown timeout
                        this.close(client);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
                
            }finally {
                destroyLock.unlock();
            }
        }
    }
    
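    /**
     * Closes the client, waiting up to the configured shutdown timeout.
     * If neither dubbo.service.shutdown.wait (milliseconds) nor
     * dubbo.service.shutdown.wait.seconds (seconds) is set, the client is closed immediately.
     * @param client
     * @author 夏志強
     */
    @SuppressWarnings("deprecation")
    private void close(ExchangeClient client) {
        int timeout = 0;
        try {
            String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
            if (value != null && value.length() > 0) {
                timeout = Integer.parseInt(value.trim());
            } else {
                value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
                if (value != null && value.length() > 0) {
                    // this key is in seconds, but close(int) expects milliseconds
                    timeout = Integer.parseInt(value.trim()) * 1000;
                }
            }
        } catch (NumberFormatException e) {
            // previously swallowed silently, which left the client open; now fall back to an immediate close
            logger.warn("Invalid shutdown wait value, closing immediately", e);
        }
        if (timeout > 0) {
            client.close(timeout);
        } else {
            client.close();
        }
    }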

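After this change the client still closed the connection immediately. Debugging showed that HeaderExchangeChannel.close(timeout) loops only while HeaderExchangeChannel.this has pending futures in DefaultFuture and the timeout has not yet elapsed; once either condition fails, it closes at once. The source is: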

public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            // DefaultFuture.CHANNELS never contains a HeaderExchangeChannel, so this condition is always false
            while (DefaultFuture.hasFuture(HeaderExchangeChannel.this) 
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

Looking at HeaderExchangeChannel's request method, the future is created like this:

DefaultFuture future = new DefaultFuture(channel, req, timeout);

Here channel is actually the NettyClient, so it is never of type HeaderExchangeChannel.
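The mismatch can be reproduced without Dubbo. The sketch assumes, as the request() call above suggests, that pending futures are registered against the transport channel and that hasFuture is a containsValue lookup; Transport, ExchangeWrapper and FutureRegistry are hypothetical stand-ins for NettyClient, HeaderExchangeChannel and the static map inside DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins: Transport ~ NettyClient, ExchangeWrapper ~ HeaderExchangeChannel,
// FutureRegistry ~ the static request-id -> channel map inside DefaultFuture.
final class Transport {}

final class ExchangeWrapper {
    final Transport channel; // the wrapped transport channel
    ExchangeWrapper(Transport channel) { this.channel = channel; }
}

final class FutureRegistry {
    private static final Map<Long, Object> CHANNELS = new ConcurrentHashMap<>();

    private FutureRegistry() {}

    // A pending request is registered against the transport channel, not the wrapper.
    static void register(long requestId, Transport channel) { CHANNELS.put(requestId, channel); }

    // Called when the response arrives (or the request times out).
    static void complete(long requestId) { CHANNELS.remove(requestId); }

    static boolean hasFuture(Object channel) { return CHANNELS.containsValue(channel); }
}
```

Looking up the wrapper finds nothing even while a request is pending, so the wait loop exits at once; looking up the wrapped channel is what actually reflects in-flight requests.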

The modified code checks the underlying channel instead:

// graceful close
    public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            while (DefaultFuture.hasFuture(channel) 
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

Running the code again, the result was still wrong: the client kept waiting to close, but the server had already shut down.

  • Server shutdown sequence

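So far I have analyzed ProtocolConfig.destroyAll(), which works in two steps: first, it deletes the provider nodes in ZooKeeper and closes the ZooKeeper listeners; second, it destroys the dubbo protocol, which closes the connections.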
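The ordering of these two steps matters: unregistering first lets clients receive the node-change event and stop routing new calls to this server before its connections go away. A hypothetical model of that ordering, with made-up Registry and ProtocolServer interfaces standing in for the real registry and protocol classes:

```java
import java.util.List;

// Hypothetical model of the two-step shutdown order (not Dubbo's code).
interface Registry { void destroy(); }                       // step 1: remove provider nodes, stop listening
interface ProtocolServer { void close(int timeoutMillis); }  // step 2: graceful close

final class ShutdownSequence {
    private ShutdownSequence() {}

    static void destroyAll(List<Registry> registries, List<ProtocolServer> servers, int timeoutMillis) {
        // Step 1 first, so clients get the node-change notification and stop sending requests here.
        for (Registry r : registries) {
            r.destroy();
        }
        // Step 2 only afterwards: close servers, letting in-flight work drain for up to timeoutMillis.
        for (ProtocolServer s : servers) {
            s.close(timeoutMillis);
        }
    }
}
```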

[Figure: dubbo protocol destruction sequence diagram]

The sequence diagram shows that the server's graceful close happens in AbstractServer.close(timeout). The source is:

public void close(int timeout) {
        ExecutorUtil.gracefulShutdown(executor ,timeout);
        close();
    }

// ExecutorUtil
public static void gracefulShutdown(Executor executor, int timeout) {
        if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
            return;
        }
        final ExecutorService es = (ExecutorService) executor;
        try {
            es.shutdown(); // Disable new tasks from being submitted
        } catch (SecurityException ex2) {
            return ;
        } catch (NullPointerException ex2) {
            return ;
        }
        try {
            if(! es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                es.shutdownNow();
            }
        } catch (InterruptedException ex) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (!isShutdown(es)){
            newThreadToCloseExecutor(es);
        }
    }

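Here executor is the thread pool of Dubbo's thread dispatch model; close only waits for the pool's tasks to finish when executor is non-null. One pitfall: es.shutdown() must be called before es.awaitTermination(). Otherwise the pool never terminates, even after all its tasks have finished, so awaitTermination blocks for the full timeout and then returns false (see the JDK ExecutorService documentation). gracefulShutdown above does call shutdown() first. While debugging, however, I found that executor was null. Its value is set in the AbstractServer constructor: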
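The shutdown-before-awaitTermination pitfall is easy to confirm with plain java.util.concurrent. In this small demo (AwaitTerminationDemo is an illustrative name), a pool runs one task that finishes immediately; awaitTermination succeeds only when shutdown() was called first.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Demonstrates why shutdown() must precede awaitTermination().
final class AwaitTerminationDemo {
    private AwaitTerminationDemo() {}

    /** Runs one quick task, then awaits termination with or without calling shutdown() first. */
    static boolean terminates(boolean callShutdownFirst, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            pool.submit(() -> { /* a task that finishes immediately */ });
            if (callShutdownFirst) {
                pool.shutdown(); // reject new tasks; the pool terminates once queued work drains
            }
            // Without shutdown() an idle pool is never "terminated", so this
            // waits the full timeout and returns false.
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        } finally {
            pool.shutdownNow(); // always release the worker thread
        }
    }
}
```

terminates(true, 1000) returns true almost at once, while terminates(false, 200) returns false after about 200 ms.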

ExecutorService executor;

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        // set executor
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }

We use Dubbo's default transport, Netty, so let's look at NettyServer:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

// ChannelHandlers
public static ChannelHandler wrap(ChannelHandler handler, URL url){
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                        .getAdaptiveExtension().dispatch(handler, url)));
    }

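The constructor wraps the handler, so by the time it reaches AbstractServer its type is MultiMessageHandler rather than WrappedChannelHandler. The handler inheritance hierarchy:

[Figure: handler class hierarchy]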

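Dubbo's default threading model is AllChannelHandler (see the official Dubbo documentation), whose parent class is WrappedChannelHandler. Because it is nested inside MultiMessageHandler and HeartbeatHandler, the instanceof check in the constructor fails, so the executor has to be found by unwrapping the handler chain via reflection.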
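The idea of walking down the wrapper chain until the dispatching handler is reached can be shown with a simplified, hypothetical model. Handler, Delegate, Dispatching and ExecutorFinder are made-up names standing in for ChannelHandler, AbstractChannelHandlerDelegate (MultiMessageHandler, HeartbeatHandler) and WrappedChannelHandler; in this model the inner field is directly accessible, so no reflection is needed.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical, simplified model of the handler chain (not Dubbo's classes).
interface Handler {}

class Delegate implements Handler {          // ~ MultiMessageHandler / HeartbeatHandler
    final Handler inner;                     // the wrapped handler
    Delegate(Handler inner) { this.inner = inner; }
}

class Dispatching implements Handler {       // ~ AllChannelHandler (a WrappedChannelHandler)
    final ExecutorService executor;          // the pool business logic runs on
    Dispatching(ExecutorService executor) { this.executor = executor; }
}

final class ExecutorFinder {
    private ExecutorFinder() {}

    /** Walks down the delegate chain; returns the first dispatching handler's executor, or null. */
    static ExecutorService find(Handler h) {
        while (h != null) {
            if (h instanceof Dispatching) {
                return ((Dispatching) h).executor;
            }
            h = (h instanceof Delegate) ? ((Delegate) h).inner : null;
        }
        return null;
    }
}
```

A loop is used instead of recursion only for brevity; either form stops at the first dispatching handler, which is the behavior the constructor needs.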

Modify the AbstractServer constructor so that executor is assigned:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        // modified: unwrap the handler chain to find the executor
        this.setExecutor(handler);
    }
    
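    /**
     * Sets executor by unwrapping the handler chain
     * (MultiMessageHandler -> HeartbeatHandler -> WrappedChannelHandler).
     * @param handler
     * @author 夏志強
     */
    private void setExecutor(ChannelHandler handler) {
        if (handler == null) {
            return;
        }
        if (handler instanceof WrappedChannelHandler) {
            executor = ((WrappedChannelHandler) handler).getExecutor();
        } else if (handler instanceof AbstractChannelHandlerDelegate) {
            try {
                // read the delegate's "handler" field and recurse into the wrapped handler
                Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");
                field.setAccessible(true);
                setExecutor((ChannelHandler) field.get(handler));
            } catch (Exception e) {
                logger.warn("Failed to unwrap handler to find executor", e);
            }
        }
    }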

Running the test code again, the responses now come back correctly.
