第六章:ChannelHandler

本章介紹java

    ChannelPipelinepromise

    ChannelHandlerContext安全

    ChannelHandler框架

    Inbound vs outbound(入站和出站)ide

Netty提供了一個強大的處理這些事情的功能,容許用戶自定義ChannelHandler的實現來處理數據。使得ChannelHandler更強大的是能夠鏈接每一個ChannelHandler來實現任務,這有助於代碼的整潔和重用。oop

6.1 ChannelPipelinethis

ChannelPipeline是ChannelHandler實例的列表,用於處理或截獲通道的接收和發送數據。spa

ChannelPipeline提供了一種高級的截取過濾器模式,讓用戶能夠在ChannelPipeline中徹底控制一個事件及如何處理ChannelHandler與ChannelPipeline的交互。線程

對於每一個新的通道,會建立一個新的ChannelPipeline並附加至通道。一旦鏈接,Channel和ChannelPipeline之間的耦合是永久性的。Channel不能附加其餘的ChannelPipeline或從ChannelPipeline分離。netty

下圖描述了ChannelHandler在ChannelPipeline中的I/O處理,一個I/O操做能夠由一個ChannelInboundHandler或ChannelOutboundHandler進行處理,並經過調用ChannelInboundHandler處理入站IO或經過ChannelOutboundHandler處理出站IO。

ChannelPipeline是ChannelHandler的一個列表;若是一個入站I/O事件被觸發,這個事件會從第一個開始依次經過ChannelPipeline中的ChannelHandler;

如果一個入站I/O事件,則會從最後一個開始依次經過ChannelPipeline中的ChannelHandler。

ChannelHandler能夠處理事件並檢查類型,若是某個ChannelHandler不能處理則會跳過,並將事件傳遞到下一個ChannelHandler。

ChannelPipeline能夠動態添加、刪除、替換其中的ChannelHandler,這樣的機制能夠提升靈活性。

修改ChannelPipeline的方法:

    addFirst(...),添加ChannelHandler在ChannelPipeline的第一個位置

    addBefore(...),在ChannelPipeline中指定的ChannelHandler名稱以前添加ChannelHandler

    addAfter(...),在ChannelPipeline中指定的ChannelHandler名稱以後添加ChannelHandler

    addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler

    remove(...),刪除ChannelPipeline中指定的ChannelHandler

    replace(...),替換ChannelPipeline中指定的ChannelHandler

ChannelPipeline pipeline = ch.pipeline(); 
FirstHandler firstHandler = new FirstHandler(); 
pipeline.addLast("handler1", firstHandler); 
pipeline.addFirst("handler2", new SecondHandler()); 
pipeline.addLast("handler3", new ThirdHandler()); 
pipeline.remove("“handler3“"); 
pipeline.remove(firstHandler); 
pipeline.replace("handler2", "handler4", new FourthHandler());

被添加到ChannelPipeline的ChannelHandler將經過IO-Thread處理事件,這意味了必須不能有其餘的IO-Thread阻塞來影響IO的總體處理;有時候可能須要阻塞,例如JDBC。所以,Netty容許經過一個EventExecutorGroup到每個ChannelPipeline.add*方法,自定義的事件會被包含在EventExecutorGroup中的EventExecutor來處理,默認的實現是DefaultEventExecutorGroup。

6.2 ChannelHandlerContext

每一個ChannelHandler被添加到ChannelPipeline後,都會建立一個ChannelHandlerContext並與之建立的ChannelHandler關聯綁定。ChannelHandlerContext容許ChannelHandler與其餘的ChannelHandler實現進行交互,這是相同ChannelPipeline的一部分。

ChannelHandlerContext不會改變添加到其中的ChannelHandler,所以它是安全的。

6.2.1 通知下一個ChannelHandler

在相同的ChannelPipeline中經過調用ChannelInboundHandler和ChannelOutboundHandler中各個方法中的一個方法來通知最近的handler,通知開始的地方取決你如何設置。下圖顯示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的關係:

若是你想有一些事件流所有經過ChannelPipeline,有兩個不一樣的方法能夠作到:

    調用Channel的方法

    調用ChannelPipeline的方法

這兩個方法均可以讓事件流所有經過ChannelPipeline。不管從頭部仍是尾部開始,由於它主要依賴於事件的性質。若是是一個「入站」事件,它開始於頭部;如果一個「出站」事件,則開始於尾部。

下面的代碼顯示了一個寫事件如何經過ChannelPipeline從尾部開始:

@Override  
protected void initChannel(SocketChannel ch) throws Exception {  
    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
        @Override  
        public void channelActive(ChannelHandlerContext ctx) throws Exception {  
            //Event via Channel  
            Channel channel = ctx.channel();  
            channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
            //Event via ChannelPipeline  
            ChannelPipeline pipeline = ctx.pipeline();  
            pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
        }  
    });  
}

