Pipeline inbound

精進篇:netty源碼死磕7  html

巧奪天工——Pipeline入站流程詳解編程

1. Pipeline的入站流程

在講解入站處理流程前,先腦補和鋪墊一下兩個知識點:緩存

(1)如何向Pipeline添加一個Handler節點安全

(2)Handler的出站和入站的區分方式ide

1.1. HandlerContext節點的添加

在Pipeline實例建立的同時,Netty爲Pipeline建立了一個Head和一個Tail,而且創建好了連接關係。函數

代碼以下:學習

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);

    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
this

也就是說,在加入業務Handler以前,Pipeline的內部雙向鏈表不是一個空鏈表。而新加入的Handler,加入的位置是,插入在鏈表的倒數第二個位置,在Tail的前面。spa

wpsF713.tmp

加入Handler的代碼,在DefaultChannelPipeline類中。指針

具體的代碼以下:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {

     //檢查重複
        checkMultiplicity(handler);
        //建立上下文
        newCtx = newContext(group, filterName(name, handler), handler);
         //加入雙向鏈表
        addLast0(newCtx);
        //…
    }
    callHandlerAdded0(newCtx);
    return this;
}

加入以前,首先進行Handler的重複性檢查。非共享類型的Handler,只能被添加一次。若是當前要添加的Handler是非共享的,而且已經添加過,那就拋出異常,不然,標識該handler已經添加。

什麼是共享類型,什麼是非共享類型呢?先聚焦一下主題,後面會詳細解答。

檢查完成後,給Handler建立包裹上下文Context,而後將Context加入到雙向列表的尾部Tail前面。

代碼以下:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

這裏主要是經過調整雙向連接的指針,完成節點的插入。若是對雙向鏈表不熟悉,能夠本身畫畫指向變化的草圖,就明白了。

1.2. Context的出站和入站的類型

對於入站和出站,Pipeline中兩種不一樣類型的Handler處理器,出站Handler和入站Handler。

入站(inBound)事件Handler的基類是 ChannelInboundHandler,出站(outBound)事件Handler的基類是 ChannelOutboundHandler。

處理入站(inBound)事件,最典型的就是處理Channel讀就緒事件,還有就是業務處理Handler。處理出站outBound操做,最爲典型的處理,是寫數據到Channel。

對應於兩種Handler處理器的Context 包裹器,更加須要區分入站和出站。對Context的區分方式,又是什麼呢?

首先,須要在Context加了一組boolean類型判斷屬性,判斷出站和入站的類型。這組屬性就是——inbound、outbound。這組屬性,定義在上下文包裹器的基類中——ContextAbstractChannelHandlerContext 定義。它們在構造函數中進行初始化。

ContextAbstractChannelHandlerContext 的構造器代碼以下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private final boolean inbound;
private final boolean outbound;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
   //….
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
   //…
}

//…

}
對於通用的默認包裹器,繼承了ContextAbstractChannelHandlerContext 基類,而且在本身的構造器中,初始化這兩個父類屬性的方法,以下:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

 //…
private final ChannelHandler handler;
 private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
 }

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
 super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
   //….

this.handler = handler;

}

}
從上面的代碼能夠看出, 通用的包裹器DefaultChannelHandlerContext ,經過本身的isInbound()、isOutbound()方法的返回值,對構造函數參數中的Handler 類型進行判斷,來設置分類的boolean類型屬性inbound、outbound的值。

再看兩個非通用的HandlerContext——head和tail。

在HeadContext,則調用父類構造器的第五個參數(outbound)的值爲true,表示Head是一個出站類型的Context。代碼以下:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {

//父類構造器
        super(pipeline, null, HEAD_NAME, false, true);
//...

}

}

在TailContext,則調用父類構造器的第四個參數(inbound)的值爲true,表示Tail是一個入站類型的Context。代碼以下:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
//...

   }

}

不管是哪一種類型的handler,Pipeline沒有單獨和分開的入站和出站鏈表,都是統一在一個雙向鏈表中進行管理。

下圖中,使用紫色表明入站Context,橙色表明出站Context。

wpsF724.tmp

在上圖中,橙色表示出站Context,紫色表示入站Context。

在上圖中的流程中,區分一個 ChannelHandlerContext究竟是in(入站)仍是out(出站) ,使用的是Context的isInbound() 和 isOutbound() 這一組方法。

贅述一下:

Tail是出站執行流程的啓動點,可是,它最後一個入站處理器。

Hearder,是入站流程的啓動起點,可是,它最後一個出站處理器。

感受,有點兒饒。容易讓人混淆。看完整個的入站流程和出站流程的詳細介紹,就清楚了。

