Channel 概念與 java.nio.channel 概念一致, 用以鏈接IO設備 (socket, 文件等) 的紐帶. 例如將網絡的讀、寫, 客戶端發起鏈接, 主動關閉鏈接, 鏈路關閉, 獲取通訊雙方的網絡地址等.java
Channel 的 IO 類型主要有兩種: 非阻塞IO (NIO) 以及阻塞IO(OIO).promise
數據傳輸類型有兩種: 按事件消息傳遞 (Message) 以及按字節傳遞 (Byte).服務器
適用方類型也有兩種: 服務器(ServerSocket) 以及客戶端(Socket). 還有一些根據傳輸協議而制定的的Channel, 如: UDT、SCTP等.網絡
Netty 按照類型逐層設計相應的類. 最底層的爲抽象類 AbstractChannel
, 再以此根據IO類型、數據傳輸類型、適用方類型實現. 類圖能夠一目瞭然, 以下圖所示:異步
channelRegistered 狀態socket
/** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */ void channelRegistered(ChannelHandlerContext ctx) throws Exception;
從註釋裏面能夠看到是在 Channel
綁定到 Eventloop
上面的時候調用的.ide
不論是 Server 仍是 Client, 綁定到 Eventloop
的時候, 最終都是調用 Abstract.initAndRegister()
這個方法上(Server是在 AbstractBootstrap.doBind()
的時候調用的, Client 是在 Bootstrap.doConnect()
的時候調用的).oop
initAndRegister()
方法定義以下:佈局
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // 把channel綁定到Eventloop對象上面去 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
繼續跟蹤下去會定位到 AbstractChannel.AbstractUnsafe.register0()
方法上.性能
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 作實際的綁定動做。把Channel感興趣的事件註冊到Eventloop.selector上面.具體實如今Abstract.doRegister()方法內 doRegister(); neverRegistered = false; registered = true; // 經過pipeline的傳播機制,觸發handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // 經過pipeline的傳播機制,觸發channelRegistered事件 pipeline.fireChannelRegistered(); // 尚未綁定,因此這裏的 isActive() 返回false. if (isActive()) { if (firstRegistration) { // 若是當前鏈路已經激活,則調用channelActive()方法 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
從上面的代碼也能夠看出, 在調用完 pipeline.fireChannelRegistered()
以後, 緊接着會調用 isActive()
判斷當前鏈路是否激活, 若是激活了則會調用 pipeline.fireChannelActive()
方法.
這個時候, 對於 Client 和 Server 都尚未激活, 因此, 這個時候不論是 Server 仍是 Client 都不會調用 pipeline.fireChanenlActive()
方法.
channelActive 狀態
從啓動器的 bind()
接口開始, 往下調用 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) { // 初始化及註冊 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 調用 doBind0 doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
doBind
方法又會調用 doBind0()
方法, 在 doBind0()
方法中會經過 EventLoop
去執行 channel
的 bind()
任務.
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 調用channel.bind接口 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0()
方法往下會調用到 pipeline.bind(localAddress, promise)
; 方法, 經過 pipeline
的傳播機制, 最終會調用到 AbstractChannel.AbstractUnsafe.bind()
方法, 這個方法主要作兩件事情:
doBind()
: 調用底層JDK API進行 Channel 的端口綁定.pipeline.fireChannelActive()
.@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { .... // wasActive 在綁定成功前爲 false boolean wasActive = isActive(); try { // 調用doBind()調用JDK底層API進行端口綁定 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 完成綁定以後,isActive() 返回true if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { // 觸發channelActive事件 pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
也就是說當有新客戶端鏈接的時候, 會變成活動狀態.
channelInactive 狀態
fireChannelnactive()
方法在兩個地方會被調用: Channel.close()
和 Channel.disconnect()
.
在調用前會先確認狀態是從 Active
--->Inactive
.
channelUnregistered 狀態
fireChannelUnregistered()
方法是在 Channel
從 Eventloop
中解除註冊的時候被調用的. Channel.close()
的時候被觸發執行.
handlerAdded()
: 添加到 ChannelPipeline
時調用.handlerRemoved()
: 從 ChannelPipeline
中移除時調用.exceptionCaught()
: 處理過程當中在 ChannelPipeline
中有錯誤產生時調用.
處理 I/O 事件或截獲 I/O 操做, 並將其轉發到 ChannelPipeline
中的下一個處理程序. ChannelHandler
自己不提供許多方法, 但一般必須實現其子類型之一:
ChannelInboundHandler
: 處理入站數據以及各類狀態變化.ChannelOutboundHandler
: 處理出站數據而且容許攔截全部的操做.channelRegistered()
: 當 Channel 已經註冊到它的 EventLoop 而且可以處理 I/O 時被調用.channelUnregistered()
: 當 Channel 從他的 EventLoop 註銷而且沒法處理任何 I/O 時被調用.channelActive()
: 當 Channel 處於活動狀態時被調用.channelInactive()
: 當 Channel 離開活動狀態而且再也不鏈接遠程節點時被調用.channelRead()
: 當從 Channel 讀取數據時被調用.channelReadComplete()
: 當 Channel 上的一個讀操做完成時被調用. 當全部可讀字節都從 Channel 中讀取以後, 將會調用該回調方法.
出站操做和數據將由 ChannelOutboundHandler 處理. 它的方法將被 Channel
ChannelPipeline
以及 ChannelHandlerContext
調用.
ChannelOutboundHandler
的一個強大的功能是能夠按需推遲操做或事件, 這使得能夠經過一些複雜的方法來處理請求. 例如, 若是到遠程節點的寫入被暫停, 那麼你能夠推遲刷新操做並在稍後繼續.
connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
: 當請求將 Channel 鏈接到遠程節點時被調用.disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
: 當請求將 Channel 從遠程節點斷開時被調用.deregister(ChannelHandlerContext ctx, ChannelPromise promise)
: 當請求將 Channel 從它的 EventLoop 註銷時被調用.read(ChannelHandlerContext ctx)
: 當請求從 Channel 讀取更多的數據時被調用.write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
: 當請求經過 Channel 將數據寫到遠程節點時被調用.flush(ChannelHandlerContext ctx)
: 當請求從 Channel 將入隊數據沖刷到遠程節點時被調用.
ChannelPromise 和 ChannelFuture
ChannelFuture
表示 Channel
中異步I/O操做的結果, 在 netty 中全部的 I/O 操做都是異步的, I/O 的調用會直接返回, 能夠經過 ChannelFuture
來獲取 I/O 操做的結果或者狀態信息.
當 I/O 操做開始時, 將建立一個新對象. 新的對象是未完成的-它既沒有成功, 也沒有失敗, 也沒有被取消, 由於 I/O 操做尚未完成.
若是 I/O 操做已成功完成(失敗或取消), 則對象將標記爲已完成, 其中包含更具體的信息, 例如故障緣由.
請注意, 即便失敗和取消屬於已完成狀態.
ChannelPromise
是 ChannelFuture
的一個子接口, 其定義了一些可寫的方法, 如 setSuccess()
和 setFailure()
, 從而使 ChannelFuture
不可變.
優先使用addListener(GenericFutureListener),而非await()
當作了一個 I/O 操做並有任何後續任務的時候, 推薦優先使用 addListener(GenericFutureListener)
的方式來得到通知, 而非 await()
addListener(GenericFutureListener)
是非阻塞的. 它會把特定的 ChannelFutureListener
添加到 ChannelFuture
中, 而後 I/O 線程會在 I/O 操做相關的 future 完成的時候通知監聽器.
ChannelFutureListener
會利於最佳的性能和資源的利用, 由於它一點阻塞都沒有. 並且不會形成死鎖.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
這兩個適配器類分別提供了 ChannelInboundHandler
和 ChannelOutboundHandler
的基本實現, 它們繼承了共同的父接口 ChannelHandler
的方法, 擴展了抽象類 ChannelHandlerAdapter
.
ChannelHandlerAdapter
還提供了實用方法 isSharable()
.
若是其對應的實現被標註爲 Sharable
, 那麼這個方法將返回 true
, 表示它能夠被添加到多個 ChannelPipeline
中.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
中所提供的方法體調用了其相關聯的 ChannelHandlerContext
上的等效方法, 從而將事件轉發到了 ChannelPipeline
中的 ChannelHandler
中.
ChannelPipeline
將多個 ChannelHandler
連接在一塊兒來讓事件在其中傳播處理. 一個 ChannelPipeline
中可能不只有入站處理器, 還有出站處理器, 入站處理器只會處理入站的事件, 而出站處理器只會處理出站的數據.
每個新建立的 Channel
都將會分配一個新的 ChannelPipeline
, 不能附加另外一個 ChannelPipeline
, 也不能分離當前的.
經過調用 ChannelHandlerContext
實現, 它將被轉發給同一個超類型的下一個 ChannelHandler
.
從事件途徑 ChannelPilpeline
的角度來看, ChannelPipeline
的頭部和尾端取決於該事件是入站的仍是出站的.
而 Netty 老是將 ChannelPilpeline
的入站口 (左側) 做爲頭部, 將出站口 (右側) 做爲尾端.
當經過調用 ChannelPilpeline.add*()
方法將入站處理器和出站處理器混合添加到 ChannelPilpeline
以後, 每個 ChannelHandler
從頭部到尾端的順序就是咱們添加的順序.
在 ChannelPilpeline
傳播事件時, 它會測試 ChannelPilpeline
中的下一個 ChannelHandler
的類型是否和事件的運動方向相匹配. 若是不匹配, ChannelPilpeline
將跳過該 ChannelHandler
並前進到下一個, 直到它找到和該事件指望的方向相匹配的爲止.
修改 ChannelPipeline
這裏指修改 ChannelPipeline
中的 ChannelHandler
的編排.
經過調用 ChannelPipeline
上的相關方法, ChannelHandler
能夠添加, 刪除或者替換其餘的 ChannelHandler
, 從而實時地修改 ChannelPipeline
的佈局.
addFirst // 將 ChannelHandler 插入第一個位置 addBefore // 在某個 ChannelHandler 以前添加一個 addAfter // 在某個 ChannelHandler 以後添加一個 addLast // 將 ChannelHandler 插入最後一個位置 remove // 移除某個 ChannelHandler replace // 將某個 ChannelHandler 替換成指定 ChannelHandler
ChannelHandlerContext
表明了 ChanelHandler
和 ChannelPipeline
之間的關聯, 每當有 ChanelHandler
添加到 ChannelPipeline
中, 都會建立 ChannelHandlerContext
.
ChannelHandlerContext
的主要功能是管理它所關聯的 ChannelPipeline
和同一個 ChannelPipeline
中的其餘 ChanelHandler
之間的交互.
ChannelHandlerContext
有不少的方法, 其中一些方法也存在於 Channel
和 ChannelPipeline
上, 可是有一點重要的不一樣.
若是調用 Channel
和 ChannelPipeline
上的這些方法將沿着 ChannelPipeline
進行傳播(從頭或尾開始).
而調用位於 ChannelHandlerContext
上的相同方法, 則將從當前所關聯的 ChannelHandler
開始, 而且只會傳播給位於該 ChannelPipeline
中的下一個可以處理該事件的 ChannelHandler
.
這樣作能夠減小 ChannelHandler
的調用開銷.
上圖爲 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之間的關係.