Pipeline outbound

netty源碼死磕8html

Pipeline outbound 出站流程揭祕編程


1. Pipeline outbound流程

1.1. 出站的定義


簡單回顧一下。promise

出站(outbound) 操做,一般是處於上層的Netty channel,去操做底層Java  NIO channel/OIO Channel。服務器

主要出站(outbound)操做以下: 異步

1. 端口綁定 bindide

2. 鏈接服務端 connect性能

3. write寫通道學習

4.  flush刷新通道this

5.  read讀通道編碼

6. 主動斷開鏈接 disconnect

7. 主動關閉通道 close


最爲常見,也是最容易理解的出站操做,是第3個操做 —— write寫通道。

一個Netty Channel的write 出站操做 實例:


// server向 Channel寫登陸響應
ctx.channel().write(「恭喜,登陸成功」);

//....

1.2. 出站處理器Handler

對於出站操做,有相應的出站Handler處理器。

有四個比較重要的出站Handler類。

這個四個 Handler 相關的類結構圖以下:

wps27CD.tmp


在抽象的ChannelOutboundHandler 接口中,定義了全部的出站操做的方法聲明。

在ChannelOutboundHandlerAdapter 出站適配器中,提供了出站操做的默認實現。若是要實現定製的出站業務邏輯,繼承ChannelOutboundHandlerAdapter 適配器便可。ChannelOutboundHandlerAdapter 面向的是通用的出站處理場景。

有一個特殊場景的出站處理器——HeadContext。先按下不表,稍候重點介紹。


1.3. 出站的上下文包裹器Context

雖然有專門的Handler,可是,並無專門的出站Context上下文包裹器。

強調一下:

沒有單獨的出站上下文Context基類。出站和入站,複用了同一個上下文Context基類。它就是AbstractChannelHandlerContext。

在這個AbstractChannelHandlerContext基類中,定義了每個出站操做的默認實現。

基本的出站方法以下:

AbstractChannelHandlerContext.bind(SocketAddress, ChannelPromise)

AbstractChannelHandlerContext.connect(SocketAddress,SocketAddress, hannelPromise)

AbstractChannelHandlerContext.write(Object, ChannelPromise)

AbstractChannelHandlerContext.flush()

AbstractChannelHandlerContext.read()

AbstractChannelHandlerContext.disconnect(ChannelPromise)

AbstractChannelHandlerContext.close(ChannelPromise)


贅述一遍:

Context類型的接口是ChannelHandlerContext,抽象的基類是AbstractChannelHandlerContext。

出站和入站的區分,經過基類AbstractChannelHandlerContext的兩個屬性來完成——outbound、intbound。

出站Context的兩個屬性的值是:

(1)AbstractChannelHandlerContext基類對象的屬性outbound的值爲false

(2)AbstractChannelHandlerContext基類對象的屬性intbound值爲true

wps27ED.tmp


Pipeline 的第一個節點HeadContext,outbound屬性值爲true,因此一個典型的出站上下文。


1.4. 流逼哄哄的HeadContext

爲何說流逼哄哄呢?

由於:HeadContext不光是一個出站類型的上下文Context, 並且它完成整個出站流程的最後一棒。

不信,看源碼:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler

{
 private final Unsafe unsafe;
 HeadContext(DefaultChannelPipeline pipeline)

{
  //父類構造器
  super(pipeline, null, HEAD_NAME, false, true);
  //...

}

}


在HeadContext的構造器中,調用super 方法去初始化基類AbstractChannelHandlerContext實例。

注意,第四個、第五個參數很是重要

第四個參數false,此參數對應是是基類inbound的值,表示Head不是入站Context。

第五個參數true,此參數對應是是基類outbound的值,表示Head是一個出站Context。

因此,在做爲上下文Context角色的時候,HeadContext是黑白分明的、沒有含糊的。它就是一個出站上下文

可是,順便說起一下,HeadContext還承擔了另外的兩個角色:

(1)入站處理器

(2)出站處理器

因此,總計一下,其實HeadContext 承擔了3個角色。

