ChannelPipeline 和 ChannelHandler 是 Netty 重要的組件之一,經過這篇文章,重點了解這些組件是如何驅動數據流動和處理的。html
在上一篇的總體架構圖裏能夠看到,ChannelHandler 負責處理入站和出站的數據。對於入站和出站,ChannelHandler 由不一樣類型的 Handler 進行處理。下面經過一個示例來演示,將上一篇文章裏的 Demo 作一些修改:
增長如下類:java
// OneChannelInBoundHandler.java package com.niklai.demo.handler.inbound; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class OneChannelInBoundHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(OneChannelInBoundHandler.class.getSimpleName()); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("channel active....."); ctx.fireChannelActive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8)); ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8)); ctx.fireChannelReadComplete(); } }
// TwoChannelInBoundHandler.java package com.niklai.demo.handler.inbound; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TwoChannelInBoundHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(TwoChannelInBoundHandler.class.getSimpleName()); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("channel active....."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.write(Unpooled.copiedBuffer("TwoChannelInBoundHandler answer...", CharsetUtil.UTF_8)); ctx.close().addListener(ChannelFutureListener.CLOSE); } }
// OneChannelOutBoundHandler.java package com.niklai.demo.handler.outbound; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class OneChannelOutBoundHandler extends ChannelOutboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(OneChannelOutBoundHandler.class.getSimpleName()); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(msg, promise); } }
修改 Server.java 類初始化的 childHandler 邏輯:promise
// Server.java // 省略部分代碼 public static void init() { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); serverBootstrap.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress("localhost", 9999)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加ChannelHandler socketChannel.pipeline().addLast(new OneChannelOutBoundHandler()); socketChannel.pipeline().addLast(new OneChannelInBoundHandler()); socketChannel.pipeline().addLast(new TwoChannelInBoundHandler()); } }); ChannelFuture future = serverBootstrap.bind().sync(); future.channel().closeFuture().sync(); group.shutdownGracefully().sync(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // 省略部分代碼
在上面的例子裏,咱們聲明瞭 OneChannelInBoundHandler 和 TwoChannelInBoundHandler 兩個類繼承 ChannelInBoundHandlerAdapter 處理入站數據,一個 OneChannelOutBoundHandler 類繼承 ChannelOutBoundHandlerAdapter 處理出站數據,依次添加到 ChannelPipeline 裏。兩個 ChannelInBoundHandler 類都重寫了 channelActive、channelRead 和 channelReadComplete 方法,OneChannelOutBoundHandler 類重寫了 write 方法。架構
運行單元測試,控制檯獲得以下結果:
經過日誌輸出結果,咱們能夠看到 Client 發送消息後,OneChannelInBoundHandler 的 channelRead 方法被觸發先得到消息內容,調用 ctx.fireChannelRead(msg)方法後 TwoChannelInBoundHandler 的 channelRead 方法被觸發再次得到到消息內容,此時消息已經到達隊尾。在 channelReadComplete 方法裏調用 ctx.write(obj)方法依次寫入應答消息後,消息將會反向出站,OneChannelOutBoundHandler 的 write 被觸發得到應答消息內容,在這個方法裏調用 ctx.writeAndFlush(msg, promise)將應答消息繼續發送出去。框架
注意兩個 ChannelInBoundHandler 獲取消息是有前後順序的,順序取決於添加到 ChannelPipeline 的前後,而且只有當前 ChannelInBoundHandler 的 channelRead 方法裏調用了 ctx.fireChannelRead(msg)方法後,消息才能被傳遞到後面的 ChannelInBoundHandler 的 channelRead 方法,channelReadComplete 方法同理。而在出站時,ChannelOutBoundHandler 的 write 方法會獲取到將要寫出的消息,能夠選擇是否對消息進行再次處理後發送出去。異步
ChannelHandler 相關的類關係圖以下,ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 分別實現了 ChannelInBoundHandler 和 ChannelOutBoundHandler。接口通常經過繼承 ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 來實現業務數據處理:
如下兩個接口部分事件方法,更多方法能夠查閱官方文檔socket
方法 | 描述 |
---|---|
channelActive | Channel 已經鏈接就緒時被調用 |
channelRead | 當從 Channel 讀取數據時被調用 |
channelReadComplete | 當 Channel 的讀取操做完成時被調用 |
exceptionCaught | 當入站事件處理過程當中出現異常時被調用 |
方法 | 描述 |
---|---|
write | 當經過 Channel 寫數據時被調用 |
read | 當從 Channel 讀取數據時被調用 |
從上面的例子能夠看到,加入到 ChannelPipeline 的一系列 ChannelHandler 組成了一個有序的鏈。每個新建立的 Channel 都將被分配一個 ChannelPipeline,Channel 不能本身附加另一個 ChannelPipeline,也不能取消當前的,這個是由框架決定的,不須要開發人員干預。
從上圖能夠看到,事件消息會從頭部傳遞到尾部,而後再從尾部傳遞到頭部。在傳遞過程當中,將會識別 ChannelHandler 的類型,入站事件由 InBoundHandler 處理,出站事件由 OutBoundHandler 處理,若是傳遞到下一個 ChannelHandler 時發現類型與當前方向不匹配,將會直接跳過並前進到下一個。若是某個 ChannelHandler 同時實現了 ChannelInBoundHandler 和 ChannelOutBundHandler 接口,那麼當前 ChannelHandler 將會同時處理入站和出站事件。
如下是 ChannelPipeline 的一些主要方法:ide
方法 | 說明 |
---|---|
addFirst addLast |
添加 ChannelHander 到當前 ChannelPipeline 的頭/尾部 |
addBefore addAfter |
添加 ChannelHander 到當前 ChannelPipeline 裏某個 ChannelHandler 的前/後面 |
remove | 將某個 ChannelHandler 從當前 ChannelPipeline 裏移除 |
replace | 將當前 ChannelPipeline 裏的某個 ChannelHandler 替換成另一個 ChannelHandler |
除此以外,ChannelPipeline 也有一些觸發事件的方法,如下列出跟當前演示例子相關的事件方法,更多方法能夠查閱官方文檔oop
方法 | 描述 |
---|---|
fireChannelActive | 調用 ChannelPipeline 裏下一個 ChannelInBoundHandler 的 channelActive 方法 |
fireChannelRead | 調用 ChannelPipeline 裏下一個 ChannelInBoundHandler 的 ChannelRead 方法 |
write | 調用 ChannelPipeline 裏下一個 ChannelOutBoundHandler 的 write 方法 |
咱們修改一下 Demo 中的例子:單元測試
// OneChannelInBoundHandler.java // 省略代碼 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8)); ctx.pipeline().fireChannelRead(msg); // 調用ChannelPipeline的fireChannelRead方法 } // 省略代碼
運行單元測試查看控制檯日誌,發現事件會反覆觸發 OneChannelInBoundHandler 的 channelRead 方法,直到死循環。對比以前的運行結果能夠看到,ChannelPipeline 的 fireChannelRead 方法會將事件從新從頭部開始向後傳遞,而 ctx.fireChannelRead 方法會將事件從當前的下一個 ChannelHandler 開始向後傳遞。
ChannelHandlerContext 是一個接口,它維護了 ChannelHandler 和 ChannelPipeline 二者之間的關係。當一個 ChannelHandler 加入到 ChannelPipeline 裏時,就會建立一個 ChannelHandlerContext 關聯它們。下圖展現了它們之間的關係,當調用 ChannelHandlerContext 的 fire...方法時,事件都將會被傳遞到它關聯的 ChannelHandler 的下一個 ChannelHandler 上
ChannelHandlerContext 部分的 API 以下,更多 API 能夠查閱官方文檔
方法 | 描述 |
---|---|
pipeline | 獲取關聯的 ChannelPipeline |
handler | 獲取關聯的 ChannelHandler |
fireChannelRead | 觸發下一個 ChannelInBoundHandler 的 channelRead 方法 |
若是在處理入站事件過程當中發生了異常,則該異常將會從它所在的 ChannelInBoundHandler 開始傳遞直到 ChannelPipeline 尾部。經過重寫 exceptionCaught 方法,能夠處理異常。
修改一下 Demo,增長 exceptionCaught
// OneChannelInBoundHandler.java // 省略部分代碼 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8)); ctx.fireChannelReadComplete(); throw new Exception("This is an exception!"); // 模擬拋出一個異常 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause); } // 省略部分代碼
運行測試,能夠看到異常信息已經打印到控制檯日誌:
再次修改 Demo,調用 ChannelHandlerContext 的 fireExceptionCaught 方法將異常繼續傳遞下去
// OneChannelInBoundHandler.java // 省略部分代碼 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause); ctx.fireExceptionCaught(cause); // 將異常傳遞下去 } // 省略部分代碼
// TwoChannelInBoundHandler.java // 省略部分代碼 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("TwoChannelInBoundHandler exception:{}....", cause.getMessage(), cause); } // 省略部分代碼
運行測試,查看控制檯日誌,兩個 ChannelInBoundHandler 都會打印異常日誌:
若是,兩個 ChannelInBoundHandler 都不重寫 exceptionCaught 方法處理異常,會怎樣?修改 Demo,刪除 exceptionCaught 後再次運行測試,查看控制檯日誌:
控制檯輸出一條日誌信息:An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
若是異常發生可是沒有被處理,異常將會一直傳遞到 ChannelPipeline 並記錄爲未處理異常,以 WARN 級別日誌輸出。
出站操做的相關方法是異步的,處理異常信息都是基於通知機制。處理方式有兩種:
第一種是經過在方法返回值上註冊 listener:
// OneChannelOutBoundHandler.java // 省略部分代碼 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8)); ctx.close(); // 在發送消息以前關閉channel,後序寫入數據將會引起異常。 ChannelFuture channelFuture = ctx.writeAndFlush(msg); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause()); } } }); } // 省略部分代碼
第二種是在傳入的參數 promise 上註冊 listener:
// OneChannelOutBoundHandler.java // 省略部分代碼 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = (ByteBuf) msg; logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8)); ctx.close(); // 在發送消息以前關閉channel,後序寫入數據將會引起異常。 promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause()); } } }); ctx.writeAndFlush(msg, promise); } // 省略部分代碼