Pipeline和ChannelHandler是Netty處理流程的重要組成部分,ChannelHandler對應一個個業務處理器,Pipeline則是負責將各個ChannelHandler串起來的「容器」,兩者結合起來一塊兒完成Netty的處理流程。java
每一個channel內部都會持有一個ChannelPipeline對象pipeline,pipeline默認實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext鏈表。socket
channel的讀寫操做都會走到DefaultChannelPipeline中,當channel完成register、active、read、readComplete等操做時,會觸發pipeline的相應方法。ide
DefaultChannelPipeline
是Netty默認pipeline實現,對應代碼以下:this
public class DefaultChannelPipeline implements ChannelPipeline { // head和tail是handler的處理鏈/上下文 final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel; protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } }
TailContext實現了ChannelOutboundHandler接口,HeadContext實現了ChannelInboundHandler和ChannelOutboundHandler接口,head和tail構成了一個鏈表。3d
對於Inbound操做,從head開始進行處理,向後遍歷;對於OutBound操做,從tail開始處理,向前遍歷。那麼哪些操做是Inbound哪些是OutBound操做呢?netty
注意,HeadContext實現了ChannelInboundHandler
和ChannelOutboundHandler
接口,對於OutBound操做,最後也是會走到HeadContext來處理的,其實TailContext只是一個淺封裝,實際邏輯並很少。HeadContext 中包含了一個netty底層的socket操做類,對於bind/connect/disconnect/close/deregister/beginRead/read/wirte/flush
操做都是由unsafe對象來完成的。code
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { // netty的底層socket操做類 private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } // ... }
channelPipeline的channelHandlerContext鏈表是「責任鏈」模式的體現,一個請求的處理可能會涉及到多個channelHandler,好比decodeHandler、自定義的業務channelHandler和encodeHandler。業務channelHandler示例以下:對象
public class EchoHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println(in.toString(CharsetUtil.UTF_8)); ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
在channelReadComplete方法中調用flush,其實會走到head.flush方法,最後調用unsafe.flush將數據發送出去。netty pipeline就是責任鏈(或者說是流水線)模式的體現,經過pipeline機制,使netty處理數據機制具備強大的擴展性和靈活性。blog
netty的channelHandler是channel處理器,基於netty的業務處理,無論多麼複雜,都是由channelHandler來作的,可能涉及到多個channelHandler,channelHandler分爲多種類型:encoder、decoder、業務處理等。接口
decoderHandler大都是接收到數據以後進行轉換或者處理的,基本都是ByteToMessageDecoder的子類,其類圖以下:
ByteToMessageDecoder中會有一個數據暫存緩衝區,若是接收到數據不完整,能夠先暫存下等到下次接收到數據時再處理。
encoderHandler大都是將message轉換成bytebuf數據,基本都是MessageToByteEncoder的子類,其類圖以下:
業務處理channelHanler就是用戶自定義的業務邏輯了,通常是在最後才addLast到channel.pipeline的,好比http處理邏輯以下:
ServerBootstrap boot = new ServerBootstrap(); boot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8080) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder", new HttpRequestDecoder()) .addLast("encoder", new HttpResponseEncoder()) .addLast("aggregator", new HttpObjectAggregator(512 * 1024)) .addLast("handler", new HttpHandler()); } });
DefaultChannelPipeline中的headContext(實現了ChannelOutboundHandler和ChannelInboundHandler)、tailContext(實現了ChannelOutboundHandler)和自定義的channelHandler(decoderHandler、ecoderHandler、channelHandler等,通常實現ChannelInboundHandler),經過ChannelHandlerContext的連接,組成了一個請求處理鏈。
注意,ChannelOutboundHandler和ChannelInboundHandler的順序如何添加的,其實只要記住一條:ChannelOutboundHandler之間要保證順序,ChannelInboundHandler之間要保證順序,兩者之間無需保證順序。
channelHandler的運行流程圖:
TailContesxt
自己代碼很少而且挺多方法都是"空實現",不過它的channelRead方法內部會執行ReferenceCountUtil.release(msg)
釋放msg佔用的內存空間,也就是說在未定義用戶ChannelHandler
或者用戶ChannelHandler的channelRead繼續傳遞後續ChannelHandler的channelRead
時,到TailContext的channelRead時會自動釋放msg所佔用內存。
推薦閱讀
歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。