一、Channel的生命週期java
Interface Channel定義了一組和ChannelInboundHandler API密切相關的簡單但功能強大的狀態模型,如下列出Channel的4個狀態。promise
ChannelUnregistered:Channel已經被建立,但還未註冊到EventLoop緩存
ChannelRegistered:Channel已經被註冊到了EventLoop安全
ChannelActive:Channel處於活動狀態(已經鏈接到它的遠程節點)。它如今能夠接收和發送數據了併發
ChannelInactive:Channel沒有鏈接到遠程節點異步
Channel的正常生命週期以下圖所示,當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其能夠隨後對它們作出響應。
ide
二、ChannelHandler的生命週期工具
下表列出了interface ChannelHandler定義的生命週期操做,在ChannelHandler被添加到ChannelPipeline中或者被從ChannelPipeline中移除時會調用這些操做,這些方法中的每個都接受一個ChannelHandlerContext參數。oop
handlerAdded:當把ChannelHandler添加到ChannelPipeline中時被調用佈局
handlerRemoved:當從ChannelPipeline中移除ChannelHandler時被調用
exceptionCaught:當處理過程當中在ChannelPipeline中有錯誤產生時被調用
Netty定義了下面兩個重要的ChannelHandler子接口:
·ChannelInboundHandler——處理入站數據以及各類狀態變化
·ChannelOutboundHandler——處理出站數據而且容許攔截全部的操做
三、ChannelInboundHandler接口
當某個ChannelInboundHandler的實現重寫channelRead()方法時,它將負責顯式地釋放與池化ByteBuf實例相關的內存,Netty爲此提供了一個實用方法ReferenceCountUtil.release()。
@ChannelHandler.Sharable//擴展了ChannelInboundHandlerAdapterpublic class DiscardHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //丟棄已接收的消息 ReferenceCountUtil.release(msg); } }
Netty將使用WARN級別的日誌消息記錄未釋放的資源,使得能夠很是簡單地在代碼中發現違規的實例,可是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用SimpleChannelInboundHandler。
@ChannelHandler.Sharable//擴展了SimpleChannelInboundHandlerpublic class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { //不須要任何顯式的資源釋放 //No need to do anything special } }
因爲SimpleChannelInboundHandler會自動釋放資源,因此你不該該存儲指向任何消息的引用供未來使用,由於這些引用都將會失效。
四、ChannelOutboundHandler接口
出站操做和數據將由ChannelOutboundHandler處理,它的方法將被Channel、ChannelPipeline以及ChannelHandlerContext調用。
ChannelOutboundHandler的一個強大的功能是能夠按需推遲操做或者事件,這使得能夠經過一些複雜的方法來處理請求。例如,若是到遠程節點的寫入被暫停了,那麼你能夠推遲沖刷並在稍後繼續。
ChannelPromise與ChannelFuture : ChannelOutboundHandler中的大部分方法都須要一個ChannelPromise參數,以便在操做完成時獲得通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。
五、ChannelHandler適配器
你可使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類做爲本身的ChannelHandler的起始點。這兩個適配器分別提供了ChannelInboundHandler和ChannelOutboundHandler的基本實現,經過擴展抽象類ChannelHandlerAdapter,他們得到了他們共同的超接口ChannelHandler的方法。生成的類的層次結構以下圖。
ChannelHandlerAdapter還提供了使用方法isSharable(),若是其對應的實現被標註爲Sharable,那麼這個方法都將返回true,表示它能夠被添加到多個ChannelPipeline中。
在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體調用了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。
六、資源管理
每當經過調用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法來處理數據時,你都須要確保沒有任何的資源泄露。你可能還記得前面的章節中所提到的,Netty使用引用技術來處理池化的ByteBuf。因此在徹底使用完某個ByteBuf後,調整其引用計數是很重要的。
爲了幫助你診斷潛在的(資源泄露)問題,Netty提供了class ResourceLeakDetector,它將對你應用程序的緩衝區分配作大約1%的採樣來檢測內存泄露。相關的開銷是很是小的。
Netty定義了4種泄露檢測級別。
DISABLED——禁用泄露檢測
SIMPLE——使用1%的默認採樣率檢測並報告任何發現的泄露
ADVANCED——使用默認的採樣率,報告所發現的任何的泄露以及對應的消息被訪問的位置
PARANOID——相似於ADVANCED,可是其將會對每次訪問都進行採樣,這對性能將會有很大的影響,應該只在調試階段使用
泄露檢測級別能夠經過將下面的Java系統屬性設置爲表中的一個值來定義:
java -Dio.netty.leakDetectionLevel = ADVANCED
若是帶着該JVM選項從新啓動你的應用程序,你將看到本身的應用程序最近被泄露的緩衝區被訪問的位置。
實現ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法時,應該如何使用這個診斷工具來防止泄露呢?讓咱們看看你的channelRead()操做直接消費入站消息的狀況,也就是說,他不會經過調用ChannelHandlerContext.fireChannelRead()方法將入站消息轉發給下一個ChannelInboundHandler。
消費入站消息的簡單方式: 因爲消費入站數據是一項常規任務,因此Netty提供了一個特殊的被稱爲SimpleChannelInboundHandler的ChannelInboundHandler實現,這個實現會在消息被channelRead0()方法消費以後自動釋放消息。
在出站方向這邊,若是你處理了write()操做並丟棄了一個消息,那麼你也應該負責釋放它。如下代碼展現了一個丟棄全部的寫入數據的實現。
@ChannelHandler.Sharablepublic class DiscardoutBoundHandler extends ChannelOutboundHandlerAdapter{
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //釋放資源 ReferenceCountUtil.release(msg); //通知ChannelPromise數據已經被處理了 promise.setSuccess(); } }
重要的是,不只要釋放資源,還要通知ChannelPromise。不然可能會出現ChannelFutureListener收不到某個消息已經被處理了的通知的消息。
總之,若是一個消息被消費或者丟棄了,而且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那麼用戶就有責任調用ReferenceCountUtil.release()。若是消息到達了實際的傳輸層,那麼當它被寫入時或者Channel關閉時,都將被自動釋放。
七、ChannelPipeline接口
若是你認爲ChannelPipeline是一個攔截流經Channel的入站和出站事件的ChannelHandler實例鏈,那麼就很容易看出這些ChannelHandler之間的交互式如何組成一個應用程序數據和時間處理邏輯的核心的。
每個新建立的Channel都將會被分配一個新的ChannelPipeline。這項關聯時永久的,Channel即不能附加另一個ChannelPipeline,也不能分離其當前的,在Netty組件的生命週期中,這是一項固定的操做,不須要開發人員的任何干預。
根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理,隨後,經過調用ChannelHandlerContext實現,它將被轉發給同一個超類型的下一個ChannelHandler。
ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler可以和它的ChannelPipeline以及其餘的ChannelHandler交互,ChannelHandler能夠通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至能夠動態修改它所屬的ChannelPipeline。ChannelHandlerContext具備豐富的用於處理事件和執行I/O操做的API。
下圖展現了一個典型的同時具備入站和出站ChannelHandler的ChannelPipeline的佈局,而且印證了咱們以前的關於ChannelPipeline主要由一系列的ChannelHandler所組成的說法,ChannelPipeline還提供了經過ChannelPipeline自己傳播事件的方法。若是一個入站事件被觸發,它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端。如圖所示,一個出站I/O事件將從ChannelPipeline的最右邊開始,而後向左傳播。
在ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個ChannelHandler的類型是否和事件的運動方向相匹配。若是不匹配,ChannelPipeline將跳過該ChannelHandler並前進到下一個,直到它找到和該事件所指望的方向相匹配的爲止。
八、修改ChannelPipeline
經過調用ChannelPipeline上的相關方法,ChannelHandler能夠添加、刪除或者替換其餘的ChannelHandler,從而實時地修改ChannelPipeline的佈局。
ChannelPipeline pipeline = ...; FirstHandler firstHandler = new FirstHandler(); //將該實例做爲「handler1」添加到ChannelPipeline中 pipeline.addLast("handler1",firstHandler); //將一個SecondHandler的實例做爲「handler2」添加到ChannelPipeline的第一個槽中,這意味着它將被放置在已有的「handler1」以前 pipeline.addLast("handler2",new SecondHandler()); //將一個ThirdHandler的實例做爲「handler3」添加到ChannelPipeline的最後一個槽中 pipeline.addLast("handler3",new ThirdHandler()); ... //經過名稱移除「handler3」 pipeline.remove("handler3"); //經過引用移除FirstHandler pipeline.remove(firstHandler); //將SecondHandler(「handler2」)替換爲FourthHandler:"handler4" pipeline.replace("handler2","handler4",new ForthHandler());
ChannelHandler的執行和阻塞:一般ChannelPipeline中的每個ChannelHandler都是經過它的EventLoop(I/O線程)來處理傳遞給它的事件的。因此相當重要的是不要阻塞這個線程,由於這會對總體的I/O處理產生負面的影響。但有時可能須要與那些使用阻塞API的遺留代碼進行交互,對於這個狀況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法,若是一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel自己的EventLoop中移除,對於這種用例,Netty提供了一種叫DefaultEventExecutorGroup的默認實現。
——ChannelPipeline保存了與Channel相關聯的ChannelHandler
——ChannelPipeline能夠根據須要、經過添加或者刪除ChannelHandler來動態修改
——ChannelPipeline有着豐富的API用以被調用、以響應入站和出站事件
——ChannelHandlerContext和ChannelHandler之間的關聯(綁定)是永遠不會改變的,因此緩存對它的引用是安全的
九、使用ChannelHandlerContext
如下代碼,將經過ChannelHandlerContext獲取到Channel的引用,調用Channel上的write()方法將會致使寫入事件從尾端到頭部地流經ChannelPipeline。
如下代碼展現了一個相似的例子,可是這一次是寫入ChannelPipeline。咱們再次看到,(到ChannelPipeline的)引用是經過ChannelHandlerContext獲取的。
ChannelHandlerContext ctx = ..; //獲取到與ChannelHandlerContext相關聯的Channel的引用 Channel channel = ctx.channel(); //經過Channel寫入緩衝區
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
爲何會想要從ChannelPipeline中的某個特定點開始傳播事件呢?
——爲了減小將事件傳經對它不感興趣的ChannelHandler所帶來的開銷
——爲了不將事件傳經那些可能會對它感興趣的ChannelHandler。
十、ChannelHandler和ChannelHandlerContext的高級用法
能夠經過將ChannelHandler添加到ChannelPipeline中來實現動態的協議切換,緩存到ChannelHandlerContext的引用以供稍後使用,這可能會發生在任何的ChannelHandler方法以外,甚至來自於不一樣的線程。
如下代碼,緩存到ChannelHandlerContext的引用
public class WriteHandler extends ChannelHandlerAdapter{ private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //存儲到ChannelHandlerContext的引用以供稍後使用 this.ctx = ctx; } public void send(String msg){ //使用以前存儲的到ChannelHandlerContext的引用來發送消息 ctx.writeAndFlush(msg); } }
由於一個ChannelHandler能夠從屬於多個ChannelPipeline,因此它也能夠綁定到多個ChannelHandlerContext實例,對於這種用法(指在多個ChannelPipeline中共享同一個ChannelHandler),對應的ChannelHandler必需要使用@Sharable註解標註;不然,試圖將它添加到多個ChannelPipeline時將會觸發異常,顯而易見,爲了安全地被用於多個併發的Channel(鏈接),這樣的ChannelHandler必須是線程安全的。
如下代碼,展現這種模式。
@ChannelHandler.Sharable//使用註解@Sharable標註public class SharableHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Channel read message: " + msg); //記錄方法調用,並轉發給下一個ChannelHandler ctx.fireChannelRead(msg); } }
前面的ChannelHandler實現了符合全部的將其加入到多個ChannelPipeline的需求,即它使用了註解@Sharable標註,而且也不持有任何的狀態。
如下代碼,演示@Sharable的錯誤用法
@ChannelHandler.Sharablepublic class UnSharableHandler extends ChannelInboundHandlerAdapter{ private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將count字段值加1 count++; System.out.println("channelRead(...) called the " + count + " time"); //記錄方法調用,並轉發給下一個ChannelHandler ctx.fireChannelRead(msg); } }
這段代碼的問題在於它擁有狀態,即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個併發Channel訪問時致使問題。(能夠將ChannelRead()方法變爲同步方法)
總之,只應該在肯定了你的ChannelHandler是線程安全的時才使用@Sharable註解。
爲什麼要共享同一個ChannelHandler:在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的緣由是用於收集跨越多個Channel的統計信息。
十一、處理入站異常
異常處理是任何真實應用程序的重要組成部分,它也能夠經過多種方式來實現,所以,Netty提供了幾種方式用於處理入站或者出站處理過程當中所拋出的異常。
若是在處理入站事件的過程當中有異常被拋出,那麼它將從它在ChannelInboundHandler裏被觸發的那一點開始流經ChannelPipeline。要想處理這種類型的入站異常,你須要在你的ChannelInboundHandler實現exceptionCaught方法。
如下代碼,展現了其關閉Channel並打印了異常的棧跟蹤信息
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
由於異常將會繼續按照入站方向流動(就像全部入站事件同樣),因此實現了前面所示邏輯的ChannelInboundHandler一般位於ChannelPipeline的最後,這確保了全部的入站異常都老是會被處理,不管他們可能會發生在ChannelPipeline中的什麼位置。
你應該如何響應異常,可能很大程序上取決於你的應用程序,你可能想要關閉Channel(和鏈接),也可能會嘗試進行恢復。若是你不實現任何處理入站異常的邏輯,那麼Netty將會記錄該異常沒有被處理的事實。
——ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler
——若是異常到達了ChannelPipeline的尾端,它將會被記錄爲未處理
——要想定義自定義的處理邏輯,你須要重寫exceptionCaught方法,而後你須要決定是否須要將該異常傳播出去
十二、處理出站異常
——每一個出站操做都將返回一個ChannelFuture。註冊到ChannelFuture的ChannelFutureListener將在操做完成時被通知該操做是成功了仍是出錯了
——幾乎全部的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的實例,做爲ChannelFuture的子類,ChannelPromise也能夠被分配用於異步通知的監聽器,可是,ChannelPromise還具備提供當即通知的可寫方法。
如下代碼,添加channelFutureListener,它將打印棧跟蹤信息,而且隨後關閉Channel
ChannelFuture future = channel.wirte(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()){ channelFuture.cause().printStackTrace(); channelFuture.channel().close(); } } });
第二種方式是將ChannelFutrueListener添加到即將做爲參數傳遞給ChannelOutboundHandler的方法的ChannelPromise。
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()){ f.cause().printStackTrace(); f.channel().close(); } } }); } }
ChannelPromise的可寫方法:經過調用ChannelPromise上的setSuccess()和setFailure()方法,可使一個操做的狀態在ChannelHandler的方法返回給其調用者時便即刻被感知到。