Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二)

目錄

源碼之下無祕密 ── 作最好的 Netty 源碼分析教程前端


接上篇 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

ChannelHandler 的名字

咱們注意到, pipeline.addXXX 都有一個重載的方法, 例如 addLast, 它有一個重載的版本是:

ChannelPipeline addLast(String name, ChannelHandler handler);

第一個參數指定了所添加的 handler 的名字(更準確地說是 ChannelHandlerContext 的名字, 不過咱們一般是以 handler 做爲敘述的對象, 所以說成 handler 的名字便於理解). 那麼 handler 的名字有什麼用呢? 若是咱們不設置name, 那麼 handler 會有怎樣的名字?
爲了解答這些疑惑, 老規矩, 依然是從源碼中找到答案.
咱們仍是以 addLast 方法爲例:

@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

這個方法會調用重載的 addLast 方法:

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

第一個參數被設置爲 null, 咱們不關心它. 第二參數就是這個 handler 的名字. 看代碼可知, 在添加一個 handler 以前, 須要調用 checkDuplicateName 方法來肯定此 handler 的名字是否和已添加的 handler 的名字重複. 而這個 checkDuplicateName 方法咱們在前面已經有提到, 這裏再回顧一下:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

Netty 判斷一個 handler 的名字是否重複的依據很簡單: DefaultChannelPipeline 中有一個 類型爲 Map<String, AbstractChannelHandlerContext> 的 name2ctx 字段, 它的 key 是一個 handler 的名字, 而 value 則是這個 handler 所對應的 ChannelHandlerContext. 每當新添加一個 handler 時, 就會 put 到 name2ctx 中. 所以檢查 name2ctx 中是否包含這個 name 便可.
當沒有重名的 handler 時, 就爲這個 handler 生成一個關聯的 DefaultChannelHandlerContext 對象, 而後就將 name 和 newCtx 做爲 key-value 對 放到 name2Ctx 中.

自動生成 handler 的名字

若是咱們調用的是以下的 addLast 方法

ChannelPipeline addLast(ChannelHandler... handlers);

那麼 Netty 會調用 generateName 爲咱們的 handler 自動生成一個名字:

private String generateName(ChannelHandler handler) {
    WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
    Class<?> handlerType = handler.getClass();
    String name;
    synchronized (cache) {
        name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
    }

    synchronized (this) {
        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (name2ctx.containsKey(name)) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (!name2ctx.containsKey(newName)) {
                    name = newName;
                    break;
                }
            }
        }
    }

    return name;
}

而 generateName 會接着調用 generateName0 來實際產生一個 handler 的名字:

private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}

自動生成的名字的規則很簡單, 就是 handler 的簡單類名加上 "#0", 所以咱們的 EchoClientHandler 的名字就是 "EchoClientHandler#0", 這一點也能夠經過調試窗口佐證:

clipboard.png

關於 Pipeline 的事件傳輸機制

前面章節中, 咱們知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 兩個 boolean 變量, 分別用於標識 Context 所對應的 handler 的類型, 即:

  • inbound 爲真時, 表示對應的 ChannelHandler 實現了 ChannelInboundHandler 方法.

  • outbound 爲真時, 表示對應的 ChannelHandler 實現了 ChannelOutboundHandler 方法.

讀者朋友確定很疑惑了吧: 那究竟這兩個字段有什麼做用呢? 其實這還要從 ChannelPipeline 的傳輸的事件類型提及.
Netty 的事件能夠分爲 Inbound 和 Outbound 事件.

以下是從 Netty 官網上拷貝的一個圖示:

I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

從上圖能夠看出, inbound 事件和 outbound 事件的流向是不同的, inbound 事件的流行是從下至上, 而 outbound 恰好相反, 是從上到下. 而且 inbound 的傳遞方式是經過調用相應的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是經過調用 ChannelHandlerContext.OUT_EVT() 方法. 例如 ChannelHandlerContext.fireChannelRegistered() 調用會發送一個 ChannelRegistered 的 inbound 給下一個ChannelHandlerContext, 而 ChannelHandlerContext.bind 調用會發送一個 bind 的 outbound 事件給 下一個 ChannelHandlerContext.

Inbound 事件傳播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Oubound 事件傳輸方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

