Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

目錄

源碼之下無祕密 ── 作最好的 Netty 源碼分析教程java

此文章已同步發送到個人 github

前言

這篇是 Netty 源碼分析 的第二篇, 在這篇文章中, 我會爲讀者詳細地分析 Netty 中的 ChannelPipeline 機制.

Channel 與 ChannelPipeline

相信你們都知道了, 在 Netty 中每一個 Channel 都有且僅有一個 ChannelPipeline 與之對應, 它們的組成關係以下:

clipboard.png

經過上圖咱們能夠看到, 一個 Channel 包含了一個 ChannelPipeline, 而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向鏈表. 這個鏈表的頭是 HeadContext, 鏈表的尾是 TailContext, 而且每一個 ChannelHandlerContext 中又關聯着一個 ChannelHandler.
上面的圖示給了咱們一個對 ChannelPipeline 的直觀認識, 可是實際上 Netty 實現的 Channel 是否真的是這樣的呢? 咱們繼續用源碼說話.

在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 中, 咱們已經知道了一個 Channel 的初始化的基本過程, 下面咱們再回顧一下.
下面的代碼是 AbstractChannel 構造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

AbstractChannel 有一個 pipeline 字段, 在構造器中會初始化它爲 DefaultChannelPipeline的實例. 這裏的代碼就印證了一點: 每一個 Channel 都有一個 ChannelPipeline.
接着咱們跟蹤一下 DefaultChannelPipeline 的初始化過程.
首先進入到 DefaultChannelPipeline 構造器中:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

在 DefaultChannelPipeline 構造器中, 首先將與之關聯的 Channel 保存到字段 channel 中, 而後實例化兩個 ChannelHandlerContext, 一個是 HeadContext 實例 head, 另外一個是 TailContext 實例 tail. 接着將 head 和 tail 互相指向, 構成一個雙向鏈表.
特別注意到, 咱們在開始的示意圖中, head 和 tail 並無包含 ChannelHandler, 這是由於 HeadContext 和 TailContext 繼承於 AbstractChannelHandlerContext 的同時也實現了 ChannelHandler 接口了, 所以它們有 Context 和 Handler 的雙重屬性.

ChannelPipeline 的初始化再探

在第一章的時候, 咱們已經對 ChannelPipeline 的初始化有了一個大體的瞭解, 不過當時重點畢竟不在 ChannelPipeline 這裏, 所以沒有深刻地分析它的初始化過程. 那麼下面咱們就來看一下具體的 ChannelPipeline 的初始化都作了哪些工做吧.

ChannelPipeline 實例化過程

咱們再來回顧一下, 在實例化一個 Channel 時, 會伴隨着一個 ChannelPipeline 的實例化, 而且此 Channel 會與這個 ChannelPipeline 相互關聯, 這一點能夠經過NioSocketChannel 的父類 AbstractChannel 的構造器予以佐證:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

當實例化一個 Channel(這裏以 EchoClient 爲例, 那麼 Channel 就是 NioSocketChannel), 其 pipeline 字段就是咱們新建立的 DefaultChannelPipeline 對象, 那麼咱們就來看一下 DefaultChannelPipeline 的構造方法吧:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

能夠看到, 在 DefaultChannelPipeline 的構造方法中, 將傳入的 channel 賦值給字段 this.channel, 接着又實例化了兩個特殊的字段: tailhead. 這兩個字段是一個雙向鏈表的頭和尾. 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 爲節點的雙向鏈表, 這個鏈表是 Netty 實現 Pipeline 機制的關鍵.
再回顧一下 head 和 tail 的類層次結構:

clipboard.png

clipboard.png

從類層次結構圖中能夠很清楚地看到, head 實現了 ChannelInboundHandler, 而 tail 實現了 ChannelOutboundHandler 接口, 而且它們都實現了 ChannelHandlerContext 接口, 所以能夠說 head 和 tail 便是一個 ChannelHandler, 又是一個 ChannelHandlerContext.

接着看一下 HeadContext 的構造器:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它調用了父類 AbstractChannelHandlerContext 的構造器, 並傳入參數 inbound = false, outbound = true.
TailContext 的構造器與 HeadContext 的相反, 它調用了父類 AbstractChannelHandlerContext 的構造器, 並傳入參數 inbound = true, outbound = false.
即 header 是一個 outboundHandler, 而 tail 是一個inboundHandler, 關於這一點, 你們要特別注意, 由於在後面的分析中, 咱們會反覆用到 inbound 和 outbound 這兩個屬性.