1.3. 入站操做的全流程

入站事件前面已經講過,流向是從Java 底層IO到ChannelHandler。入站事件的類型包括鏈接創建和斷開、讀就緒、寫就緒等。

基本上,,在處理流程上,大部分的入站事件的處理過程,是一致的。

通用的入站Inbound事件處理過程,大體以下(使用IN_EVT符號代替一個通用事件):

(1)pipeline.fireIN_EVT

(2)AbstractChannelHandlerContext.invokeIN_EVT(head, msg);

(3)context.invokeIN_EVT(msg);

(4)handler.IN_EVT

(5)context.fireIN_EVT(msg);

(6)Connect.findContextInbound()

(7)context.invokeIN_EVT(msg);

上面的流程,若是短期內看不懂,沒有關係。能夠先看一個例子,再回來推敲學習這個通用流程。

1.4. 讀就緒事件的流程實例

下面以最爲常見和最好理解的事件——讀就緒的事件爲例,將Inbound事件作一個詳細的描述。

整個讀就緒的入站處理流程圖,以下:


inbount 1


1.5. 入站源頭的Java底層 NIO封裝

入站事件處理的源頭,在Channel的底層Java NIO 就緒事件。

Netty對底層Java NIO的操做類,進行了封裝,封裝成了Unsafe系列的類。比方說,AbstractNioByteChannel 中,就有一個NioByteUnsafe 類,封裝了底層的Java NIO的底層Byte字節的讀取操做。

爲何叫Unsafe呢?

很簡單,就是在外部使用,是不安全的。Unsafe就是隻能在Channel內部使用的,在Netty 外部的應用開發中,不建議使用。Unsafe包裝了底層的數據讀取工做,包裝在Channel中,不須要應用程序關心。應用程序只須要從緩存中,取出緩存數據,完成業務處理便可。

Channel 讀取數據到緩存後,下一步就是調用Pipeline的fireChannelRead()方法,從這個點開始,正式開始了Handler的入站處理流程。

從Channel 到Pipeline這一段,Netty的代碼以下:

public abstract class AbstractNioByteChannel extends AbstractNioChannel

{

 protected class NioByteUnsafe extends AbstractNioUnsafe {

     @Override
    public final void read() {
final ChannelPipeline pipeline = pipeline();
        ……
      // 讀取結果.

      byteBuf = allocHandle.allocate(allocator);
      ……
      int localReadAmount = doReadBytes(byteBuf);
      ………

  // 經過pipeline dispatch(分發)結果到Handler

  pipeline.fireChannelRead(byteBuf);
      ……
   }

//經過重寫newUnsafe() 方法

//取得內部類NioSocketChannelUnsafe的實例
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

…

}
channel調用了 pipeline.fireChannelRead(byteBuf)後,進入pipeline 開始處理。這是流程的真正啓動的動做。

1.6. Head是入站流程的起點

前面分析到,Pipeline中,入站事件處理流程的處理到的第一個Context是Head。

這一點,從DefaultChannelPipeline 源碼能夠獲得驗證,以下所示:

public class DefaultChannelPipeline implements ChannelPipeline

{

…

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

…

}

Pipeline將內部鏈表的head頭做爲參數,傳入了invokeChannelRead的靜態方法中。

就像開動了流水線的開關,開啓了整個的流水線的循環處理。


1.7. 小迭代的五個動做

一個Pipeline上有多個InBound Handler,每個InBound Handler的處理,能夠算作一次迭代,也能夠說成小迭代。

每個迭代,有四個動做。這個invokeIN_EVT方法,是整個四個動做的小迭代的起點。

四個動做,分別以下:

(1)invokeChannelRead(next, msg)

(2)context.invokeIN_EVT(msg);

(3)handler.IN_EVT

(4)context.fireIN_EVT(msg);

(5)Connect.findContextInbound()

局部的流程圖以下:

inbount 2

整個五個動做中,只有第三步在Handler中,其餘的四步都在Context中完成。

1.8. 流水線小迭代的第一步

invokeChannelRead(next,msg) 靜態方法,很是關鍵,其重要做爲是:做爲流水線迭代處理的每一輪循環小迭代的第一步。在Context的抽象基類中,源碼以下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{
//...
static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) {
  ……
next.invokeChannelRead(msg);
  ……
}

//...
}
首先,這個是一個靜態方法。

其次,這個方法沒有啥特別。只是作了一個二轉。將處理傳遞給context實例,調用context實例的invokeChannelRead方法。強調一下,使用了同一個名稱哈。可是後邊的invokeChannelRead,是一個實例方法,並且只有一個參數。

