精進篇:netty源碼死磕7 html
巧奪天工——Pipeline入站流程詳解編程
在講解入站處理流程前,先腦補和鋪墊一下兩個知識點:緩存
(1)如何向Pipeline添加一個Handler節點安全
(2)Handler的出站和入站的區分方式ide
在Pipeline實例建立的同時,Netty爲Pipeline建立了一個Head和一個Tail,而且創建好了連接關係。函數
代碼以下:學習
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}this
也就是說,在加入業務Handler以前,Pipeline的內部雙向鏈表不是一個空鏈表。而新加入的Handler,加入的位置是,插入在鏈表的倒數第二個位置,在Tail的前面。spa
加入Handler的代碼,在DefaultChannelPipeline類中。指針
具體的代碼以下:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查重複
checkMultiplicity(handler);
//建立上下文
newCtx = newContext(group, filterName(name, handler), handler);
//加入雙向鏈表
addLast0(newCtx);
//…
}
callHandlerAdded0(newCtx);
return this;
}
加入以前,首先進行Handler的重複性檢查。非共享類型的Handler,只能被添加一次。若是當前要添加的Handler是非共享的,而且已經添加過,那就拋出異常,不然,標識該handler已經添加。
什麼是共享類型,什麼是非共享類型呢?先聚焦一下主題,後面會詳細解答。
檢查完成後,給Handler建立包裹上下文Context,而後將Context加入到雙向列表的尾部Tail前面。
代碼以下:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
這裏主要是經過調整雙向連接的指針,完成節點的插入。若是對雙向鏈表不熟悉,能夠本身畫畫指向變化的草圖,就明白了。
對於入站和出站,Pipeline中兩種不一樣類型的Handler處理器,出站Handler和入站Handler。
入站(inBound)事件Handler的基類是 ChannelInboundHandler,出站(outBound)事件Handler的基類是 ChannelOutboundHandler。
處理入站(inBound)事件,最典型的就是處理Channel讀就緒事件,還有就是業務處理Handler。處理出站outBound操做,最爲典型的處理,是寫數據到Channel。
對應於兩種Handler處理器的Context 包裹器,更加須要區分入站和出站。對Context的區分方式,又是什麼呢?
首先,須要在Context加了一組boolean類型判斷屬性,判斷出站和入站的類型。這組屬性就是——inbound、outbound。這組屬性,定義在上下文包裹器的基類中——ContextAbstractChannelHandlerContext 定義。它們在構造函數中進行初始化。
ContextAbstractChannelHandlerContext 的構造器代碼以下: abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { private final boolean inbound; private final boolean outbound; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { //…. this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; //… } //… }
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { //… private final ChannelHandler handler; private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); //…. this.handler = handler; } }
再看兩個非通用的HandlerContext——head和tail。
在HeadContext,則調用父類構造器的第五個參數(outbound)的值爲true,表示Head是一個出站類型的Context。代碼以下:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
//父類構造器
super(pipeline, null, HEAD_NAME, false, true);
//...
}
}
在TailContext,則調用父類構造器的第四個參數(inbound)的值爲true,表示Tail是一個入站類型的Context。代碼以下:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
//...
}
}
不管是哪一種類型的handler,Pipeline沒有單獨和分開的入站和出站鏈表,都是統一在一個雙向鏈表中進行管理。
下圖中,使用紫色表明入站Context,橙色表明出站Context。
在上圖中,橙色表示出站Context,紫色表示入站Context。
在上圖中的流程中,區分一個 ChannelHandlerContext究竟是in(入站)仍是out(出站) ,使用的是Context的isInbound() 和 isOutbound() 這一組方法。
贅述一下:
Tail是出站執行流程的啓動點,可是,它最後一個入站處理器。
Hearder,是入站流程的啓動起點,可是,它最後一個出站處理器。
感受,有點兒饒。容易讓人混淆。看完整個的入站流程和出站流程的詳細介紹,就清楚了。
入站事件前面已經講過,流向是從Java 底層IO到ChannelHandler。入站事件的類型包括鏈接創建和斷開、讀就緒、寫就緒等。
基本上,,在處理流程上,大部分的入站事件的處理過程,是一致的。
通用的入站Inbound事件處理過程,大體以下(使用IN_EVT符號代替一個通用事件):
(1)pipeline.fireIN_EVT
(2)AbstractChannelHandlerContext.invokeIN_EVT(head, msg);
(3)context.invokeIN_EVT(msg);
(4)handler.IN_EVT
(5)context.fireIN_EVT(msg);
(6)Connect.findContextInbound()
(7)context.invokeIN_EVT(msg);
上面的流程,若是短期內看不懂,沒有關係。能夠先看一個例子,再回來推敲學習這個通用流程。
下面以最爲常見和最好理解的事件——讀就緒的事件爲例,將Inbound事件作一個詳細的描述。
整個讀就緒的入站處理流程圖,以下:
入站事件處理的源頭,在Channel的底層Java NIO 就緒事件。
Netty對底層Java NIO的操做類,進行了封裝,封裝成了Unsafe系列的類。比方說,AbstractNioByteChannel 中,就有一個NioByteUnsafe 類,封裝了底層的Java NIO的底層Byte字節的讀取操做。
爲何叫Unsafe呢?
很簡單,就是在外部使用,是不安全的。Unsafe就是隻能在Channel內部使用的,在Netty 外部的應用開發中,不建議使用。Unsafe包裝了底層的數據讀取工做,包裝在Channel中,不須要應用程序關心。應用程序只須要從緩存中,取出緩存數據,完成業務處理便可。
Channel 讀取數據到緩存後,下一步就是調用Pipeline的fireChannelRead()方法,從這個點開始,正式開始了Handler的入站處理流程。
從Channel 到Pipeline這一段,Netty的代碼以下:
public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected class NioByteUnsafe extends AbstractNioUnsafe { @Override public final void read() { final ChannelPipeline pipeline = pipeline(); …… // 讀取結果. byteBuf = allocHandle.allocate(allocator); …… int localReadAmount = doReadBytes(byteBuf); ……… // 經過pipeline dispatch(分發)結果到Handler pipeline.fireChannelRead(byteBuf); …… } //經過重寫newUnsafe() 方法 //取得內部類NioSocketChannelUnsafe的實例 @Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); } … }
前面分析到,Pipeline中,入站事件處理流程的處理到的第一個Context是Head。
這一點,從DefaultChannelPipeline 源碼能夠獲得驗證,以下所示:
public class DefaultChannelPipeline implements ChannelPipeline { … @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } … }
Pipeline將內部鏈表的head頭做爲參數,傳入了invokeChannelRead的靜態方法中。
就像開動了流水線的開關,開啓了整個的流水線的循環處理。
一個Pipeline上有多個InBound Handler,每個InBound Handler的處理,能夠算作一次迭代,也能夠說成小迭代。
每個迭代,有四個動做。這個invokeIN_EVT方法,是整個四個動做的小迭代的起點。
四個動做,分別以下:
(1)invokeChannelRead(next, msg)
(2)context.invokeIN_EVT(msg);
(3)handler.IN_EVT
(4)context.fireIN_EVT(msg);
(5)Connect.findContextInbound()
局部的流程圖以下:
整個五個動做中,只有第三步在Handler中,其餘的四步都在Context中完成。
invokeChannelRead(next,msg) 靜態方法,很是關鍵,其重要做爲是:做爲流水線迭代處理的每一輪循環小迭代的第一步。在Context的抽象基類中,源碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { //... static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) { …… next.invokeChannelRead(msg); …… } //... }
其次,這個方法沒有啥特別。只是作了一個二轉。將處理傳遞給context實例,調用context實例的invokeChannelRead方法。強調一下,使用了同一個名稱哈。可是後邊的invokeChannelRead,是一個實例方法,並且只有一個參數。
流水線小迭代第二步,觸發當前的Context實例的IN_EVT操做。
對於IN_EVT爲ChannelRead的時候,第二步方法爲invokeChannelRead,其源碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext
{
private void invokeChannelRead(Object msg) {
……
((ChannelInboundHandler) handler()).channelRead(this, msg);
……
}
}
這一步很簡單,就是將context和msg(byteBuf)做爲參數,傳遞給Handler實例,完成業務處理。
在Handler中,能夠獲取到以上兩個參數實例,做爲業務處理的輸入。在業務Handler中的IN_EVT方法中,能夠寫本身的業務處理邏輯。
流水線小迭代第三步,完後Context實例中Handler的IN_EVT業務操做。
若是Handler中的IN_EVT方法中沒有寫業務邏輯,則Netty提供了默認的實現。默認源碼在ChannelInboundHandlerAdapter 適配器類中。
當IN_EVT爲ChannelRead的時候,第三步的默認實現源碼以下:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
{
//默認的通道讀操做
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
//...
}
讀完源碼發現,這份默認源碼,都沒有作什麼實際的處理。
惟一的乾的活,就是調用ctx.fireChannelRead(msg),將msg經過context再一次發射出去。
進入第四步。
流水線小迭代第四步,尋找下家,觸發下一家的入站處理。
整個是流水線的流動的關鍵一步,實現了向下一個HandlerContext的流動。
源碼以下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext
{
private final boolean inbound;
private final boolean outbound;
//...
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
//..
}
第四步仍是在ChannelInboundHandlerAdapter 適配器中定義。首先經過第五步,找到下一個Context,而後回到小迭代的第一步,完成了小迭代的一個閉環。
這一步,對於業務Handler而言,很重要。
在用戶Handler中,若是當前 Handler 須要將此事件繼續傳播下去,則調用contxt.fireIN_EVT方法。若是不這樣作, 那麼此事件的流水線傳播會提早終止。
第五步是查找下家。
代碼以下:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
{
//...
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
}
這個是一個標準的鏈表查詢操做。this表示當前的context,this.next表示下一個context。經過while循環,一直往流水線的下邊找,知道查找到下一個入站Context爲止。
假定流水下以下圖所示:
在上圖中,若是當前context是head,則下一個是Decoder;若是當前context是Decoder,則下一個是Business;若是當前context是Business,則下一個是Tail。
第五步,是在第四步調用的。
找到以後,第四步經過 invokeChannelRead(findContextInbound(), msg)這個靜態方法的調用,由回到小迭代的第一步,開始下一輪小的運行。
咱們在前面講到,在Netty中,Tail是最後一個IN boundContext。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
//…
//釋放msg的引用計數
ReferenceCountUtil.release(msg);
//..
}
}
在最後的一輪入站處理中。Tail沒有作任何的業務邏輯,僅僅是對msg 釋放一次引用計數。
這個msg ,是從channel 入站源頭的過來的byteBuf。有多是引用計數類型(ReferenceCounted)類型的緩存,則須要釋放其引用。若是不是ReferenceCounted,則什麼也不作。
關於緩存的引用計數,後續再開文章作專題介紹。
對入站(Inbound )事件的處理流程,作一下小節:
Inbound 事件是通知事件,當某件事情已經就緒後,從Java IO 通知上層Netty Channel。
Inbound 事件源頭是 Channel內部的UNSafe;
Inbound 事件啓動者是 Channel,經過Pipeline. fireIN_EVT啓動。
Inbound 事件在 Pipeline 中傳輸方向是從 head 到 tail。
Inbound 事件最後一個的處理者是 TailContext, 而且其處理方法是空實現。若是沒有其餘的處理者,則對Inbound ,TailContext是惟一的處理者。
Inbound 事件的向後傳遞方法是contxt.fireIN_EVT方法。在用戶Handler中,若是當前 Handler 須要將此事件繼續傳播下去,則調用contxt.fireIN_EVT方法。若是不這樣作, 那麼此事件的流水線傳播會提早終止。
瘋狂創客圈 Netty 死磕系列 10多篇深度文章: 【博客園 總入口】 QQ羣:104131248