下圖表示經過Channel或ChannelPipeline的通知:

可能你想從ChannelPipeline的指定位置開始,不想流經整個ChannelPipeline,以下狀況:

    爲了節省開銷,不感興趣的ChannelHandler不讓經過

    排除一些ChannelHandler

在這種狀況下,你可使用ChannelHandlerContext的ChannelHandler通知起點。它使用ChannelHandlerContext執行下一個ChannelHandler。下面代碼顯示了直接使用ChannelHandlerContext操做:

// Get reference of ChannelHandlerContext  
ChannelHandlerContext ctx = ..;  
// Write buffer via ChannelHandlerContext  
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

該消息流經ChannelPipeline到下一個ChannelHandler,在這種狀況下使用ChannelHandlerContext開始下一個ChannelHandler。下圖顯示了事件流:

如上圖顯示的,從指定的ChannelHandlerContext開始,跳過前面全部的ChannelHandler,使用ChannelHandlerContext操做是常見的模式,最經常使用的是從ChannelHanlder調用操做,也能夠在外部使用ChannelHandlerContext,由於這是線程安全的。

6.2.2 修改ChannelPipeline

調用ChannelHandlerContext的pipeline()方法能訪問ChannelPipeline,能在運行時動態的增長、刪除、替換ChannelPipeline中的ChannelHandler。能夠保持ChannelHandlerContext供之後使用,如外部Handler方法觸發一個事件,甚至從一個不一樣的線程。

下面代碼顯示了保存ChannelHandlerContext供以後使用或其餘線程使用:

public class WriteHandler extends ChannelHandlerAdapter {  
    private ChannelHandlerContext ctx;  
    @Override  
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  
        this.ctx = ctx;  
    }  
    public void send(String msg){  
        ctx.write(msg);  
    }  
}

 請注意,ChannelHandler實例若是帶有@Sharable註解則能夠被添加到多個ChannelPipeline。

也就是說單個ChannelHandler實例能夠有多個ChannelHandlerContext,所以能夠調用不一樣ChannelHandlerContext獲取同一個ChannelHandler。

若是添加不帶@Sharable註解的ChannelHandler實例到多個ChannelPipeline則會拋出異常;使用@Sharable註解後的ChannelHandler必須在不一樣的線程和不一樣的通道上安全使用。怎麼是不安全的使用?看下面代碼:

@Sharable  
public class NotSharableHandler extends ChannelInboundHandlerAdapter {  
    private int count;  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        count++;  
        System.out.println("channelRead(...) called the " + count + " time“");  
        ctx.fireChannelRead(msg);  
    }  
      
}

上面是一個帶@Sharable註解的Handler,它被多個線程使用時,裏面count是不安全的,會致使count值錯誤。

爲何要共享ChannelHandler?使用@Sharable註解共享一個ChannelHandler在一些需求中仍是有很好的做用的,如使用一個ChannelHandler來統計鏈接數或來處理一些全局數據等等。

6.3 狀態模型

Netty有一個簡單但強大的狀態模型,並完美映射到ChannelInboundHandler的各個方法。下面是Channel生命週期四個不一樣的狀態:

    channelUnregistered

    channelRegistered

    channelActive

    channelInactive

Channel的狀態在其生命週期中變化,由於狀態變化須要觸發,下圖顯示了Channel狀態變化:

還能夠看到額外的狀態變化,由於用戶容許從EventLoop中註銷Channel暫停事件執行,而後再從新註冊。在這種狀況下,你會看到多個channelRegistered和channelUnregistered狀態的變化,而永遠只有一個channelActive和channelInactive的狀態,由於一個通道在其生命週期內只能鏈接一次,以後就會被回收;從新鏈接,則是建立一個新的通道。

下圖顯示了從EventLoop中註銷Channel後再從新註冊的狀態變化:

6.4 ChannelHandler和其子類

Netty中有3個實現了ChannelHandler接口的類,其中2個是接口,一個是抽象類。以下圖

6.4.1 ChannelHandler中的方法

Netty定義了良好的類型層次結構來表示不一樣的處理程序類型,全部的類型的父類是ChannelHandler。ChannelHandler提供了在其生命週期內添加或從ChannelPipeline中刪除的方法。

    handlerAdded,ChannelHandler添加到實際上下文中準備處理事件

    handlerRemoved,將ChannelHandler從實際上下文中刪除,再也不處理事件

    exceptionCaught,處理拋出的異常

