pipeline能夠譯爲管道、流水線,正如工廠的流水線同樣,ChannelPipline將各類handler串聯起來,將IO事件在這些handler中進行傳播,每一個handler負責一部分邏輯。從ChannelPipeline接口定義的方法能夠看出來,它是一個雙向鏈表,處理過程相似於JavaWeb中的filter。這種責任鏈模式的設計不只有利於解耦,還能動態調整pipeline中的handler,這一點在前文中的channelInitializerHandler已經有所體現。spring
handler指的是ChannelHandler接口及其子類,是處理讀寫事件的類,也是實際開發時主要編寫的類。ChannelHandler做爲跟藉口,定義了3個方法和一個註解。數據結構
public interface ChannelHandler { void handlerAdded(ChannelHandlerContext ctx) throws Exception; void handlerRemoved(ChannelHandlerContext ctx) throws Exception; void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; @interface Sharable {} }
從方法的名字不難理解這3個方法分別在handler被添加、移除、拋出異常時回調觸發。而@Sharable註解代表某個handler實例能夠被多個pipeline共享(也即多個channel共享)。
通過pipeline的後,handler處理過的事件會做爲臨近handler的事件入口。netty將事件分紅了入站事件和出站事件,這裏的入和出是相對於netty所屬的應用程序而言的,通常來講,由外部觸發的事件是inbound事件,而outbound事件是由應用程序主動請求而觸發的事件。相應的,handler也被分紅inBoundHandler和outBoundHandler兩種。顧名思義,inBoundHandler只會處理inBound事件,outBoundHandler只會處理outBound事件。具體的入站和出站事件能夠參考ChannelInboundHandler和ChannelOutboundHandler2個接口各自定義的方法。ide
// inbound事件 fireChannelRegistered() fireChannelActive() fireChannelRead(Object) fireChannelReadComplete() fireExceptionCaught() fireUserEventTriggered() fireChannelWritabilityChanged() fireChannelInactive() fireChannelUnregistered() // outbound事件 bind() connect() write() flush() read() disconnect() close() deregister()
在上述事件中,別的事件都容易理解,惟獨read這個事件出現了3次,容易混淆,因此單獨拿出來提一下。
fireChannelRead(Object)和FireChannelReadComplete屬於inBound事件,而read屬於outBound事件,這代表,read事件是應用程序主動觸發的事件。在ChannelOutBoundInvoker關於read方法的註釋中也提到,請求將channel中的數據讀入第一個inbound緩衝區,而後根據是否還有數據來決定觸發channelRead(Object)和channelReadComplete。函數
爲了使handler類更關注於實際對數據的邏輯處理,netty將handler與pipeline關聯的過程交由ChannelHandlerContext完成。熟悉鏈表數據結構的都知道,鏈表的每個節點都包含數據域和指針域,顯然,handler和handlerContext的關係就像數據域和指針域。但context不只僅只是一個指針域,從它的接口定義能夠看出來,hannelHandlerContext一方面將handler包裹起來,繼而進行inbound和outbound事件的傳播,另外一方面繼承於attributeMap的attr方法也令其能夠自定義一些屬性(已經被廢棄,轉而使用handler的attr方法)。此外,context還能夠爲handler賦予名稱、獲取內存分配器,它還持有pipeline的引用,以便在必要時刻從頭尾指針從新開始處理。oop
ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker{...}
對以上3個類有概述性的瞭解後,咱們先看一下pipeline是如何初始化的。
在channel初始化時,channel的構造函數初始化了一個pipeline。this
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
能夠看到pipeline在初始化時,添加了Tail和Head2個ChannelHandlerContext,且將這2個節點做爲哨兵節點,組成雙向鏈表這樣一個數據結構。
兩個哨兵的繼承關係以下線程
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {...} final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; ... }
能夠看到tail節點只是InboundHandler,而head節點既是InboundHandler又是OutboundHandler。tailContext一般作的是一個收尾的工做,好比異常沒有捕獲,傳遞到tail,就會打印日誌等等、釋放內存等等;而headContext持有一個Unsafe對象,在前文說過,unsafe是實現底層數據讀寫的一個類,也所以,head在處理inbount事件時,會原封不動的往下傳播,而處理outbound事件時,會委託給unsafe進行處理。
這裏還有一個小細節。在傳播事件時須要判斷下一個handler是否能夠處理這個事件,netty因而將各類事件用位圖的形式區分,採用這種方式大大節省判斷操做所須要的額外空間。
// ChannelHandlerMask類定義的部分事件位運算設計
static final int MASK_EXCEPTION_CAUGHT = 1; static final int MASK_CHANNEL_REGISTERED = 1 << 1; static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; static final int MASK_CHANNEL_ACTIVE = 1 << 3;
利用掩碼判斷handler處理對應事件指針
do { ctx = ctx.prev; } while ((ctx.executionMask & mask) == 0);
handler在調用pipeline的addXXX系列方法裏添加,以addLast(ChannelHandler... handlers)方法爲例,默認狀況下,該方法會重載到addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,默認狀況下group和name均爲null日誌
@Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }
總的來講能夠分爲3個步驟:
handler的刪除相似,先經過參數找到對應的handler,而後刪除鏈表中的context節點,最後回調handlerRemove方法。
因爲採用了責任鏈模式,鏈表節點之間的順序就顯得很是重要了,先看一下inbound事件是如何在pipeline中傳播的
inbound以AbstractChannelHandlerContext中的fireChannelRead(Object)方法爲例。
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; }
能夠看出,fireChannelRead作了2件事
// 步驟1 private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; } // 步驟2 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(msg); } }); } }
步驟1的實現是不斷經過context的executionMask與事件掩碼作與運算,直到與的結果不爲0。這代表該context對應的handler具有處理對應事件的能力。此外要注意循環過程當中,context是next方向。
步驟2則判斷當前線程是不是eventLoop線程,如果,則執行下一個inboundHandlerContext的invokeChannelRead方法,若不是則添加到任務隊列裏,待eventLoop線程來執行
至於invokeChannelRead方法也很簡單,先判斷該handler是否已存在於pipeline,而後調用handler的channelRead方法。
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } // 判斷是否添加到pipeline中或即將添加到pipeline中 private boolean invokeHandler() { int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
outbound事件傳播與inbound相似,只是在經過掩碼查詢下一個outboundHandler時爲prev方向,與inbound相反。具體代碼略過。
pipeline.fireChannelRead()和ChannelHandlerContext.fireChannelRead()在代碼中都時常出現,那麼它們的區別是什麼?
不妨看一下DefaultChannelPipeline的fireChannelRead方法。
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); }
能夠看出,其將headContext做爲參數傳入,調用了HandlerContext的invokeChannelRead(AbstractChannelHandlerContext, Object)靜態方法,這個靜態方法會調用傳入的HandlerContext的invokeChannelRead(Object)方法,繼而調用Context內部持有的ChannelInboundHandler的channelRead(ChannelHandlerContext, Object)方法。這個方法由子類重寫,在這裏就是HeadContext重寫的方法,它調用傳入的ChannelHandlerContext,繼續往下傳播。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
而DefaultChannelPipeline的read方法則調用tail的read方法,tail會傳播給它的前一個節點。
小結
pipeline調用傳播方法時,如果inbound事件,從head開始往tail方向傳播,如果outbound事件,從tail開始往head方向傳播
context調用傳播方法,如果inbound事件,從當前context節點開始往tail方向傳播,如果outbound事件,從當前context節點開始往head方向傳播
在context處理各類事件時,用了channelRead的例子。能夠注意到invokeChannelRead方法實現用了一個try-catch的寫法。當拋出異常時,會調用notifyHandlerException(Throwable),代碼以下:
private void notifyHandlerException(Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event", cause); } return; } invokeExceptionCaught(cause); }
首先調用inExceptionCaught,判斷異常是否發生在exceptionCaught方法內。如果,則打印警告日誌後直接返回,不然調用invokeExceptionCaught(Throwable)方法。該方法會調用handler複寫的exceptionCaught方法。
若複寫方法調用了ChannelHandlerContext.fireExceptionCaught方法,則異常會繼續往下傳播,不論下一個節點是inbound仍是outbound。若一直傳播到tail,則會打印一個日誌,並釋放異常佔用的內存。
在springMvc體系中,一般會有一個包含ControllerAdvice註解的類統一進行異常的處理,在netty中,也能夠在pipeline的末尾添加一個異常處理handler統一進行異常處理。甚至能夠用策略模式,對不一樣異常類進行分門別類的處理。