6.ChannelPipeline

pipeline和handler

ChannelPipline

pipeline能夠譯爲管道、流水線,正如工廠的流水線同樣,ChannelPipline將各類handler串聯起來,將IO事件在這些handler中進行傳播,每一個handler負責一部分邏輯。從ChannelPipeline接口定義的方法能夠看出來,它是一個雙向鏈表,處理過程相似於JavaWeb中的filter。這種責任鏈模式的設計不只有利於解耦,還能動態調整pipeline中的handler,這一點在前文中的channelInitializerHandler已經有所體現。spring

ChannelHandler

handler指的是ChannelHandler接口及其子類,是處理讀寫事件的類,也是實際開發時主要編寫的類。ChannelHandler做爲跟藉口,定義了3個方法和一個註解。數據結構

public interface ChannelHandler {

    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @interface Sharable {}
}

從方法的名字不難理解這3個方法分別在handler被添加、移除、拋出異常時回調觸發。而@Sharable註解代表某個handler實例能夠被多個pipeline共享(也即多個channel共享)。
通過pipeline的後,handler處理過的事件會做爲臨近handler的事件入口。netty將事件分紅了入站事件和出站事件,這裏的入和出是相對於netty所屬的應用程序而言的,通常來講,由外部觸發的事件是inbound事件,而outbound事件是由應用程序主動請求而觸發的事件。相應的,handler也被分紅inBoundHandler和outBoundHandler兩種。顧名思義,inBoundHandler只會處理inBound事件,outBoundHandler只會處理outBound事件。具體的入站和出站事件能夠參考ChannelInboundHandler和ChannelOutboundHandler2個接口各自定義的方法。ide

// inbound事件
fireChannelRegistered()
fireChannelActive()
fireChannelRead(Object)
fireChannelReadComplete()
fireExceptionCaught()
fireUserEventTriggered()
fireChannelWritabilityChanged()
fireChannelInactive()
fireChannelUnregistered()

// outbound事件
bind()
connect()
write()
flush()
read()
disconnect()
close()
deregister()

在上述事件中,別的事件都容易理解,惟獨read這個事件出現了3次,容易混淆,因此單獨拿出來提一下。
fireChannelRead(Object)和FireChannelReadComplete屬於inBound事件,而read屬於outBound事件,這代表,read事件是應用程序主動觸發的事件。在ChannelOutBoundInvoker關於read方法的註釋中也提到,請求將channel中的數據讀入第一個inbound緩衝區,而後根據是否還有數據來決定觸發channelRead(Object)和channelReadComplete。函數

ChannelHandlerContext

爲了使handler類更關注於實際對數據的邏輯處理,netty將handler與pipeline關聯的過程交由ChannelHandlerContext完成。熟悉鏈表數據結構的都知道,鏈表的每個節點都包含數據域和指針域,顯然,handler和handlerContext的關係就像數據域和指針域。但context不只僅只是一個指針域,從它的接口定義能夠看出來,hannelHandlerContext一方面將handler包裹起來,繼而進行inbound和outbound事件的傳播,另外一方面繼承於attributeMap的attr方法也令其能夠自定義一些屬性(已經被廢棄,轉而使用handler的attr方法)。此外,context還能夠爲handler賦予名稱、獲取內存分配器,它還持有pipeline的引用,以便在必要時刻從頭尾指針從新開始處理。oop

ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker{...}

pipeline的初始化

對以上3個類有概述性的瞭解後,咱們先看一下pipeline是如何初始化的。
在channel初始化時,channel的構造函數初始化了一個pipeline。this

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

能夠看到pipeline在初始化時,添加了Tail和Head2個ChannelHandlerContext,且將這2個節點做爲哨兵節點,組成雙向鏈表這樣一個數據結構。
兩個哨兵的繼承關係以下線程

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {...}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    ...
}

能夠看到tail節點只是InboundHandler,而head節點既是InboundHandler又是OutboundHandler。tailContext一般作的是一個收尾的工做,好比異常沒有捕獲,傳遞到tail,就會打印日誌等等、釋放內存等等;而headContext持有一個Unsafe對象,在前文說過,unsafe是實現底層數據讀寫的一個類,也所以,head在處理inbount事件時,會原封不動的往下傳播,而處理outbound事件時,會委託給unsafe進行處理。
這裏還有一個小細節。在傳播事件時須要判斷下一個handler是否能夠處理這個事件,netty因而將各類事件用位圖的形式區分,採用這種方式大大節省判斷操做所須要的額外空間。
// ChannelHandlerMask類定義的部分事件位運算設計

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;