1.9. context.invokeIN_EVT實例方法

流水線小迭代第二步,觸發當前的Context實例的IN_EVT操做。

對於IN_EVT爲ChannelRead的時候,第二步方法爲invokeChannelRead,其源碼以下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private void invokeChannelRead(Object msg) {
    ……
       ((ChannelInboundHandler) handler()).channelRead(this, msg);
……
}

}

這一步很簡單,就是將context和msg(byteBuf)做爲參數,傳遞給Handler實例,完成業務處理。

在Handler中,能夠獲取到以上兩個參數實例,做爲業務處理的輸入。在業務Handler中的IN_EVT方法中,能夠寫本身的業務處理邏輯。

1.10. 默認的handler.IN_EVT 入站處理操做

流水線小迭代第三步,完後Context實例中Handler的IN_EVT業務操做。

若是Handler中的IN_EVT方法中沒有寫業務邏輯,則Netty提供了默認的實現。默認源碼在ChannelInboundHandlerAdapter 適配器類中。

當IN_EVT爲ChannelRead的時候,第三步的默認實現源碼以下:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

{

//默認的通道讀操做
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

//...

}

讀完源碼發現,這份默認源碼,都沒有作什麼實際的處理。

惟一的乾的活,就是調用ctx.fireChannelRead(msg),將msg經過context再一次發射出去。

進入第四步。

1.11. context.fireIN_EVT再發射消息

流水線小迭代第四步,尋找下家,觸發下一家的入站處理。

整個是流水線的流動的關鍵一步,實現了向下一個HandlerContext的流動。

源碼以下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private final boolean inbound;
private final boolean outbound;

//...

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

//..

}

第四步仍是在ChannelInboundHandlerAdapter 適配器中定義。首先經過第五步,找到下一個Context,而後回到小迭代的第一步,完成了小迭代的一個閉環。

這一步,對於業務Handler而言,很重要。

在用戶Handler中,若是當前 Handler 須要將此事件繼續傳播下去,則調用contxt.fireIN_EVT方法。若是不這樣作, 那麼此事件的流水線傳播會提早終止。

1.12. findContextInbound()找下家

第五步是查找下家。

代碼以下:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

{

//...

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

}

這個是一個標準的鏈表查詢操做。this表示當前的context,this.next表示下一個context。經過while循環,一直往流水線的下邊找,知道查找到下一個入站Context爲止。

假定流水下以下圖所示:

wpsF775.tmp

在上圖中,若是當前context是head,則下一個是Decoder;若是當前context是Decoder,則下一個是Business;若是當前context是Business,則下一個是Tail。

第五步,是在第四步調用的。

找到以後,第四步經過 invokeChannelRead(findContextInbound(), msg)這個靜態方法的調用,由回到小迭代的第一步,開始下一輪小的運行。

1.13. 最後一輪Context處理

咱們在前面講到,在Netty中,Tail是最後一個IN boundContext。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

protected void onUnhandledInboundMessage(Object msg) {
  //…

        //釋放msg的引用計數
        ReferenceCountUtil.release(msg);
  //..
}

}

在最後的一輪入站處理中。Tail沒有作任何的業務邏輯,僅僅是對msg 釋放一次引用計數。

這個msg ,是從channel 入站源頭的過來的byteBuf。有多是引用計數類型(ReferenceCounted)類型的緩存,則須要釋放其引用。若是不是ReferenceCounted,則什麼也不作。

關於緩存的引用計數,後續再開文章作專題介紹。

1.14. 小結

對入站(Inbound )事件的處理流程,作一下小節:

Inbound 事件是通知事件,當某件事情已經就緒後,從Java IO 通知上層Netty  Channel。

Inbound 事件源頭是 Channel內部的UNSafe;

Inbound 事件啓動者是 Channel,經過Pipeline. fireIN_EVT啓動。

Inbound 事件在 Pipeline 中傳輸方向是從 head 到 tail。

Inbound 事件最後一個的處理者是 TailContext, 而且其處理方法是空實現。若是沒有其餘的處理者,則對Inbound ,TailContext是惟一的處理者。

Inbound 事件的向後傳遞方法是contxt.fireIN_EVT方法。在用戶Handler中,若是當前 Handler 須要將此事件繼續傳播下去,則調用contxt.fireIN_EVT方法。若是不這樣作, 那麼此事件的流水線傳播會提早終止。




無編程不創客,無案例不學習。瘋狂創客圈,一大波高手正在交流、學習中!

瘋狂創客圈 Netty 死磕系列 10多篇深度文章博客園 總入口】  QQ羣:104131248

相關文章
相關標籤/搜索