本文內容主要參考<<Netty In Action>> 和Netty
的文檔和源碼,偏筆記向.java
先簡略瞭解一下ChannelPipeline
和ChannelHandler
.api
想象一個流水線車間.當組件從流水線頭部進入,穿越流水線,流水線上的工人按順序對組件進行加工,到達流水線尾部時商品組裝完成.數組
能夠將ChannelPipeline
當作流水線,ChannelHandler
當作流水線工人.源頭的組件當作event,如read,write等等.promise
Channel
鏈接了網絡套接字或可以進行I/O操做的組件,如read, write, connect, bind.
緩存
咱們能夠經過Channel
獲取一些信息.安全
Channel
的當前狀態(如,是否鏈接,是否打開)Channel
的配置參數,如buffer的sizeChannelPipeline
和與通道相關的請求Channel
接口定義了一組和ChannelInboundHandler
API密切相關的狀態模型.網絡
當
Channel
的狀態改變,會生成對應的event.這些event會轉發給ChannelPipeline
中的ChannelHandler
,handler會對其進行響應.併發
下面列出了 interface ChannelHandler 定義的生命週期操做, 在 ChannelHandler被添加到 ChannelPipeline 中或者被從 ChannelPipeline 中移除時會調用這些操做。這些方法中的每個都接受一個 ChannelHandlerContext 參數異步
ChannelInboundHandler
處理入站數據以及各類狀態變化,當Channel
狀態發生改變會調用ChannelInboundHandler
中的一些生命週期方法.這些方法與Channel
的生命密切相關.socket
入站數據,就是進入socket
的數據.下面展現一些該接口的生命週期API
當某個
ChannelInboundHandler
的實現重寫channelRead()
方法時,它將負責顯式地
釋放與池化的 ByteBuf 實例相關的內存。 Netty 爲此提供了一個實用方法ReferenceCountUtil.release()
.
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler
,重寫channelRead0()
方法,就能夠在調用過程當中會自動釋放資源.
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用調用ReferenceCountUtil.release(msg)也會釋放資源 } }
原理就是這樣,channelRead
方法包裝了channelRead0
方法.
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
出站操做和數據將由 ChannelOutboundHandler 處理。它的方法將被 Channel、 ChannelPipeline 以及 ChannelHandlerContext 調用。
ChannelOutboundHandler 的一個強大的功能是能夠按需推遲操做或者事件,這使得能夠經過一些複雜的方法來處理請求。例如, 若是到遠程節點的寫入被暫停了, 那麼你能夠推遲沖刷操做並在稍後繼續。
ChannelPromise與ChannelFuture: ChannelOutboundHandler中的大部分方法都須要一個ChannelPromise參數, 以便在操做完成時獲得通知。 ChannelPromiseChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 從而使ChannelFuture不可變.
ChannelHandlerAdapter顧名思義,就是handler的適配器.你須要知道什麼是適配器模式,假設有一個A接口,咱們須要A的subclass實現功能,可是B類中正好有咱們須要的功能,不想複製粘貼B中的方法和屬性了,那麼能夠寫一個適配器類Adpter繼承B實現A,這樣一來Adpter是A的子類而且能直接使用B中的方法,這種模式就是適配器模式.
就好比Netty中的SslHandler
類,想使用ByteToMessageDecoder
中的方法進行解碼,可是必須是ChannelHandler
子類對象才能加入到ChannelPipeline
中,經過以下簽名和其實現細節(SslHandler
實現細節就不貼了)就可以做爲一個Handler去處理消息了.
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
下圖是ChannelHandler和Adpter的UML圖示.
ChannelHandlerAdapter提供了一些實用方法
isSharable()
若是其對應的實現被標註爲 Sharable, 那麼這個方法將返回 true, 表示它能夠被添加到多個 ChannelPipeline中 .若是想在本身的ChannelHandler中使用這些適配器類,只須要擴展他們,重寫那些想要自定義的方法便可.
在使用ChannelInboundHandler.channelRead()
或ChannelOutboundHandler.write()
方法處理數據時要避免資源泄露,ByteBuf那篇文章提到過引用計數,當使用完某個ByteBuf以後記得調整引用計數.
Netty提供了一個class ResourceLeakDetector
來幫助診斷資源泄露,這可以幫助你判斷應用的運行狀況,可是若是但願提升吞吐量(好比搞一些競賽),關閉內存診斷能夠提升吞吐量.
泄露檢測級別能夠經過將下面的 Java 系統屬性設置爲表中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED
若是帶着該 JVM 選項從新啓動你的應用程序,你將看到本身的應用程序最近被泄漏的緩衝
區被訪問的位置。下面是一個典型的由單元測試產生的泄漏報告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest 15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. Recent access records: 1 #1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString( AdvancedLeakAwareByteBuf.java:697) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml( XmlFrameDecoderTest.java:157) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages( XmlFrameDecoderTest.java:133) ...
消費入站消息釋放資源
@Sharable public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg);// 用於釋放資源的工具類 } }
SimpleChannelInboundHandler
中的channelRead0()會消費消息以後自動釋放資源.
出站釋放資源
@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { // 仍是經過util工具類釋放資源 ReferenceCountUtil.release(msg); // 通知ChannelPromise,消息已經處理 promise.setSuccess(); } }
重要的是, 不只要釋放資源,還要通知 ChannelPromise。不然可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的狀況。總之,若是一個消息被消費或者丟棄了, 而且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那麼用戶就有責任調用ReferenceCountUtil.release()。若是消息到達了實際的傳輸層, 那麼當它被寫入時或者 Channel 關閉時,都將被自動釋放。
每個新建立的 Channel 都將會被分配一個新的 ChannelPipeline。這項關聯是永久性的; Channel 既不能附加另一個 ChannelPipeline,也不能分離其當前的。在 Netty 組件的生命週期中,這是一項固定的操做,不須要開發人員的任何干預。
根據事件的起源,事件將會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理。隨後, 經過調用 ChannelHandlerContext 實現,它將被轉發給同一超類型的下一個ChannelHandler。
ChannelHandlerContext使得ChannelHandler可以和它的ChannelPipeline以及其餘的ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 屬 的 ChannelPipeline 中 的 下 一 個ChannelHandler,甚至能夠動態修改它所屬的ChannelPipeline.
這是一個同時具備入站和出站 ChannelHandler 的 ChannelPipeline 的佈局,而且印證了咱們以前的關於 ChannelPipeline 主要由一系列的 ChannelHandler 所組成的說法。 ChannelPipeline 還提供了經過 ChannelPipeline 自己傳播事件的方法。若是一個入站事件被觸發,它將被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。
你可能會說, 從事件途經 ChannelPipeline 的角度來看, ChannelPipeline 的頭部和尾端取決於該事件是入站的仍是出站的。然而 Netty 老是將 ChannelPipeline 的入站口(圖 的左側)做爲頭部,而將出站口(該圖的右側)做爲尾端。
當你完成了經過調用 ChannelPipeline.add*()方法將入站處理器( ChannelInboundHandler)和 出 站 處 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 後 , 每 一 個ChannelHandler 從頭部到尾端的順序位置正如同咱們方纔所定義它們的同樣。所以,若是你將圖 6-3 中的處理器( ChannelHandler)從左到右進行編號,那麼第一個被入站事件看到的 ChannelHandler 將是1,而第一個被出站事件看到的 ChannelHandler 將是 5。在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。若是不匹配, ChannelPipeline 將跳過該ChannelHandler 並前進到下一個,直到它找到和該事件所指望的方向相匹配的爲止。 (固然, ChannelHandler 也能夠同時實現ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)
修改指的是添加或刪除ChannelHandler
代碼示例
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先添加一個Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 這個Handler放在了first,意味着放在了handler1以前 pipeline.addFirst("handler2", new SecondHandler()); // 這個Handler被放到了last,意味着在handler1以後 pipeline.addLast("handler3", new ThirdHandler()); ... // 經過名稱刪除 pipeline.remove("handler3"); // 經過對象刪除 pipeline.remove(firstHandler); // 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例 pipeline.replace("handler2", "handler4", new ForthHandler());
這種方式很是靈活,按照須要更換或插入handler
達到咱們想要的效果.
ChannelHandler的執行和阻塞
一般 ChannelPipeline 中的每個 ChannelHandler 都是經過它的 EventLoop( I/O 線程)來處理傳遞給它的事件的。因此相當重要的是不要阻塞這個線程,由於這會對總體的 I/O 處理產生負面的影響。
但有時可能須要與那些使用阻塞 API 的遺留代碼進行交互。對於這種狀況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add()方法。若是一個事件被傳遞給一個自定義的 EventExecutorGroup ,它將被包含在這個 EventExecutorGroup 中的某個 EventExecutor 所處理,從而被從該Channel 自己的 EventLoop 中移除。對於這種用例, Netty 提供了一個叫 DefaultEventExecutorGroup 的默認實現。
pipeline對handler的操做
入站
出站
每當有ChannelHandler
添加到ChannelPipeline
中,都會建立ChannelHandlerContext
.若是調用Channel
或ChannelPipeline
上的方法,會沿着整個ChannelPipeline
傳播,若是調用ChannelHandlerContext
上的相同方法,則會從對應的當前ChannelHandler
進行傳播.
ChannelHandlerContext
和 ChannelHandler
之間的關聯(綁定)是永遠不會改變的,因此緩存對它的引用是安全的;ChannelHandlerContext
的方法將產生更短的事件流, 應該儘量地利用這個特性來得到最大的性能。從ChannelHandlerContext訪問channel
ChannelHandlerContext ctx = ..; // 獲取channel引用 Channel channel = ctx.channel(); // 經過channel寫入緩衝區 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
從ChannelHandlerContext訪問ChannelPipeline
ChannelHandlerContext ctx = ..; // 獲取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 經過ChannelPipeline寫入緩衝區 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
有時候咱們不想從頭傳遞數據,想跳過幾個handler,從某個handler開始傳遞數據.咱們必須獲取目標handler以前的handler關聯的ChannelHandlerContext.
ChannelHandlerContext ctx = ..; // 直接經過ChannelHandlerContext寫數據,發送到下一個handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext的基本使用應該掌握了,可是你真的理解ChannelHandlerContext,ChannelPipeline和Channelhandler之間的關係了嗎.咱們老看一下Netty的源碼.
先看一下AbstractChannelHandlerContext
類,這個類像不像雙向鏈表中的一個Node,
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; ... }
再來看一看DefaultChannelPipeline
,ChannelPipeline
中擁有ChannelHandlerContext
這個節點的head和tail,
並且DefaultChannelPipeline
類中並無ChannelHandler
成員或handler數組.
public class DefaultChannelPipeline implements ChannelPipeline { ... final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...
因此addFirst
向pipeline中添加了handler到底添加到哪了呢.看一下pipeline中的addFirst方法
@Override public final ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { // 檢查handler是否具備複用能力,不重要 checkMultiplicity(handler); // 名稱,不重要. name = filterName(name, handler); // 這個方法建立了DefaultChannelHandlerContext,handler是其一個成員屬性 // 你如今應該明白了上面說的添加handler會建立handlerContext了吧 newCtx = newContext(group, name, handler); // 這個方法 addFirst0(newCtx);
// 這個方法是調整pipeline中HandlerContext的指針, // 就是更新HandlerContext鏈表節點之間的位置 private void addFirst0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; }
簡單總結一下,pipeline擁有context(自己像一個鏈表的節點)組成的節點的雙向鏈表首尾,能夠看作pipeline擁有一個context鏈表,context擁有成員handler,這即是三者之間的關係.實際上,handler做爲消息處理的主要組件,實現了和pipeline的解耦,咱們能夠只有一個handler,可是被封裝進不一樣的context可以被不一樣的pipeline使用.
緩存ChannelHandlerContext引用
@Sharable public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }
由於一個 ChannelHandler 能夠從屬於多個 ChannelPipeline,因此它也能夠綁定到多個 ChannelHandlerContext 實例。 對於這種用法指在多個ChannelPipeline 中共享同一個 ChannelHandler, 對應的 ChannelHandler 必需要使用@Sharable 註解標註; 不然,試圖將它添加到多個 ChannelPipeline 時將會觸發異常。
@Sharable錯誤用法
@Sharable public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { count++; System.out.println("channelRead(...) called the " + count + " time"); ctx.fireChannelRead(msg); } }
這段代碼的問題在於它擁有狀態 , 即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個併發的Channel訪問時致使問題。(固然,這個簡單的問題能夠經過使channelRead()方法變爲同步方法來修正。)
總之,只應該在肯定了你的 ChannelHandler 是線程安全的時才使用@Sharable 註解。
處理入站事件的過程當中有異常被拋出,那麼它將從它在ChannelInboundHandler裏被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你須要在你的 ChannelInboundHandler 實現中重寫下面的方法。
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception // 基本處理方式 public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
由於異常將會繼續按照入站方向流動(就像全部的入站事件同樣), 因此實現了前面所示邏輯的 ChannelInboundHandler 一般位於 ChannelPipeline 的最後。這確保了全部的入站異常都老是會被處理,不管它們可能會發生在ChannelPipeline 中的什麼位置。
ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;
要想定義自定義的處理邏輯,你須要重寫 exceptionCaught()方法。而後你須要決定是否須要將該異常傳播出去。
ChannelPromise setSuccess(); ChannelPromise setFailure(Throwable cause);
1.添加ChannelFutureListener到ChannelFuture
ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } });
2.添加ChannelFutureListener到ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } }); } }