上面三個方法都須要傳遞ChannelHandlerContext參數,每一個ChannelHandler被添加到ChannelPipeline時會自動建立ChannelHandlerContext。ChannelHandlerContext容許在本地通道安全的存儲和檢索值。Netty還提供了一個實現了ChannelHandler的抽象類:ChannelHandlerAdapter。ChannelHandlerAdapter實現了父類的全部方法,基本上就是傳遞事件到ChannelPipeline中的下一個ChannelHandler直到結束。

6.4.2 ChannelInboundHandler

ChannelInboundHandler提供了一些方法再接收數據或Channel狀態改變時被調用。下面是ChannelInboundHandler的一些方法:

channelRegistered,ChannelHandlerContext的Channel被註冊到EventLoop;

channelUnregistered,ChannelHandlerContext的Channel從EventLoop中註銷

channelActive,ChannelHandlerContext的Channel已激活

channelInactive,ChannelHanderContxt的Channel結束生命週期

channelRead,從當前Channel的對端讀取消息

channelReadComplete,消息讀取完成後執行

userEventTriggered,一個用戶事件被處罰

channelWritabilityChanged,改變通道的可寫狀態,可使用Channel.isWritable()檢查

exceptionCaught,重寫父類ChannelHandler的方法,處理異常

Netty提供了一個實現了ChannelInboundHandler接口並繼承ChannelHandlerAdapter的類:ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter實現了ChannelInboundHandler的全部方法,做用就是處理消息並將消息轉發到ChannelPipeline中的下一個ChannelHandler。

ChannelInboundHandlerAdapter的channelRead方法處理完消息後不會自動釋放消息,若想自動釋放收到的消息,可使用SimpleChannelInboundHandler<I>。

看下面代碼:

ChannelInitializer用來初始化ChannelHandler,將自定義的各類ChannelHandler添加到ChannelPipeline中。/** 
 * 實現ChannelInboundHandlerAdapter的Handler,不會自動釋放接收的消息對象 
 * @author c.k 
 * 
 */  
public class DiscardHandler extends ChannelInboundHandlerAdapter {  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //手動釋放消息  
        ReferenceCountUtil.release(msg);  
    }  
}
/** 
 * 繼承SimpleChannelInboundHandler,會自動釋放消息對象 
 * @author c.k 
 * 
 */  
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //不須要手動釋放  
    }  
}

若是須要其餘狀態改變的通知,能夠重寫Handler的其餘方法。一般自定義消息類型來解碼字節,能夠實現ChannelInboundHandler或ChannelInboundHandlerAdapter。有一個更好的解決方法,使用編解碼器的框架能夠很容的實現。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler這三個中的一個來處理接收消息,使用哪個取決於需求;大多數時候使用SimpleChannelInboundHandler處理消息,使用ChannelInboundHandlerAdapter處理其餘的「入站」事件或狀態改變。

ChannelInitializer用來初始化ChannelHandler,將自定義的各類ChannelHandler添加到ChannelPipeline中。

6.4.3 ChannelOutboundHandler

ChannelOutboundHandler用來處理「出站」的數據消息。ChannelOutboundHandler提供了下面一些方法:

bind,Channel綁定本地地址

connect,Channel鏈接操做

disconnect,Channel斷開鏈接

close,關閉Channel

deregister,註銷Channel

read,讀取消息,實際是截獲ChannelHandlerContext.read()

write,寫操做,實際是經過ChannelPipeline寫消息,Channel.flush()屬性到實際通道

flush,刷新消息到通道

ChannelOutboundHandler是ChannelHandler的子類,實現了ChannelHandler的全部方法。全部最重要的方法採起ChannelPromise,所以一旦請求中止從ChannelPipeline轉發參數則必須獲得通知。

Netty提供了ChannelOutboundHandler的實現:ChannelOutboundHandlerAdapter。ChannelOutboundHandlerAdapter實現了父類的全部方法,而且能夠根據須要重寫感興趣的方法。全部這些方法的實現,在默認狀況下,都是經過調用ChannelHandlerContext的方法將事件轉發到ChannelPipeline中下一個ChannelHandler。

看下面的代碼:

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {  
    @Override  
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
        ReferenceCountUtil.release(msg);  
        promise.setSuccess();  
    }  
}

重要的是要記得釋放致遠並直通ChannelPromise,若ChannelPromise沒有被通知可能會致使其中一個ChannelFutureListener不被通知去處理一個消息。

若是消息被消費而且沒有被傳遞到ChannelPipeline中的下一個ChannelOutboundHandler,那麼就須要調用ReferenceCountUtil.release(message)來釋放消息資源。一旦消息被傳遞到實際的通道,它會自動寫入消息或在通道關閉是釋放。

相關文章
相關標籤/搜索