HeadContext做爲Handler處理器的角色使用的時候,HeadContext整個Handler是一個兩面派,承擔了兩個Handler角色:

(1)HeadContext是一個入站Handler。HeadContext 是入站流水線處理處理的起點。是入站Handler隊列的排頭兵。

(2)HeadContext是一個出站Handler。HeadContext 是出站流水線處理的終點,完成了出站的最後棒—— 執行最終的Java IO Channel底層的出站方法。

整個出站處理的流水線,是如何一步一步,流轉到最後一棒的呢?


1.5. TailContext出站流程的起點

出站處理,起點在TailContext。

這一點,和入站處理的流程恰好反過來。


pipeline-out-1


在Netty 的Pipeline流水線上,出站流程的起點是TailContext,終點是HeadContext,流水線執行的方向是從尾到頭。

強調再強調:

出站流程,只有outbound 類型的 Context 參與。inbound 類型的上下文Context不參與(TailContext另說)。

上圖中,橙色的是outbound Context ,是出站類型,確定參與出站流程的。紫色的是inbound context,是入站類型的上下文Context。

上圖中,橙色的Context有兩個,分別是EncoderContext和HeaderContext兩個Context。EncoderContext 負責出站報文的編碼,通常將Java 對象編碼成特定格式的傳輸數據包data  package。HeaderContext 負責將數據包寫出到Channel 通道。

在Pipeline建立的時候,加入Handler以前,Pipeline就是有兩個默認的Context——HeadContext,和TailContext。

最初的Pipeline結構,以下圖所示:

pipeline-out-2


TailContext是出站起點,HeadContext是出站的終點。也就是說,Pipeline 從建立開始,就具已經備了Channel出站操做的能力的。

關鍵的問題是:做爲出站的起點,爲何TailContext不是橙色呢?

首先,TailContext不是outbound類型,反而,是inbound入站類型的上下文包裹器。

其次,TailContext 在出站流水線上,僅僅是承擔了一個啓動工做,尋找出第一個真正的出站Context,而且,將出站的第一棒交給他。

總之,在出站流程上,TailContext做用,只是一把鑰匙,僅此而已。


1.6. 出站write操做的流程實例

老規則,先上例子。

以最爲常見、最好理解的出站操做——Netty  Channel 出站write操做爲例,將outbound處理出站流程作一個詳細的描述。

整個寫出站的入站處理流程圖,以下:

wps283E.tmp

1.7. 出站操做的最初源頭

再看一次Netty Channel的write出站實例:


// server向客戶 Channel寫登陸響應
ctx.channel().write(「恭喜,登陸成功」);

//....


寫操做通常的源頭是從Netty 的通道channel開始的。當服務器須要發送一個業務消息到客戶端,會使用到以前打開的客戶端通道channel,調用通道channel的出站寫操做write方法,完成寫操做。

這個write方法的Netty 源碼,在基類AbstractChannel 實現了一個基礎的版本。

代碼以下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel

{

 //…

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

//…

}


回憶一下通道和流水線的關係:一個通道一個pipeline流水線。一個流水線串起來一系列的Handler。

因此,通道將出站的操做,直接委託給了本身的成員——pipeline流水線。直接調用pipeline流水線的出站操做去完成。

也就是說,Channel 是甩手掌櫃,將出站操做委託給了Pipeline。然而,Pipeline仍是一個甩手掌櫃。

Pipeline直接甩給了誰呢?

Pipeline 將出站操做,甩給雙向鏈表的最後一個節點—— tail 節點。Pipeline的源碼以下:

public class ChannelPipeline …..

{

//…

@Override

public final ChannelFuture write(Object msg) {

  return tail.write(msg);

}

//…

}
因而乎,在pipeline的鏈表上,tail節點,恰恰就是出站操做的啓動節點。

1.8. Tail是出站操做的起點


TailContext 類的定義中,並無實現 write寫出站的方法。這個write(Object msg) 方法,定義在TailContext的基類——AbstractChannelHandlerContext 中。

代碼以下:

abstract class AbstractChannelHandlerContext

extends DefaultAttributeMap implements ChannelHandlerContext