注意, 若是咱們捕獲了一個事件, 而且想讓這個事件繼續傳遞下去, 那麼須要調用 Context 相應的傳播方法.
例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

上面的例子中, MyInboundHandler 收到了一個 channelActive 事件, 它在處理後, 若是但願將事件繼續傳播下去, 那麼須要接着調用 ctx.fireChannelActive().

Outbound 操做(outbound operations of a channel)

Outbound 事件都是請求事件(request event), 即請求某件事情的發生, 而後經過 Outbound 事件進行通知.
Outbound 事件的傳播方向是 tail -> customContext -> head.

咱們接下來以 connect 事件爲例, 分析一下 Outbound 事件的傳播機制.
首先, 當用戶調用了 Bootstrap.connect 方法時, 就會觸發一個 Connect 請求事件, 此調用會觸發以下調用鏈:

Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect

繼續跟蹤的話, 咱們就發現, AbstractChannel.connect 其實由調用了 DefaultChannelPipeline.connect 方法:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

而 pipeline.connect 的實現以下:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

能夠看到, 當 outbound 事件(這裏是 connect 事件)傳遞到 Pipeline 後, 它實際上是以 tail 爲起點開始傳播的.
而 tail.connect 其實調用的是 AbstractChannelHandlerContext.connect 方法:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeConnect(remoteAddress, localAddress, promise);
    ...
    return promise;
}

findContextOutbound() 顧名思義, 它的做用是以當前 Context 爲起點, 向 Pipeline 中的 Context 雙向鏈表的前端尋找第一個 outbound 屬性爲真的 Context(即關聯着 ChannelOutboundHandler 的 Context), 而後返回.
它的實現以下:

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

當咱們找到了一個 outbound 的 Context 後, 就調用它的 invokeConnect 方法, 這個方法中會調用 Context 所關聯着的 ChannelHandler 的 connect 方法:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

若是用戶沒有重寫 ChannelHandler 的 connect 方法, 那麼會調用 ChannelOutboundHandlerAdapter 所實現的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

咱們看到, ChannelOutboundHandlerAdapter.connect 僅僅調用了 ctx.connect, 而這個調用又回到了:

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

這樣的循環中, 直到 connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節點, 即 head 中. 爲何會傳遞到 head 中呢? 回想一下, head 實現了 ChannelOutboundHandler, 所以它的 outbound 屬性是 true.
由於 head 自己既是一個 ChannelHandlerContext, 又實現了 ChannelOutboundHandler 接口, 所以當 connect 消息傳遞到 head 後, 會將消息轉遞到對應的 ChannelHandler 中處理, 而剛好, head 的 handler() 返回的就是 head 自己:

@Override
public ChannelHandler handler() {
    return this;
}

所以最終 connect 事件是在 head 中處理的. head 的 connect 事件處理方法以下:

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

到這裏, 整個 Connect 請求事件就結束了.
下面以一幅圖來描述一個整個 Connect 請求事件的處理過程:

clipboard.png
點此有無碼高清原圖

咱們僅僅以 Connect 請求事件爲例, 分析了 Outbound 事件的傳播過程, 可是其實全部的 outbound 的事件傳播都遵循着同樣的傳播規律, 讀者能夠試着分析一下其餘的 outbound 事件, 體會一下它們的傳播過程.

Inbound 事件

Inbound 事件和 Outbound 事件的處理過程有點鏡像.
Inbound 事件是一個通知事件, 即某件事已經發生了, 而後經過 Inbound 事件進行通知. Inbound 一般發生在 Channel 的狀態的改變或 IO 事件就緒.
Inbound 的特色是它傳播方向是 head -> customContext -> tail.

既然上面咱們分析了 Connect 這個 Outbound 事件, 那麼接着分析 Connect 事件後會發生什麼 Inbound 事件, 並最終找到 Outbound 和 Inbound 事件之間的聯繫.

當 Connect 這個 Outbound 傳播到 unsafe 後, 實際上是在 AbstractNioUnsafe.connect 方法中進行處理的:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
    ...
}

在 AbstractNioUnsafe.connect 中, 首先調用 doConnect 方法進行實際上的 Socket 鏈接, 當鏈接上後, 會調用 fulfillConnectPromise 方法:

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    ...
    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive();
    }
    ...
}

