netty源碼死磕8html
Pipeline outbound 出站流程揭祕編程
簡單回顧一下。promise
出站(outbound) 操做,一般是處於上層的Netty channel,去操做底層Java NIO channel/OIO Channel。服務器
主要出站(outbound)操做以下: 異步
1. 端口綁定 bindide
2. 鏈接服務端 connect性能
3. write寫通道學習
4. flush刷新通道this
5. read讀通道編碼
6. 主動斷開鏈接 disconnect
7. 主動關閉通道 close
最爲常見,也是最容易理解的出站操做,是第3個操做 —— write寫通道。
一個Netty Channel的write 出站操做 實例:
// server向 Channel寫登陸響應 ctx.channel().write(「恭喜,登陸成功」); //....
對於出站操做,有相應的出站Handler處理器。
有四個比較重要的出站Handler類。
這個四個 Handler 相關的類結構圖以下:
在抽象的ChannelOutboundHandler 接口中,定義了全部的出站操做的方法聲明。
在ChannelOutboundHandlerAdapter 出站適配器中,提供了出站操做的默認實現。若是要實現定製的出站業務邏輯,繼承ChannelOutboundHandlerAdapter 適配器便可。ChannelOutboundHandlerAdapter 面向的是通用的出站處理場景。
有一個特殊場景的出站處理器——HeadContext。先按下不表,稍候重點介紹。
雖然有專門的Handler,可是,並無專門的出站Context上下文包裹器。
強調一下:
沒有單獨的出站上下文Context基類。出站和入站,複用了同一個上下文Context基類。它就是AbstractChannelHandlerContext。
在這個AbstractChannelHandlerContext基類中,定義了每個出站操做的默認實現。
基本的出站方法以下:
AbstractChannelHandlerContext.bind(SocketAddress, ChannelPromise) AbstractChannelHandlerContext.connect(SocketAddress,SocketAddress, hannelPromise) AbstractChannelHandlerContext.write(Object, ChannelPromise) AbstractChannelHandlerContext.flush() AbstractChannelHandlerContext.read() AbstractChannelHandlerContext.disconnect(ChannelPromise) AbstractChannelHandlerContext.close(ChannelPromise)
贅述一遍:
Context類型的接口是ChannelHandlerContext,抽象的基類是AbstractChannelHandlerContext。
出站和入站的區分,經過基類AbstractChannelHandlerContext的兩個屬性來完成——outbound、intbound。
出站Context的兩個屬性的值是:
(1)AbstractChannelHandlerContext基類對象的屬性outbound的值爲false
(2)AbstractChannelHandlerContext基類對象的屬性intbound值爲true
Pipeline 的第一個節點HeadContext,outbound屬性值爲true,因此一個典型的出站上下文。
爲何說流逼哄哄呢?
由於:HeadContext不光是一個出站類型的上下文Context, 並且它完成整個出站流程的最後一棒。
不信,看源碼:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { //父類構造器 super(pipeline, null, HEAD_NAME, false, true); //... } }
在HeadContext的構造器中,調用super 方法去初始化基類AbstractChannelHandlerContext實例。
注意,第四個、第五個參數很是重要。
第四個參數false,此參數對應是是基類inbound的值,表示Head不是入站Context。
第五個參數true,此參數對應是是基類outbound的值,表示Head是一個出站Context。
因此,在做爲上下文Context角色的時候,HeadContext是黑白分明的、沒有含糊的。它就是一個出站上下文。
可是,順便說起一下,HeadContext還承擔了另外的兩個角色:
(1)入站處理器
(2)出站處理器
因此,總計一下,其實HeadContext 承擔了3個角色。
HeadContext做爲Handler處理器的角色使用的時候,HeadContext整個Handler是一個兩面派,承擔了兩個Handler角色:
(1)HeadContext是一個入站Handler。HeadContext 是入站流水線處理處理的起點。是入站Handler隊列的排頭兵。
(2)HeadContext是一個出站Handler。HeadContext 是出站流水線處理的終點,完成了出站的最後棒—— 執行最終的Java IO Channel底層的出站方法。
整個出站處理的流水線,是如何一步一步,流轉到最後一棒的呢?
出站處理,起點在TailContext。
這一點,和入站處理的流程恰好反過來。
在Netty 的Pipeline流水線上,出站流程的起點是TailContext,終點是HeadContext,流水線執行的方向是從尾到頭。
強調再強調:
出站流程,只有outbound 類型的 Context 參與。inbound 類型的上下文Context不參與(TailContext另說)。
上圖中,橙色的是outbound Context ,是出站類型,確定參與出站流程的。紫色的是inbound context,是入站類型的上下文Context。
上圖中,橙色的Context有兩個,分別是EncoderContext和HeaderContext兩個Context。EncoderContext 負責出站報文的編碼,通常將Java 對象編碼成特定格式的傳輸數據包data package。HeaderContext 負責將數據包寫出到Channel 通道。
在Pipeline建立的時候,加入Handler以前,Pipeline就是有兩個默認的Context——HeadContext,和TailContext。
最初的Pipeline結構,以下圖所示:
TailContext是出站起點,HeadContext是出站的終點。也就是說,Pipeline 從建立開始,就具已經備了Channel出站操做的能力的。
關鍵的問題是:做爲出站的起點,爲何TailContext不是橙色呢?
首先,TailContext不是outbound類型,反而,是inbound入站類型的上下文包裹器。
其次,TailContext 在出站流水線上,僅僅是承擔了一個啓動工做,尋找出第一個真正的出站Context,而且,將出站的第一棒交給他。
總之,在出站流程上,TailContext做用,只是一把鑰匙,僅此而已。
老規則,先上例子。
以最爲常見、最好理解的出站操做——Netty Channel 出站write操做爲例,將outbound處理出站流程作一個詳細的描述。
整個寫出站的入站處理流程圖,以下:
再看一次Netty Channel的write出站實例:
// server向客戶 Channel寫登陸響應 ctx.channel().write(「恭喜,登陸成功」); //....
寫操做通常的源頭是從Netty 的通道channel開始的。當服務器須要發送一個業務消息到客戶端,會使用到以前打開的客戶端通道channel,調用通道channel的出站寫操做write方法,完成寫操做。
這個write方法的Netty 源碼,在基類AbstractChannel 實現了一個基礎的版本。
代碼以下:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //… @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); } //… }
回憶一下通道和流水線的關係:一個通道一個pipeline流水線。一個流水線串起來一系列的Handler。
因此,通道將出站的操做,直接委託給了本身的成員——pipeline流水線。直接調用pipeline流水線的出站操做去完成。
也就是說,Channel 是甩手掌櫃,將出站操做委託給了Pipeline。然而,Pipeline仍是一個甩手掌櫃。
Pipeline直接甩給了誰呢?
Pipeline 將出站操做,甩給雙向鏈表的最後一個節點—— tail 節點。Pipeline的源碼以下:
public class ChannelPipeline ….. { //… @Override public final ChannelFuture write(Object msg) { return tail.write(msg); } //… }
TailContext 類的定義中,並無實現 write寫出站的方法。這個write(Object msg) 方法,定義在TailContext的基類——AbstractChannelHandlerContext 中。
代碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { //…… @Override public ChannelFuture write(Object msg) { //…. return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { //… write(msg, false, promise); return promise; } //…… @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); } //第三個重載的write private void write(Object msg, boolean flush, ChannelPromise promise)
{ //... //找出下一棒 next AbstractChannelHandlerContext next = findContextOutbound(); //.... //執行下一棒 next.invoke next.invokeWrite(msg, promise); //... } }
有三個版本的write重載方法:
ChannelFuture write(Object msg)
ChannelFuture write(Object msg,ChannelPromise promise)
ChannelFuture write(Object msg, boolean flush,ChannelPromise promise)
第一個調用第二個,第二個調用第三個。
第一個write 建立了一個ChannelPromise 對象,這個對象實例很是重要。由於Netty的write出站操做,並不必定是一調用write就當即執行,更多的時候是異步執行的。write返回的這個ChannelPromise 對象,是專門提供給業務程序,用來干預異步操做的過程。
能夠經過ChannelPromise 實例,監聽異步處理的是否結束,完成write出站正真執行後的一些業務處理,好比,統計出站操做執行的時間等等。
ChannelPromise 接口,繼承了 Netty 的Future的接口,使用了Future/Promise 模式。這個是一種異步處理干預的經典的模式。瘋狂創客圈另外開了一篇文章,專門講述Future/Promise 模式。
第二個write,最爲單薄。簡單的直接調用第三個write,調用前設置flush 參數的值爲false。flush 參數,表示是否要將緩衝區ByteBuf中的數據,當即寫入Java IO Channel底層套接字,發送出去。通常狀況下,第二個write設置了false,表示不當即發出,儘可能減小底層的發送,提高性能。
第三個write,最爲重要,也最爲複雜。分紅兩步,第一步是找出下一棒 next,下一棒next也是一個出站Context。第二步是,執行下一棒的invoke方法,也便是next.invokeWrite(msg, promise);
完成以上的三步操做,TailContext 終於將write出站的實際工做,交到了第一棒outbound Context的手中。
至此,TailContext終於完成的啓動write流程的使命。
通常來講,Pipeline上會有多個OutBound Context(包裹着Handler),每個OutBound Context 的處理,能夠當作是大的流水處理中的一次小迭代。
每個小迭代,有五個動做。
五個動做,具體以下:
(1)context.write(msg,promise)
(2)context.write(msg,flush,promise)
(3)context.findContextOutbound();
(4)next.invokeWrite(msg,promise)
(5)handler.write(this,msg,promise)
Context中的write(msg,promise)方法,是整個小迭代的起點。局部的流程圖以下:
整個五個動做中,只有第五步在Handler中定義。其餘的四步,都在Context中定義。
第一步、第二步的 write 方法在前面已經詳細介紹過了。這兩步主要完成promise實例的 建立,flush 參數的設置。
如今到了比較關鍵的步驟:第三步。這一步是尋找出站的下一棒。
出站流程的尋找下一棒的工做,和入站處理的尋找下一棒的方向,恰好反過來。
出站流程,查找的方向是從尾到頭。這就用到的雙向鏈表的指針是prev——向前的指針。具體來講,從當前的context開始,不斷的使用prev指針,進行循環迭代查找。一直找到終點HeadContext,結束。
Netty源碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { //… private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } //… }
每一次的查找,this表示當前的查詢所在的context 節點,this.prev表示前一個context節點。
查找時,用到的雙向鏈表的指針是prev——向前的指針。經過while循環,一直往Pipeline的前面查找,若是前面的context不是outbound出站上下文,則一直向前。直到,直到,一直到查找出下一個出站Context上下文爲止。
最初的查找,從TailContext開始,this就是TailContext。後續的每一次查找,都是從當前的Context上下文開始的。
找到下一棒出站Context後,執行context的invokeWrite的操做。
源碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { //…… private void invokeWrite(Object msg, ChannelPromise promise) { //... invokeWrite0(msg, promise); //... } //…… private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } }
context的invokeWrite操做,最終調用到了其所包裹的handler的write方法。完成 定義在handler中的業務處理動做。
默認的write 出站方法的實現,定義在ChannelOutboundHandlerAdapter 中。write方法的源碼以下:
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { //…… @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } //…… }
Handler的write出站操做,已經到了一輪出站小迭代的最後一步。這個默認的write方法,簡單的調用context. write方法,回到了小迭代的第一步。
換句話說,默認的ChannelOutboundHandlerAdapter 中的handler方法,是流水線的迭代一個一個環節先後鏈接起來,的關鍵的一小步,保證了流水線不被中斷掉。
反覆進行小迭代,迭代處理完中間的業務handler以後,就會走到流水線的HeadContext。
在出站迭代處理pipeline的最後一步, 會來到HeadContext。
HeadContext是如何完成最後一棒的呢?
上源碼:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { //父類構造器 super(pipeline, null, HEAD_NAME, false, true); this.unsafe = pipeline.channel().unsafe(); //... } public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { this.unsafe.write(msg, promise); } }
HeadContext 包含了一個unsafe 成員。 這個unsafe 成員,是一個只供在netty內部使用的類型。
unsafe 主要功能,就是完成對 Java NIO 底層Channel的寫入 。
因而可知,在Netty中的三大上下文包裹器HeadContext、TailContext 、 DefaultChannelHandlerContext中,HeadContext是離 Java NIO 底層Channel最近的地方。三大包裹器,除了HeadContext,也沒有誰包含Unsafe。對完成出站的最終操做職能來講,沒有誰比HeadContext 更加直接。因此,這個出站處理的最後一棒,只能是HeadContext 了,呵呵。
至此爲止,write出站的整個流水線流程,已經所有講完。
從具體到抽象,咱們再回到出站處理的通用流程。
基本上,在流程上,全部的出站事件的處理過程,是一致的。
爲了方便說明,使用OUT_EVT符號代替一個通用出站操做。
通用的出站Outbound操做處理過程,大體以下:
(1)channel.OUT_EVT(msg);
(2)pipeline.OUT_EVT(msg);
(3)context.OUT_EVT(msg);
(4)context.OUT_EVT(msg,promise);
(5)context.OUT_EVT(msg,flush,promise);
(6)context.findContextOutbound();
(7)next.invoke(msg,flush,promise);
(8)handle.OUT_EVT(context,msg,promise);
(9)context.OUT_EVT(msg,promise);
上面的流程,若是短期內看不懂,能夠在回頭看看write出站的實例。
瘋狂創客圈 Netty 死磕系列 10多篇深度文章: 【博客園 總入口】 QQ羣:104131248