源碼分析Dubbo網絡通信篇之NettyServer網絡事件派發機制(Dispatch)

本節將主要學習Dubbo是如何使用Netty來實現網絡通信的。 從官網咱們得知,Dubbo協議是使用單一長鏈接來進行網絡傳輸,也就是說服務調用方持久與服務提供者創建一條鏈接,全部的服務調用調用信息經過。 一條TCP鏈接進行傳輸,在網絡層至少要考慮以下問題:緩存

  1. 服務端,客戶端網絡通信模型(線程模型)網絡

  2. 傳輸(編碼解碼、序列化)。架構

  3. 服務端轉發策略等。併發

Dubbo服務端的網絡啓動流程,在上篇中已給出序列圖,本節仍是以該圖爲切入點,引入本文的兩個主人公:NettyServer、NettyClient。app

這裏寫圖片描述 dubbo使用SPI機制,根據配置,能夠支持以下框架實現網絡通信模型,例如netty3,netty四、mina、grizzly,本文重點分析基於Netty4的實現,包路徑:dubbo-remoting-netty4。 從上面的流程圖,NettyTransport的職責就是調用new NettyServer的構造方法,從而構建NettyServer對象,在深刻NettyServer對象構造過程以前,先來看一下NettyServer的類繼承層次: 這裏寫圖片描述框架

NettyServer構造函數:異步

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {  // @1
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    // @2
}

代碼@1:URL url:服務提供者URL;ChannelHandler網絡事件處理器, 這裏寫圖片描述分佈式

也就是當相應網絡事件觸發時,執行的事件處理器。ide

  • void connected(Channel channel) throws RemotingException 鏈接事件,當收到客戶端的鏈接事件時,執行該方法處理相關業務操做。
  • void disconnected(Channel channel) throws RemotingException:鏈接斷開事件
  • void sent(Channel channel, Object message) throws RemotingException 當可寫事件觸發時,服務端向客戶端返回響應數據,就是經過該方法發送的。
  • void received(Channel channel, Object message) throws RemotingException 當讀事件觸發時執行該方法,服務端在收到客戶端的請求數據是,調用該方法執行解包等操做。
  • void caught(Channel channel, Throwable exception) throws RemotingException 發生異常時,調用該方法。

代碼@2:調用ChannelHandlers.wrap對原生Handler進行包裝,而後調用其父類的構造方法,首先,設置Dubbo服務端線程池中線程的名稱,能夠經過參數threadname來指定線程池中線程的前綴,默認爲:DubboServerHandler + dubbo服務端IP與接口號。我比較好奇的是這裏爲何須要對ChannelHandler進行包裝呢?是增長了些什麼邏輯呢?帶着者問題,引出本節重點探討的內容:事件派發機制。 事件派發機制指的是網絡事件(鏈接、讀、寫)等事件觸發後,這些事件如何執行,是由IO線程仍是派發到線程池中執行。Dubbo定義了以下5種事件派發機制: 這裏寫圖片描述函數

本文將詳細分析各類事件的派發實現原理。 ChannelHandlers#wrapInternal

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
}

這裏是典型的裝飾模式,MultiMessageHandler,多消息處理Handler,HeartbeatHandler,心跳Handler,其主要功能是處理心跳返回與心跳請求,直接在IO線程中執行,每次收到信息,更新通道的讀事件戳,每次發送數據時,記錄通道的寫事件戳。這裏的核心關鍵是利用SPI自適配,返回合適的事件派發機制。Dispatcher的類層次結構如圖所示: 這裏寫圖片描述

一、源碼分析AllDispatcher實現原理

線程派發機制:全部的消息都派發到線程池,包括請求、響應、鏈接事件、斷開事件、心跳等。

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

從中能夠看出,事件派發類繼承圖分兩個維度,Dispatcher(事件派發器)、與之對應的ChannelHandler,例如AllChannelHandler。

1.1 WrappedChannelHandler

接下來分析事件派發機制,重點關注ChannelHandler類的實現體系。 這裏寫圖片描述

縱觀Dubbo ChannelHanler體系的設計,是經典的類裝飾器模式,上述派發器主要解決的問題,是相關網絡事件(鏈接、讀(請求)、寫(響應)、心跳請求、心跳響應)是在IO線程、仍是在額外定義的線程池,故WrappedChannelHandler的主要職責是定義線程池相關的邏輯,具體是在IO線程上執行,仍是在定義的線程池中執行,則由子類具體去定製,WrappedChannelHandler默認實現ChannelHandler的全部方法,各個方法的實現直接調用被裝飾Handler的方法,見下圖: 這裏寫圖片描述

接下來先重點關注一下WrappedChannelHandler的成員變量和構造方法的實現。

protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
  • ExecutorService SHARED_EXECUTOR:共享線程池,默認線程池,若是 ExecutorService executor爲空,則使用SHARED_EXECUTOR
  • ExecutorService executor 定義的線程池
  • ChannelHandler handler:被裝飾的ChannelHandler
  • URL url 服務提供者URL 接下來關注一下其構造函數:
public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);    // @1

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);  // @2
    }

代碼@1:構建線程池,這裏基於SPI機制,用戶可選擇cached、eager、fixed、limited,將在本節下面詳細介紹,這裏只須要知道是構建了一個線程池。 代碼@2:將服務端都與線程池緩存起來,在服務端,線程池的緩存級別是 服務提供者協議(端口):線程池。

1.2 AllChannelHandler

事件派發機制:全部網絡事件在線程池中執行,其實現機制確定是重寫ChannelHandler的全部網絡事件方法,將調用其修飾的ChannelHanlder在線程池中執行。因爲AllChannelHandler是第一個事件派發機制,故對其實現作一個詳細描述。

1.2.1 AllChannelHandler#connected

public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

鏈接事件,其主要實現是,首先先獲取執行線程池,其獲取邏輯是若是executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class). getAdaptiveExtension().getExecutor(url);獲取不到線程池,則使用共享線程池。能夠看出,鏈接事件的業務調用時異步執行,基於線程池。 注:調用時機,服務端收到客戶端鏈接後,該方法會被調用。

2.2.2 AllChannelHandler#disconnected

public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

其基本實現與connected相同,就是將具體的disconnected 事件所對應的業務擴展方法在線程池中執行。 注:調用時機,服務端收到客戶端斷開鏈接後,該方法會被調用。

2.2.3 AllChannelHandler#received

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

調用時機:當服務端收到客戶端發送的請求後,通過IO線程(Netty)會首先從二進制流中解碼出一個個的請求,參數Object message,就是調用請求,而後在提交給線程池執行,執行完後,當業務處理完畢後,組裝結果後,必然會在該線程中調用通道(Channel#write,flush)方法,向通道寫入響應結果。 注:all事件派發機制,ChannelHandler#recive是在線程池中執行。

2.2.4 AllChannelHandler#caught

public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

當發生異常時,ChannelHandler#caught也在線程池中執行。 使人頗感意外的是,AllChannelHandler並未重寫WrappedChannelHandler的sent方法,也就是說ChannelHandler#sent事件回調方法,是在IO線程中執行。 WrappedChannelHandler#sent

public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
}

這個和官方文檔仍是有必定出入的。 這裏寫圖片描述

1.3 ExecutionChannelHandler

對應事件派件器:ExecutionDispatcher,其配置值:execution,從其源碼的實現來看,與AllDispatcher實現基本相似,惟一的區別是,若是executor線程池爲空時,並不會使用共享線程池,目前我還想不出什麼狀況下,線程池會初始化失敗。

1.4 DirectDispatcher

直接派發,也就是全部的事件所有在IO線程中執行,故其實現很是簡單:

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

1.5 MessageOnlyDispatcher、MessageOnlyChannelHandler

事件派發器:只有請求事件在線程池中執行,其餘響應事件、心跳,鏈接,斷開鏈接等事件在IO線程上執行,故其只須要重寫recive方法便可:

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

1.6 ConnectionOrderedDispatcher ConnectionOrderedChannelHandler

事件派發器:鏈接、斷開鏈接事件排隊執行,並可經過connect.queue.capacity屬性設置隊列長度,請求事件、異常事件在線程池中執行。

1.6.1 構造方法

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

重點關注一下connectionExecutor ,用來執行鏈接、斷開事件的線程池,線程池中只有一個線程,而且隊列能夠選擇時有界隊列,經過connect.queue.capacity屬性配置,超過的事件,則拒絕執行。

1.6.2 ConnectionOrderedChannelHandler#connected

public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

檢查隊列長度,若是超過警告值,則輸出警告信息,而後提交鏈接線程池中執行,disconnected事件相似。其餘received、caught事件,則與AllDispatcher相同,就不在重複。

總結:本文主要是分析闡述了Dubbo Dispatch 機制,但與官方文檔存在出入,先概括以下: Dispatch:全部的sent事件方法、心跳請求所有在IO線程上執行。

  1. all 除sent事件回調方法、心跳外,所有在線程池上執行。
  2. execution 與all相似,惟一區就是all在線程池未指定時,可使用共享線程池,這個差異等同於沒有。
  3. message 只有請求事件在線程池中執行,其餘在IO線程上執行。
  4. connection 請求事件在線程池中執行,鏈接、斷開鏈接事件排隊執行(含一個線程的線程池)
  5. direct 全部事件都在IO線程中執行。

>做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區優秀佈道師、CSDN2019博客之星TOP10,維護公衆號:中間件興趣圈目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。 在這裏插入圖片描述</runnable>

相關文章
相關標籤/搜索