利用掩碼判斷handler處理對應事件指針

do {
    ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);

handler的添加和刪除

handler在調用pipeline的addXXX系列方法裏添加,以addLast(ChannelHandler... handlers)方法爲例,默認狀況下,該方法會重載到addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,默認狀況下group和name均爲null日誌

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

總的來講能夠分爲3個步驟:

  1. 檢查handler是否重複添加,主要是經過handler的@Sharable註解和added字段判斷;
  2. 建立HandlerContext,並添加到鏈表中。
  3. 回調handlerAdded方法。

handler的刪除相似,先經過參數找到對應的handler,而後刪除鏈表中的context節點,最後回調handlerRemove方法。

handler的傳播順序

因爲採用了責任鏈模式,鏈表節點之間的順序就顯得很是重要了,先看一下inbound事件是如何在pipeline中傳播的

inbount事件的傳播

inbound以AbstractChannelHandlerContext中的fireChannelRead(Object)方法爲例。

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

能夠看出,fireChannelRead作了2件事

  1. 經過事件對應的掩碼找到下一個inboundHandler
  2. 將本節點處理好的數據傳播給下一個inboundHandler
// 步驟1
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}    
// 步驟2
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
}

步驟1的實現是不斷經過context的executionMask與事件掩碼作與運算,直到與的結果不爲0。這代表該context對應的handler具有處理對應事件的能力。此外要注意循環過程當中,context是next方向。
步驟2則判斷當前線程是不是eventLoop線程,如果,則執行下一個inboundHandlerContext的invokeChannelRead方法,若不是則添加到任務隊列裏,待eventLoop線程來執行
至於invokeChannelRead方法也很簡單,先判斷該handler是否已存在於pipeline,而後調用handler的channelRead方法。

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
// 判斷是否添加到pipeline中或即將添加到pipeline中
private boolean invokeHandler() {
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

outbound事件傳播與inbound相似,只是在經過掩碼查詢下一個outboundHandler時爲prev方向,與inbound相反。具體代碼略過。

pipeline與context調用傳播方法的區別

pipeline.fireChannelRead()和ChannelHandlerContext.fireChannelRead()在代碼中都時常出現,那麼它們的區別是什麼?
不妨看一下DefaultChannelPipeline的fireChannelRead方法。

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
}

能夠看出,其將headContext做爲參數傳入,調用了HandlerContext的invokeChannelRead(AbstractChannelHandlerContext, Object)靜態方法,這個靜態方法會調用傳入的HandlerContext的invokeChannelRead(Object)方法,繼而調用Context內部持有的ChannelInboundHandler的channelRead(ChannelHandlerContext, Object)方法。這個方法由子類重寫,在這裏就是HeadContext重寫的方法,它調用傳入的ChannelHandlerContext,繼續往下傳播。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

而DefaultChannelPipeline的read方法則調用tail的read方法,tail會傳播給它的前一個節點。
小結
pipeline調用傳播方法時,如果inbound事件,從head開始往tail方向傳播,如果outbound事件,從tail開始往head方向傳播
context調用傳播方法,如果inbound事件,從當前context節點開始往tail方向傳播,如果outbound事件,從當前context節點開始往head方向傳播

異常的傳播

異常的傳播路徑

在context處理各類事件時,用了channelRead的例子。能夠注意到invokeChannelRead方法實現用了一個try-catch的寫法。當拋出異常時,會調用notifyHandlerException(Throwable),代碼以下:

private void notifyHandlerException(Throwable cause) {
    if (inExceptionCaught(cause)) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception was thrown by a user handler " +
                            "while handling an exceptionCaught event", cause);
        }
        return;
    }
    invokeExceptionCaught(cause);
}

首先調用inExceptionCaught,判斷異常是否發生在exceptionCaught方法內。如果,則打印警告日誌後直接返回,不然調用invokeExceptionCaught(Throwable)方法。該方法會調用handler複寫的exceptionCaught方法。
若複寫方法調用了ChannelHandlerContext.fireExceptionCaught方法,則異常會繼續往下傳播,不論下一個節點是inbound仍是outbound。若一直傳播到tail,則會打印一個日誌,並釋放異常佔用的內存。

異常優雅處理

在springMvc體系中,一般會有一個包含ControllerAdvice註解的類統一進行異常的處理,在netty中,也能夠在pipeline的末尾添加一個異常處理handler統一進行異常處理。甚至能夠用策略模式,對不一樣異常類進行分門別類的處理。

相關文章
相關標籤/搜索