{

//……

@Override

public ChannelFuture write(Object msg) {

//….

return write(msg, newPromise());

}

@Override

public ChannelFuture write(final Object msg, final ChannelPromise promise) {

//…

write(msg, false, promise);

return promise;

}

//……

@Override

public ChannelPromise newPromise() {

return new DefaultChannelPromise(channel(), executor());

}

//第三個重載的write

 private void write(Object msg, boolean flush, ChannelPromise promise) 
 {

       //...

   //找出下一棒 next

     AbstractChannelHandlerContext next = findContextOutbound();

       //....

    //執行下一棒 next.invoke

       next.invokeWrite(msg, promise);

       //...

 }

}


有三個版本的write重載方法:

ChannelFuture write(Object msg)

ChannelFuture write(Object msg,ChannelPromise promise)

ChannelFuture write(Object msg, boolean flush,ChannelPromise promise)

調用的次序是:

第一個調用第二個,第二個調用第三個。


第一個write 建立了一個ChannelPromise 對象,這個對象實例很是重要。由於Netty的write出站操做,並不必定是一調用write就當即執行,更多的時候是異步執行的。write返回的這個ChannelPromise 對象,是專門提供給業務程序,用來干預異步操做的過程。

能夠經過ChannelPromise 實例,監聽異步處理的是否結束,完成write出站正真執行後的一些業務處理,好比,統計出站操做執行的時間等等。

ChannelPromise 接口,繼承了 Netty 的Future的接口,使用了Future/Promise 模式。這個是一種異步處理干預的經典的模式。瘋狂創客圈另外開了一篇文章,專門講述Future/Promise 模式。


第二個write,最爲單薄。簡單的直接調用第三個write,調用前設置flush 參數的值爲false。flush 參數,表示是否要將緩衝區ByteBuf中的數據,當即寫入Java IO Channel底層套接字,發送出去。通常狀況下,第二個write設置了false,表示不當即發出,儘可能減小底層的發送,提高性能。


第三個write,最爲重要,也最爲複雜。分紅兩步,第一步是找出下一棒 next,下一棒next也是一個出站Context。第二步是,執行下一棒的invoke方法,也便是next.invokeWrite(msg, promise);

完成以上的三步操做,TailContext 終於將write出站的實際工做,交到了第一棒outbound Context的手中。

wps285E.tmp


至此,TailContext終於完成的啓動write流程的使命。


1.9. 出站流程小迭代的五個動做

通常來講,Pipeline上會有多個OutBound Context(包裹着Handler),每個OutBound Context 的處理,能夠當作是大的流水處理中的一次小迭代。

每個小迭代,有五個動做。

五個動做,具體以下:


(1)context.write(msg,promise)

(2)context.write(msg,flush,promise)

(3)context.findContextOutbound();

(4)next.invokeWrite(msg,promise)

(5)handler.write(this,msg,promise)


Context中的write(msg,promise)方法,是整個小迭代的起點。局部的流程圖以下:


wps286F.tmp


整個五個動做中,只有第五步在Handler中定義。其餘的四步,都在Context中定義。

第一步、第二步的 write 方法在前面已經詳細介紹過了。這兩步主要完成promise實例的 建立,flush 參數的設置。


如今到了比較關鍵的步驟:第三步。這一步是尋找出站的下一棒。


1.10. findContextOutbound找出下一棒


出站流程的尋找下一棒的工做,和入站處理的尋找下一棒的方向,恰好反過來。

出站流程,查找的方向是從尾到頭。這就用到的雙向鏈表的指針是prev——向前的指針。具體來講,從當前的context開始,不斷的使用prev指針,進行循環迭代查找。一直找到終點HeadContext,結束。

Netty源碼以下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

//…
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

//…

}
這個是一個標準的鏈表的前向查詢。

每一次的查找,this表示當前的查詢所在的context 節點,this.prev表示前一個context節點。


查找時,用到的雙向鏈表的指針是prev——向前的指針。經過while循環,一直往Pipeline的前面查找,若是前面的context不是outbound出站上下文,則一直向前。直到,直到,一直到查找出下一個出站Context上下文爲止。