ChannelInitializer 的添加

上面一小節中, 咱們已經分析了 Channel 的組成, 其中咱們瞭解到, 最開始的時候 ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 可是這個 Pipeline並不能實現什麼特殊的功能, 由於咱們尚未給它添加自定義的 ChannelHandler.
一般來講, 咱們在初始化 Bootstrap, 會添加咱們自定義的 ChannelHandler, 就以咱們熟悉的 EchoClient 來舉例吧:

Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

上面代碼的初始化過程, 相信你們都不陌生. 在調用 handler 時, 傳入了 ChannelInitializer 對象, 它提供了一個 initChannel 方法供咱們初始化 ChannelHandler. 那麼這個初始化過程是怎樣的呢? 下面咱們就來揭開它的神祕面紗.

ChannelInitializer 實現了 ChannelHandler, 那麼它是在何時添加到 ChannelPipeline 中的呢? 進行了一番搜索後, 咱們發現它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
其代碼以下:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
    ...
}

上面的代碼將 handler() 返回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 返回的是handler 其實就是咱們在初始化 Bootstrap 調用 handler 設置的 ChannelInitializer 實例, 所以這裏就是將 ChannelInitializer 插入到了 Pipeline 的末端.
此時 Pipeline 的結構以下圖所示:

clipboard.png

有朋友可能就有疑惑了, 我明明插入的是一個 ChannelInitializer 實例, 爲何在 ChannelPipeline 中的雙向鏈表中的元素倒是一個 ChannelHandlerContext? 爲了解答這個問題, 咱們繼續在代碼中尋找答案吧.
咱們剛纔提到, 在 Bootstrap.init 中會調用 p.addLast() 方法, 將 ChannelInitializer 插入到鏈表末端:

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 檢查此 handler 是否有重複的名字

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

addLast 有不少重載的方法, 咱們關注這個比較重要的方法就能夠了.
上面的 addLast 方法中, 首先檢查這個 ChannelHandler 的名字是不是重複的, 若是不重複的話, 則爲這個 Handler 建立一個對應的 DefaultChannelHandlerContext 實例, 並與之關聯起來(Context 中有一個 handler 屬性保存着對應的 Handler 實例). 判斷此 Handler 是否重名的方法很簡單: Netty 中有一個 name2ctx Map 字段, key 是 handler 的名字, 而 value 則是 handler 自己. 所以經過以下代碼就能夠判斷一個 handler 是否重名了:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

爲了添加一個 handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 所以在上面的代碼中咱們能夠看到新實例化了一個 newCtx 對象, 並將 handler 做爲參數傳遞到構造方法中. 那麼咱們來看一下實例化的 DefaultChannelHandlerContext 到底有什麼玄機吧.
首先看它的構造器:

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

DefaultChannelHandlerContext 的構造器中, 調用了兩個頗有意思的方法: isInboundisOutbound, 這兩個方法是作什麼的呢?

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

從源碼中能夠看到, 當一個 handler 實現了 ChannelInboundHandler 接口, 則 isInbound 返回真; 類似地, 當一個 handler 實現了 ChannelOutboundHandler 接口, 則 isOutbound 就返回真.
而這兩個 boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 並初始化父類的兩個字段: inboundoutbound.
那麼這裏的 ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound 與 inbound 字段分別是什麼呢? 那就看一下 ChannelInitializer 到底實現了哪一個接口不就好了? 以下是 ChannelInitializer 的類層次結構圖:

clipboard.png

能夠清楚地看到, ChannelInitializer 僅僅實現了 ChannelInboundHandler 接口, 所以這裏實例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.
不就是 inbound 和 outbound 兩個字段嘛, 爲何須要這麼大費周章地分析一番? 其實這兩個字段關係到 pipeline 的事件的流向與分類, 所以是十分關鍵的, 不過我在這裏先賣個關子, 後面咱們再來詳細分析這兩個字段所起的做用. 在這裏, 讀者只須要記住, ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 便可.

當建立好 Context 後, 就將這個 Context 插入到 Pipeline 的雙向鏈表中:

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

顯然, 這個代碼就是典型的雙向鏈表的插入操做了. 當調用了 addLast 方法後, Netty 就會將此 handler 添加到雙向鏈表中 tail 元素以前的位置.

自定義 ChannelHandler 的添加過程

