本節將主要學習Dubbo是如何使用Netty來實現網絡通信的。 從官網咱們得知,Dubbo協議是使用單一長鏈接來進行網絡傳輸,也就是說服務調用方持久與服務提供者創建一條鏈接,全部的服務調用調用信息經過。 一條TCP鏈接進行傳輸,在網絡層至少要考慮以下問題:緩存
服務端,客戶端網絡通信模型(線程模型)網絡
傳輸(編碼解碼、序列化)。架構
服務端轉發策略等。併發
Dubbo服務端的網絡啓動流程,在上篇中已給出序列圖,本節仍是以該圖爲切入點,引入本文的兩個主人公:NettyServer、NettyClient。app
dubbo使用SPI機制,根據配置,能夠支持以下框架實現網絡通信模型,例如netty3,netty四、mina、grizzly,本文重點分析基於Netty4的實現,包路徑:dubbo-remoting-netty4。 從上面的流程圖,NettyTransport的職責就是調用new NettyServer的構造方法,從而構建NettyServer對象,在深刻NettyServer對象構造過程以前,先來看一下NettyServer的類繼承層次: 框架
NettyServer構造函數:異步
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // @1 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // @2 }
代碼@1:URL url:服務提供者URL;ChannelHandler網絡事件處理器, 分佈式
也就是當相應網絡事件觸發時,執行的事件處理器。ide
代碼@2:調用ChannelHandlers.wrap對原生Handler進行包裝,而後調用其父類的構造方法,首先,設置Dubbo服務端線程池中線程的名稱,能夠經過參數threadname來指定線程池中線程的前綴,默認爲:DubboServerHandler + dubbo服務端IP與接口號。我比較好奇的是這裏爲何須要對ChannelHandler進行包裝呢?是增長了些什麼邏輯呢?帶着者問題,引出本節重點探討的內容:事件派發機制。 事件派發機制指的是網絡事件(鏈接、讀、寫)等事件觸發後,這些事件如何執行,是由IO線程仍是派發到線程池中執行。Dubbo定義了以下5種事件派發機制: 函數
本文將詳細分析各類事件的派發實現原理。 ChannelHandlers#wrapInternal
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
這裏是典型的裝飾模式,MultiMessageHandler,多消息處理Handler,HeartbeatHandler,心跳Handler,其主要功能是處理心跳返回與心跳請求,直接在IO線程中執行,每次收到信息,更新通道的讀事件戳,每次發送數據時,記錄通道的寫事件戳。這裏的核心關鍵是利用SPI自適配,返回合適的事件派發機制。Dispatcher的類層次結構如圖所示:
線程派發機制:全部的消息都派發到線程池,包括請求、響應、鏈接事件、斷開事件、心跳等。
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
從中能夠看出,事件派發類繼承圖分兩個維度,Dispatcher(事件派發器)、與之對應的ChannelHandler,例如AllChannelHandler。
接下來分析事件派發機制,重點關注ChannelHandler類的實現體系。
縱觀Dubbo ChannelHanler體系的設計,是經典的類裝飾器模式,上述派發器主要解決的問題,是相關網絡事件(鏈接、讀(請求)、寫(響應)、心跳請求、心跳響應)是在IO線程、仍是在額外定義的線程池,故WrappedChannelHandler的主要職責是定義線程池相關的邏輯,具體是在IO線程上執行,仍是在定義的線程池中執行,則由子類具體去定製,WrappedChannelHandler默認實現ChannelHandler的全部方法,各個方法的實現直接調用被裝飾Handler的方法,見下圖:
接下來先重點關注一下WrappedChannelHandler的成員變量和構造方法的實現。
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); // @1 String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); // @2 }
代碼@1:構建線程池,這裏基於SPI機制,用戶可選擇cached、eager、fixed、limited,將在本節下面詳細介紹,這裏只須要知道是構建了一個線程池。 代碼@2:將服務端都與線程池緩存起來,在服務端,線程池的緩存級別是 服務提供者協議(端口):線程池。
事件派發機制:全部網絡事件在線程池中執行,其實現機制確定是重寫ChannelHandler的全部網絡事件方法,將調用其修飾的ChannelHanlder在線程池中執行。因爲AllChannelHandler是第一個事件派發機制,故對其實現作一個詳細描述。
public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } }
鏈接事件,其主要實現是,首先先獲取執行線程池,其獲取邏輯是若是executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class). getAdaptiveExtension().getExecutor(url);獲取不到線程池,則使用共享線程池。能夠看出,鏈接事件的業務調用時異步執行,基於線程池。 注:調用時機,服務端收到客戶端鏈接後,該方法會被調用。
public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } }
其基本實現與connected相同,就是將具體的disconnected 事件所對應的業務擴展方法在線程池中執行。 注:調用時機,服務端收到客戶端斷開鏈接後,該方法會被調用。
public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
調用時機:當服務端收到客戶端發送的請求後,通過IO線程(Netty)會首先從二進制流中解碼出一個個的請求,參數Object message,就是調用請求,而後在提交給線程池執行,執行完後,當業務處理完畢後,組裝結果後,必然會在該線程中調用通道(Channel#write,flush)方法,向通道寫入響應結果。 注:all事件派發機制,ChannelHandler#recive是在線程池中執行。
public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } }
當發生異常時,ChannelHandler#caught也在線程池中執行。 使人頗感意外的是,AllChannelHandler並未重寫WrappedChannelHandler的sent方法,也就是說ChannelHandler#sent事件回調方法,是在IO線程中執行。 WrappedChannelHandler#sent
public void sent(Channel channel, Object message) throws RemotingException { handler.sent(channel, message); }
這個和官方文檔仍是有必定出入的。
對應事件派件器:ExecutionDispatcher,其配置值:execution,從其源碼的實現來看,與AllDispatcher實現基本相似,惟一的區別是,若是executor線程池爲空時,並不會使用共享線程池,目前我還想不出什麼狀況下,線程池會初始化失敗。
直接派發,也就是全部的事件所有在IO線程中執行,故其實現很是簡單:
public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return handler; } }
事件派發器:只有請求事件在線程池中執行,其餘響應事件、心跳,鏈接,斷開鏈接等事件在IO線程上執行,故其只須要重寫recive方法便可:
@Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
事件派發器:鏈接、斷開鏈接事件排隊執行,並可經過connect.queue.capacity屬性設置隊列長度,請求事件、異常事件在線程池中執行。
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); }
重點關注一下connectionExecutor ,用來執行鏈接、斷開事件的線程池,線程池中只有一個線程,而且隊列能夠選擇時有界隊列,經過connect.queue.capacity屬性配置,超過的事件,則拒絕執行。
public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } }
檢查隊列長度,若是超過警告值,則輸出警告信息,而後提交鏈接線程池中執行,disconnected事件相似。其餘received、caught事件,則與AllDispatcher相同,就不在重複。
總結:本文主要是分析闡述了Dubbo Dispatch 機制,但與官方文檔存在出入,先概括以下: Dispatch:全部的sent事件方法、心跳請求所有在IO線程上執行。
>做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區優秀佈道師、CSDN2019博客之星TOP10,維護公衆號:中間件興趣圈目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。 </runnable>