源碼之下無祕密 ── 作最好的 Netty 源碼分析教程前端
Netty 源碼分析之 番外篇 Java NIO 的前生今世git
Java NIO 的前生今世 之一 簡介github
Java NIO 的前生今世 之二 NIO Channel 小結segmentfault
Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭less
接上篇 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)
咱們注意到, pipeline.addXXX 都有一個重載的方法, 例如 addLast, 它有一個重載的版本是:
ChannelPipeline addLast(String name, ChannelHandler handler);
第一個參數指定了所添加的 handler 的名字(更準確地說是 ChannelHandlerContext 的名字, 不過咱們一般是以 handler 做爲敘述的對象, 所以說成 handler 的名字便於理解). 那麼 handler 的名字有什麼用呢? 若是咱們不設置name, 那麼 handler 會有怎樣的名字?
爲了解答這些疑惑, 老規矩, 依然是從源碼中找到答案.
咱們仍是以 addLast 方法爲例:
@Override public ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); }
這個方法會調用重載的 addLast 方法:
@Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; }
第一個參數被設置爲 null, 咱們不關心它. 第二參數就是這個 handler 的名字. 看代碼可知, 在添加一個 handler 以前, 須要調用 checkDuplicateName 方法來肯定此 handler 的名字是否和已添加的 handler 的名字重複. 而這個 checkDuplicateName 方法咱們在前面已經有提到, 這裏再回顧一下:
private void checkDuplicateName(String name) { if (name2ctx.containsKey(name)) { throw new IllegalArgumentException("Duplicate handler name: " + name); } }
Netty 判斷一個 handler 的名字是否重複的依據很簡單: DefaultChannelPipeline 中有一個 類型爲 Map<String, AbstractChannelHandlerContext> 的 name2ctx 字段, 它的 key 是一個 handler 的名字, 而 value 則是這個 handler 所對應的 ChannelHandlerContext. 每當新添加一個 handler 時, 就會 put 到 name2ctx 中. 所以檢查 name2ctx 中是否包含這個 name 便可.
當沒有重名的 handler 時, 就爲這個 handler 生成一個關聯的 DefaultChannelHandlerContext 對象, 而後就將 name 和 newCtx 做爲 key-value 對 放到 name2Ctx 中.
若是咱們調用的是以下的 addLast 方法
ChannelPipeline addLast(ChannelHandler... handlers);
那麼 Netty 會調用 generateName 爲咱們的 handler 自動生成一個名字:
private String generateName(ChannelHandler handler) { WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)]; Class<?> handlerType = handler.getClass(); String name; synchronized (cache) { name = cache.get(handlerType); if (name == null) { name = generateName0(handlerType); cache.put(handlerType, name); } } synchronized (this) { // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid // any name conflicts. Note that we don't cache the names generated here. if (name2ctx.containsKey(name)) { String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. for (int i = 1;; i ++) { String newName = baseName + i; if (!name2ctx.containsKey(newName)) { name = newName; break; } } } } return name; }
而 generateName 會接着調用 generateName0 來實際產生一個 handler 的名字:
private static String generateName0(Class<?> handlerType) { return StringUtil.simpleClassName(handlerType) + "#0"; }
自動生成的名字的規則很簡單, 就是 handler 的簡單類名加上 "#0", 所以咱們的 EchoClientHandler 的名字就是 "EchoClientHandler#0", 這一點也能夠經過調試窗口佐證:
前面章節中, 咱們知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 兩個 boolean 變量, 分別用於標識 Context 所對應的 handler 的類型, 即:
inbound 爲真時, 表示對應的 ChannelHandler 實現了 ChannelInboundHandler 方法.
outbound 爲真時, 表示對應的 ChannelHandler 實現了 ChannelOutboundHandler 方法.
讀者朋友確定很疑惑了吧: 那究竟這兩個字段有什麼做用呢? 其實這還要從 ChannelPipeline 的傳輸的事件類型提及.
Netty 的事件能夠分爲 Inbound 和 Outbound 事件.
以下是從 Netty 官網上拷貝的一個圖示:
I/O Request via Channel or ChannelHandlerContext | +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+
從上圖能夠看出, inbound 事件和 outbound 事件的流向是不同的, inbound 事件的流行是從下至上, 而 outbound 恰好相反, 是從上到下. 而且 inbound 的傳遞方式是經過調用相應的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是經過調用 ChannelHandlerContext.OUT_EVT() 方法. 例如 ChannelHandlerContext.fireChannelRegistered() 調用會發送一個 ChannelRegistered 的 inbound 給下一個ChannelHandlerContext, 而 ChannelHandlerContext.bind 調用會發送一個 bind 的 outbound 事件給 下一個 ChannelHandlerContext.
Inbound 事件傳播方法有:
ChannelHandlerContext.fireChannelRegistered() ChannelHandlerContext.fireChannelActive() ChannelHandlerContext.fireChannelRead(Object) ChannelHandlerContext.fireChannelReadComplete() ChannelHandlerContext.fireExceptionCaught(Throwable) ChannelHandlerContext.fireUserEventTriggered(Object) ChannelHandlerContext.fireChannelWritabilityChanged() ChannelHandlerContext.fireChannelInactive() ChannelHandlerContext.fireChannelUnregistered()
Oubound 事件傳輸方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise) ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext.write(Object, ChannelPromise) ChannelHandlerContext.flush() ChannelHandlerContext.read() ChannelHandlerContext.disconnect(ChannelPromise) ChannelHandlerContext.close(ChannelPromise)
注意, 若是咱們捕獲了一個事件, 而且想讓這個事件繼續傳遞下去, 那麼須要調用 Context 相應的傳播方法.
例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } } public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) { System.out.println("Closing .."); ctx.close(promise); } }
上面的例子中, MyInboundHandler 收到了一個 channelActive 事件, 它在處理後, 若是但願將事件繼續傳播下去, 那麼須要接着調用 ctx.fireChannelActive().
Outbound 事件都是請求事件(request event)
, 即請求某件事情的發生, 而後經過 Outbound 事件進行通知.
Outbound 事件的傳播方向是 tail -> customContext -> head.
咱們接下來以 connect 事件爲例, 分析一下 Outbound 事件的傳播機制.
首先, 當用戶調用了 Bootstrap.connect 方法時, 就會觸發一個 Connect 請求事件, 此調用會觸發以下調用鏈:
Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect
繼續跟蹤的話, 咱們就發現, AbstractChannel.connect 其實由調用了 DefaultChannelPipeline.connect 方法:
@Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
而 pipeline.connect 的實現以下:
@Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
能夠看到, 當 outbound 事件(這裏是 connect 事件)傳遞到 Pipeline 後, 它實際上是以 tail 爲起點開始傳播的.
而 tail.connect 其實調用的是 AbstractChannelHandlerContext.connect 方法:
@Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ... final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); ... next.invokeConnect(remoteAddress, localAddress, promise); ... return promise; }
findContextOutbound() 顧名思義, 它的做用是以當前 Context 爲起點, 向 Pipeline 中的 Context 雙向鏈表的前端尋找第一個 outbound 屬性爲真的 Context(即關聯着 ChannelOutboundHandler 的 Context), 而後返回.
它的實現以下:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
當咱們找到了一個 outbound 的 Context 後, 就調用它的 invokeConnect 方法, 這個方法中會調用 Context 所關聯着的 ChannelHandler 的 connect 方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
若是用戶沒有重寫 ChannelHandler 的 connect 方法, 那麼會調用 ChannelOutboundHandlerAdapter 所實現的方法:
@Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); }
咱們看到, ChannelOutboundHandlerAdapter.connect 僅僅調用了 ctx.connect, 而這個調用又回到了:
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
這樣的循環中, 直到 connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節點, 即 head 中. 爲何會傳遞到 head 中呢? 回想一下, head 實現了 ChannelOutboundHandler, 所以它的 outbound 屬性是 true.由於 head 自己既是一個 ChannelHandlerContext, 又實現了 ChannelOutboundHandler 接口
, 所以當 connect 消息傳遞到 head 後, 會將消息轉遞到對應的 ChannelHandler 中處理, 而剛好, head 的 handler() 返回的就是 head 自己:
@Override public ChannelHandler handler() { return this; }
所以最終 connect 事件是在 head 中處理的. head 的 connect 事件處理方法以下:
@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
到這裏, 整個 Connect 請求事件就結束了.
下面以一幅圖來描述一個整個 Connect 請求事件的處理過程:
點此有無碼高清原圖
咱們僅僅以 Connect 請求事件爲例, 分析了 Outbound 事件的傳播過程, 可是其實全部的 outbound 的事件傳播都遵循着同樣的傳播規律, 讀者能夠試着分析一下其餘的 outbound 事件, 體會一下它們的傳播過程.
Inbound 事件和 Outbound 事件的處理過程有點鏡像.Inbound 事件是一個通知事件
, 即某件事已經發生了, 而後經過 Inbound 事件進行通知. Inbound 一般發生在 Channel 的狀態的改變或 IO 事件就緒.
Inbound 的特色是它傳播方向是 head -> customContext -> tail.
既然上面咱們分析了 Connect 這個 Outbound 事件, 那麼接着分析 Connect 事件後會發生什麼 Inbound 事件, 並最終找到 Outbound 和 Inbound 事件之間的聯繫.
當 Connect 這個 Outbound 傳播到 unsafe 後, 實際上是在 AbstractNioUnsafe.connect 方法中進行處理的:
@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ... if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { ... } ... }
在 AbstractNioUnsafe.connect 中, 首先調用 doConnect 方法進行實際上的 Socket 鏈接, 當鏈接上後, 會調用 fulfillConnectPromise 方法:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { ... // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive(); } ... }
咱們看到, 在 fulfillConnectPromise 中, 會經過調用 pipeline().fireChannelActive() 將通道激活的消息(即 Socket 鏈接成功)發送出去.而這裏, 當調用 pipeline.fireXXX 後, 就是 Inbound 事件的起點.
所以當調用了 pipeline().fireChannelActive() 後, 就產生了一個 ChannelActive Inbound 事件, 咱們就從這裏開始看看這個 Inbound 事件是怎麼傳播的吧.
@Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; }
哈哈, 果真, 在 fireChannelActive 方法中, 調用的是 head.fireChannelActive, 所以能夠證實了, Inbound 事件在 Pipeline 中傳輸的起點是 head.
那麼, 在 head.fireChannelActive() 中又作了什麼呢?
@Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); ... next.invokeChannelActive(); ... return this; }
上面的代碼應該很熟悉了吧. 回想一下在 Outbound 事件(例如 Connect 事件)的傳輸過程當中時, 咱們也有相似的操做:
首先調用 findContextInbound, 從 Pipeline 的雙向鏈表中中找到第一個屬性 inbound 爲真的 Context, 而後返回
調用這個 Context 的 invokeChannelActive
invokeChannelActive 方法以下:
private void invokeChannelActive() { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
這個方法和 Outbound 的對應方法(例如 invokeConnect) 一模一樣. 同 Outbound 同樣, 若是用戶沒有重寫 channelActive 方法, 那麼會調用 ChannelInboundHandlerAdapter 的 channelActive 方法:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
一樣地, 在 ChannelInboundHandlerAdapter.channelActive 中, 僅僅調用了 ctx.fireChannelActive 方法, 所以就會有以下循環:
Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive
這樣的循環中. 同理, tail 自己 既實現了 ChannelInboundHandler 接口, 又實現了 ChannelHandlerContext 接口, 所以當 channelActive 消息傳遞到 tail 後, 會將消息轉遞到對應的 ChannelHandler 中處理, 而剛好, tail 的 handler() 返回的就是 tail 自己:
@Override public ChannelHandler handler() { return this; }
所以 channelActive Inbound 事件最終是在 tail 中處理的, 咱們看一下它的處理方法:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { }
TailContext.channelActive 方法是空的. 若是讀者自行查看 TailContext 的 Inbound 處理方法時, 會發現, 它們的實現都是空的. 可見, 若是是 Inbound, 當用戶沒有實現自定義的處理器時, 那麼默認是不處理的.
用一幅圖來總結一下 Inbound 的傳輸過程吧:
點擊此能夠看高清無碼原圖
對於 Outbound事件:
Outbound 事件是請求事件(由 Connect 發起一個請求, 並最終由 unsafe 處理這個請求)
Outbound 事件的發起者是 Channel
Outbound 事件的處理者是 unsafe
Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.
在 ChannelHandler 中處理事件時, 若是這個 Handler 不是最後一個 Hnalder, 則須要調用 ctx.xxx (例如 ctx.connect) 將此事件繼續傳播下去. 若是不這樣作, 那麼此事件的傳播會提早終止.
Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
對於 Inbound 事件:
Inbound 事件是通知事件, 當某件事情已經就緒後, 通知上層.
Inbound 事件發起者是 unsafe
Inbound 事件的處理者是 Channel, 若是用戶沒有實現自定義的處理方法, 那麼Inbound 事件默認的處理者是 TailContext, 而且其處理方法是空實現.
Inbound 事件在 Pipeline 中傳輸方向是 head -> tail
在 ChannelHandler 中處理事件時, 若是這個 Handler 不是最後一個 Hnalder, 則須要調用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續傳播下去. 若是不這樣作, 那麼此事件的傳播會提早終止.
Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT
outbound 和 inbound 事件十分的鏡像, 而且 Context 與 Handler 直接的調用關係是否容易混淆, 所以讀者在閱讀這裏的源碼時, 須要特別的注意.
本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題爲: Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二)
本文連接爲: http://www.javashuo.com/article/p-hkcoeyiy-gz.html