在上一小節中, 咱們已經分析了一個 ChannelInitializer 如何插入到 Pipeline 中的, 接下來就來探討一下 ChannelInitializer 在哪裏被調用, ChannelInitializer 的做用, 以及咱們自定義的 ChannelHandler 是如何插入到 Pipeline 中的.

Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端) 一章的 channel 的註冊過程 小節中, 咱們已經分析過 Channel 的註冊過程了, 這裏咱們再簡單地複習一下:

  • 首先在 AbstractBootstrap.initAndRegister中, 經過 group().register(channel), 調用 MultithreadEventLoopGroup.register 方法

  • 在MultithreadEventLoopGroup.register 中, 經過 next() 獲取一個可用的 SingleThreadEventLoop, 而後調用它的 register

  • 在 SingleThreadEventLoop.register 中, 經過 channel.unsafe().register(this, promise) 來獲取 channel 的 unsafe() 底層操做對象, 而後調用它的 register.

  • 在 AbstractUnsafe.register 方法中, 調用 register0 方法註冊 Channel

  • 在 AbstractUnsafe.register0 中, 調用 AbstractNioChannel#doRegister 方法

  • AbstractNioChannel.doRegister 方法經過 javaChannel().register(eventLoop().selector, 0, this) 將 Channel 對應的 Java NIO SockerChannel 註冊到一個 eventLoop 的 Selector 中, 而且將當前 Channel 做爲 attachment.

而咱們自定義 ChannelHandler 的添加過程, 發生在 AbstractUnsafe.register0 中, 在這個方法中調用了 pipeline.fireChannelRegistered() 方法, 其實現以下:

@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}

上面的代碼很簡單, 就是調用了 head.fireChannelRegistered() 方法而已.

關於上面代碼的 head.fireXXX 的調用形式, 是 Netty 中 Pipeline 傳遞事件的經常使用方式, 咱們之後會常常看到.

還記得 head 的 類層次結構圖不, head 是一個 AbstractChannelHandlerContext 實例, 而且它沒有重寫 fireChannelRegistered 方法, 所以 head.fireChannelRegistered 實際上是調用的 AbstractChannelHandlerContext.fireChannelRegistered:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}

這個方法的第一句是調用 findContextInbound 獲取一個 Context, 那麼它返回的 Context 究竟是什麼呢? 咱們跟進代碼中看一下:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

很顯然, 這個代碼會從 head 開始遍歷 Pipeline 的雙向鏈表, 而後找到第一個屬性 inbound 爲 true 的 ChannelHandlerContext 實例. 想起來了沒? 咱們在前面分析 ChannelInitializer 時, 花了大量的筆墨來分析了 inbound 和 outbound 屬性, 你看如今這裏就用上了. 回想一下, ChannelInitializer 實現了 ChannelInboudHandler, 所以它所對應的 ChannelHandlerContext 的 inbound 屬性就是 true, 所以這裏返回就是 ChannelInitializer 實例所對應的 ChannelHandlerContext. 即:

clipboard.png

當獲取到 inbound 的 Context 後, 就調用它的 invokeChannelRegistered 方法:

private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

咱們已經強調過了, 每一個 ChannelHandler 都與一個 ChannelHandlerContext 關聯, 咱們能夠經過 ChannelHandlerContext 獲取到對應的 ChannelHandler. 所以很顯然了, 這裏 handler() 返回的, 其實就是一開始咱們實例化的 ChannelInitializer 對象, 並接着調用了 ChannelInitializer.channelRegistered 方法. 看到這裏, 讀者朋友是否會以爲有點眼熟呢? ChannelInitializer.channelRegistered 這個方法咱們在第一章的時候已經大量地接觸了, 可是咱們並無深刻地分析這個方法的調用過程, 那麼在這裏讀者朋友應該對它的調用有了更加深刻的瞭解了吧.

那麼這個方法中又有什麼玄機呢? 繼續看代碼:

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

initChannel 這個方法咱們很熟悉了吧, 它就是咱們在初始化 Bootstrap 時, 調用 handler 方法傳入的匿名內部類所實現的方法:

.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

所以當調用了這個方法後, 咱們自定義的 ChannelHandler 就插入到 Pipeline 了, 此時的 Pipeline 以下圖所示:

clipboard.png

當添加了自定義的 ChannelHandler 後, 會刪除 ChannelInitializer 這個 ChannelHandler, 即 "ctx.pipeline().remove(this)", 所以最後的 Pipeline 以下:

clipboard.png

好了, 到了這裏, 咱們的 自定義 ChannelHandler 的添加過程 也分析的查很少了.

下一小節 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二)

本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題爲: Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)
本文連接爲: http://www.javashuo.com/article/p-dzuxyzxk-hk.html

相關文章
相關標籤/搜索