咱們看到, 在 fulfillConnectPromise 中, 會經過調用 pipeline().fireChannelActive() 將通道激活的消息(即 Socket 鏈接成功)發送出去.
而這裏, 當調用 pipeline.fireXXX 後, 就是 Inbound 事件的起點.
所以當調用了 pipeline().fireChannelActive() 後, 就產生了一個 ChannelActive Inbound 事件, 咱們就從這裏開始看看這個 Inbound 事件是怎麼傳播的吧.

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

哈哈, 果真, 在 fireChannelActive 方法中, 調用的是 head.fireChannelActive, 所以能夠證實了, Inbound 事件在 Pipeline 中傳輸的起點是 head.
那麼, 在 head.fireChannelActive() 中又作了什麼呢?

@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeChannelActive();
    ...
    return this;
}

上面的代碼應該很熟悉了吧. 回想一下在 Outbound 事件(例如 Connect 事件)的傳輸過程當中時, 咱們也有相似的操做:

  • 首先調用 findContextInbound, 從 Pipeline 的雙向鏈表中中找到第一個屬性 inbound 爲真的 Context, 而後返回

  • 調用這個 Context 的 invokeChannelActive

invokeChannelActive 方法以下:

private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

這個方法和 Outbound 的對應方法(例如 invokeConnect) 一模一樣. 同 Outbound 同樣, 若是用戶沒有重寫 channelActive 方法, 那麼會調用 ChannelInboundHandlerAdapter 的 channelActive 方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

一樣地, 在 ChannelInboundHandlerAdapter.channelActive 中, 僅僅調用了 ctx.fireChannelActive 方法, 所以就會有以下循環:

Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive

這樣的循環中. 同理, tail 自己 既實現了 ChannelInboundHandler 接口, 又實現了 ChannelHandlerContext 接口, 所以當 channelActive 消息傳遞到 tail 後, 會將消息轉遞到對應的 ChannelHandler 中處理, 而剛好, tail 的 handler() 返回的就是 tail 自己:

@Override
public ChannelHandler handler() {
    return this;
}

所以 channelActive Inbound 事件最終是在 tail 中處理的, 咱們看一下它的處理方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

TailContext.channelActive 方法是空的. 若是讀者自行查看 TailContext 的 Inbound 處理方法時, 會發現, 它們的實現都是空的. 可見, 若是是 Inbound, 當用戶沒有實現自定義的處理器時, 那麼默認是不處理的.

用一幅圖來總結一下 Inbound 的傳輸過程吧:

clipboard.png

點擊此能夠看高清無碼原圖

總結

對於 Outbound事件:

  • Outbound 事件是請求事件(由 Connect 發起一個請求, 並最終由 unsafe 處理這個請求)

  • Outbound 事件的發起者是 Channel

  • Outbound 事件的處理者是 unsafe

  • Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.

  • 在 ChannelHandler 中處理事件時, 若是這個 Handler 不是最後一個 Hnalder, 則須要調用 ctx.xxx (例如 ctx.connect) 將此事件繼續傳播下去. 若是不這樣作, 那麼此事件的傳播會提早終止.

  • Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

對於 Inbound 事件:

  • Inbound 事件是通知事件, 當某件事情已經就緒後, 通知上層.

  • Inbound 事件發起者是 unsafe

  • Inbound 事件的處理者是 Channel, 若是用戶沒有實現自定義的處理方法, 那麼Inbound 事件默認的處理者是 TailContext, 而且其處理方法是空實現.

  • Inbound 事件在 Pipeline 中傳輸方向是 head -> tail

  • 在 ChannelHandler 中處理事件時, 若是這個 Handler 不是最後一個 Hnalder, 則須要調用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續傳播下去. 若是不這樣作, 那麼此事件的傳播會提早終止.

  • Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT

outbound 和 inbound 事件十分的鏡像, 而且 Context 與 Handler 直接的調用關係是否容易混淆, 所以讀者在閱讀這裏的源碼時, 須要特別的注意.

本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題爲: Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二)
本文連接爲: http://www.javashuo.com/article/p-hkcoeyiy-gz.html

相關文章
相關標籤/搜索