最初的查找,從TailContext開始,this就是TailContext。後續的每一次查找,都是從當前的Context上下文開始的。


1.11. context的invokeWrite

找到下一棒出站Context後,執行context的invokeWrite的操做。

源碼以下:

abstract class AbstractChannelHandlerContext

extends DefaultAttributeMap implements ChannelHandlerContext

{

//……

    private void invokeWrite(Object msg, ChannelPromise promise) {

        //...

            invokeWrite0(msg, promise);

       //...

    }

//……

    private void invokeWrite0(Object msg, ChannelPromise promise) {

        try {

            ((ChannelOutboundHandler) handler()).write(this, msg, promise);

        } catch (Throwable t) {

            notifyOutboundHandlerException(t, promise);

        }

    }

}


context的invokeWrite操做,最終調用到了其所包裹的handler的write方法。完成 定義在handler中的業務處理動做。


1.12. 默認的write出站實現

默認的write 出站方法的實現,定義在ChannelOutboundHandlerAdapter 中。write方法的源碼以下:

public class ChannelOutboundHandlerAdapter
              extends ChannelHandlerAdapter
 implements ChannelOutboundHandler
{
//……
    @Override
    public void write(ChannelHandlerContext ctx,
        Object msg, ChannelPromise promise) throws Exception
  {
        ctx.write(msg, promise);
  }
//……
}


Handler的write出站操做,已經到了一輪出站小迭代的最後一步。這個默認的write方法,簡單的調用context. write方法,回到了小迭代的第一步。


換句話說,默認的ChannelOutboundHandlerAdapter 中的handler方法,是流水線的迭代一個一個環節先後鏈接起來,的關鍵的一小步,保證了流水線不被中斷掉。


反覆進行小迭代,迭代處理完中間的業務handler以後,就會走到流水線的HeadContext。


1.13. HeadContext出站的最後一棒


在出站迭代處理pipeline的最後一步, 會來到HeadContext。

HeadContext是如何完成最後一棒的呢?

上源碼:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler
{
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
         //父類構造器
        super(pipeline, null, HEAD_NAME, false, true);
        this.unsafe = pipeline.channel().unsafe();
       //...
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     this.unsafe.write(msg, promise);
}
}


HeadContext 包含了一個unsafe 成員。 這個unsafe 成員,是一個只供在netty內部使用的類型。

unsafe 主要功能,就是完成對 Java NIO 底層Channel的寫入 。


因而可知,在Netty中的三大上下文包裹器HeadContext、TailContext 、 DefaultChannelHandlerContext中,HeadContext是離 Java NIO 底層Channel最近的地方。三大包裹器,除了HeadContext,也沒有誰包含Unsafe。對完成出站的最終操做職能來講,沒有誰比HeadContext 更加直接。因此,這個出站處理的最後一棒,只能是HeadContext 了,呵呵。


至此爲止,write出站的整個流水線流程,已經所有講完。


從具體到抽象,咱們再回到出站處理的通用流程。


1.14. 出站操做的全流程

基本上,在流程上,全部的出站事件的處理過程,是一致的。

爲了方便說明,使用OUT_EVT符號代替一個通用出站操做。

通用的出站Outbound操做處理過程,大體以下:


(1)channel.OUT_EVT(msg);

(2)pipeline.OUT_EVT(msg);

(3)context.OUT_EVT(msg);

(4)context.OUT_EVT(msg,promise);

(5)context.OUT_EVT(msg,flush,promise);

(6)context.findContextOutbound();

(7)next.invoke(msg,flush,promise);

(8)handle.OUT_EVT(context,msg,promise);

(9)context.OUT_EVT(msg,promise);


上面的流程,若是短期內看不懂,能夠在回頭看看write出站的實例。




無編程不創客,無案例不學習。瘋狂創客圈,一大波高手正在交流、學習中!

瘋狂創客圈 Netty 死磕系列 10多篇深度文章博客園 總入口】  QQ羣:104131248

相關文章
相關標籤/搜索