netty中Pipeline的ChannelHandler執行順序案例詳解

1、netty的Pipeline模型bootstrap

netty的Pipeline模型用的是責任鏈設計模式,當boss線程監控到綁定端口上有accept事件,此時會爲該socket鏈接實例化Pipeline,並將InboundHandler和OutboundHandler按序加載到Pipeline中,而後將該socket鏈接(也就是Channel對象)掛載到selector上。一個selector對應一個線程,該線程會輪詢全部掛載在他身上的socket鏈接有沒有read或write事件,而後經過線程池去執行Pipeline的業務流。selector如何查詢哪些socket鏈接有read或write事件,主要取決於調用操做系統的哪一種IO多路複用內核,若是是select(注意,此處的select是指操做系統內核的select IO多路複用,不是netty的seletor對象),那麼將會遍歷全部socket鏈接,依次詢問是否有read或write事件,最終操做系統內核將全部IO事件的socket鏈接返回給netty進程,當有不少socket鏈接時,這種方式將會大大下降性能,由於存在大量socket鏈接的遍歷和內核內存的拷貝。若是是epoll,性能將會大幅提高,由於他基於完成端口事件,已經維護好有IO事件的socket鏈接列表,selector直接取走,無需遍歷,也少掉內核內存拷貝帶來的性能損耗。設計模式

Pipeline的責任鏈是經過ChannelHandlerContext對象串聯的,ChannelHandlerContext對象裏封裝了ChannelHandler對象,經過prev和next節點實現雙向鏈表。Pipeline的首尾節點分別是head和tail,當selector輪詢到socket有read事件時,將會觸發Pipeline責任鏈,從head開始調起第一個InboundHandler的ChannelRead事件,接着經過fire方法依次觸發Pipeline上的下一個ChannelHandler,以下圖:promise

ChannelHandler分爲InbounHandler和OutboundHandler,InboundHandler用來處理接收消息,OutboundHandler用來處理髮送消息。head的ChannelHandler既是InboundHandler又是OutboundHandler,不管是read仍是write都會通過head,因此head封裝了unsafe方法,用來操做socket的read和write。tail的ChannelHandler只是InboundHandler,read的Pipleline處理將會最終到達tail。socket

2、經過六組實驗驗證InboundHandler和OutboundHandler的執行順序ide

在作實驗以前,先把實驗代碼貼出來。oop

EchoServer類:性能

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 
12 /**
13  * @ClassName EchoServer
14  * @Description TODO
15  * @Author felix
16  * @Date 2019/9/26 10:37
17  * @Version 1.0
18  **/
19 public class EchoServer {
20     private int port;
21 
22     public EchoServer(int port) {
23         this.port = port;
24     }
25 
26     private void run() {
27         EventLoopGroup bossGroup = new NioEventLoopGroup();
28         EventLoopGroup workGroup = new NioEventLoopGroup();
29 
30         try {
31             ServerBootstrap serverBootstrap = new ServerBootstrap();
32             serverBootstrap.group(bossGroup, workGroup)
33                     .channel(NioServerSocketChannel.class)
34                     .childHandler(new ChannelInitializer<SocketChannel>() {
35                         @Override
36                         protected void initChannel(SocketChannel socketChannel) throws Exception {
37                             //outboundhandler必定要放在最後一個inboundhandler以前
38                             //不然outboundhandler將不會執行到
39                             socketChannel.pipeline().addLast(new EchoOutboundHandler3());
40                             socketChannel.pipeline().addLast(new EchoOutboundHandler2());
41                             socketChannel.pipeline().addLast(new EchoOutboundHandler1());
42 
43                             socketChannel.pipeline().addLast(new EchoInboundHandler1());
44                             socketChannel.pipeline().addLast(new EchoInboundHandler2());
45                             socketChannel.pipeline().addLast(new EchoInboundHandler3());
46                         }
47                     })
48                     .option(ChannelOption.SO_BACKLOG, 10000)
49                     .childOption(ChannelOption.SO_KEEPALIVE, true);
50             System.out.println("EchoServer正在啓動.");
51 
52             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
53             System.out.println("EchoServer綁定端口" + port);
54 
55             channelFuture.channel().closeFuture().sync();
56             System.out.println("EchoServer已關閉.");
57         } catch (Exception e) {
58             e.printStackTrace();
59         } finally {
60             bossGroup.shutdownGracefully();
61             workGroup.shutdownGracefully();
62         }
63     }
64 
65     public static void main(String[] args) {
66         int port = 8080;
67         if (args != null && args.length > 0) {
68             try {
69                 port = Integer.parseInt(args[0]);
70             } catch (Exception e) {
71                 e.printStackTrace();
72             }
73         }
74 
75         EchoServer server = new EchoServer(port);
76         server.run();
77     }
78 }
View Code

EchoInboundHandler1類:測試

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerContext;
 6 import io.netty.channel.ChannelInboundHandlerAdapter;
 7 import io.netty.util.CharsetUtil;
 8 
 9 /**
10  * @ClassName EchoInboundHandler1
11  * @Description TODO
12  * @Author felix
13  * @Date 2019/9/26 11:15
14  * @Version 1.0
15  **/
16 public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {
17     @Override
18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
19         System.out.println("進入 EchoInboundHandler1.channelRead");
20 
21         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
22         System.out.println("EchoInboundHandler1.channelRead 收到數據:" + data);
23         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));
24 
25         System.out.println("退出 EchoInboundHandler1 channelRead");
26     }
27 
28     @Override
29     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
30         System.out.println("[EchoInboundHandler1.channelReadComplete]");
31     }
32 
33     @Override
34     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
35         System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());
36     }
37 }
View Code

EchoInboundHandler2類:this

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerContext;
 6 import io.netty.channel.ChannelInboundHandlerAdapter;
 7 import io.netty.util.CharsetUtil;
 8 
 9 /**
10  * @ClassName EchoInboundHandler2
11  * @Description TODO
12  * @Author felix
13  * @Date 2019/9/27 15:35
14  * @Version 1.0
15  **/
16 public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {
17     @Override
18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
19         System.out.println("進入 EchoInboundHandler2.channelRead");
20 
21         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
22         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);
23         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
24         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));
25         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
26 
27         System.out.println("退出 EchoInboundHandler2 channelRead");
28     }
29 
30     @Override
31     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
32         System.out.println("[EchoInboundHandler2.channelReadComplete]讀取數據完成");
33     }
34 
35     @Override
36     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
37         System.out.println("[EchoInboundHandler2.exceptionCaught]");
38     }
39 }
View Code

EchoInboundHandler3類:spa

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerContext;
 6 import io.netty.channel.ChannelInboundHandlerAdapter;
 7 import io.netty.util.CharsetUtil;
 8 
 9 /**
10  * @ClassName EchoInboundHandler3
11  * @Description TODO
12  * @Author felix
13  * @Date 2019/10/23 13:43
14  * @Version 1.0
15  **/
16 public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {
17     @Override
18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
19         System.out.println("進入 EchoInboundHandler3.channelRead");
20 
21         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
22         System.out.println("EchoInboundHandler3.channelRead 接收到數據:" + data);
23         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));
24         ctx.fireChannelRead(msg);
25 
26         System.out.println("退出 EchoInboundHandler3 channelRead");
27     }
28 
29     @Override
30     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
31         System.out.println("[EchoInboundHandler3.channelReadComplete]讀取數據完成");
32     }
33 
34     @Override
35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
36         System.out.println("[EchoInboundHandler3.exceptionCaught]");
37     }
38 
39 
40 }
View Code

EchoOutboundHandler1類:

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelOutboundHandlerAdapter;
 6 import io.netty.channel.ChannelPromise;
 7 import io.netty.util.CharsetUtil;
 8 
 9 /**
10  * @ClassName EchoOutboundHandler1
11  * @Description TODO
12  * @Author felix
13  * @Date 2019/9/27 15:36
14  * @Version 1.0
15  **/
16 public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {
17     @Override
18     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
19         System.out.println("進入 EchoOutboundHandler1.write");
20 
21         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
22         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler裏測試一下channel().writeAndFlush", CharsetUtil.UTF_8));
23         ctx.write(msg);
24 
25         System.out.println("退出 EchoOutboundHandler1.write");
26     }
27 }
View Code

EchoOutboundHandler2類:

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelOutboundHandlerAdapter;
 6 import io.netty.channel.ChannelPromise;
 7 import io.netty.util.CharsetUtil;
 8 
 9 /**
10  * @ClassName EchoOutboundHandler2
11  * @Description TODO
12  * @Author felix
13  * @Date 2019/9/27 15:36
14  * @Version 1.0
15  **/
16 public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {
17 
18     @Override
19     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
20         System.out.println("進入 EchoOutboundHandler2.write");
21 
22         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));
23         ctx.write(msg);
24 
25         System.out.println("退出 EchoOutboundHandler2.write");
26     }
27 }
View Code

EchoOutboundHandler3類:

 1 package com.wisdlab.nettylab;
 2 
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelOutboundHandlerAdapter;
 5 import io.netty.channel.ChannelPromise;
 6 
 7 /**
 8  * @ClassName EchoOutboundHandler3
 9  * @Description TODO
10  * @Author felix
11  * @Date 2019/10/23 23:23
12  * @Version 1.0
13  **/
14 public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {
15     @Override
16     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
17         System.out.println("進入 EchoOutboundHandler3.write");
18 
19         ctx.write(msg);
20 
21         System.out.println("退出 EchoOutboundHandler3.write");
22     }
23 
24 }
View Code

實驗一:在InboundHandler中不觸發fire方法,後續的InboundHandler還能順序執行嗎?

如上圖所示,InboundHandler2沒有調用fire方法:

1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
2         System.out.println("進入 EchoInboundHandler1.channelRead");
3 
4         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
5         System.out.println("EchoInboundHandler1.channelRead 收到數據:" + data);
6         //ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));
7 
8         System.out.println("退出 EchoInboundHandler1 channelRead");
9     }

那麼InboundHandler中的代碼還會被執行到嗎?看一下執行結果:

由上圖可知,InboundHandler2沒有調用fire事件,InboundHandler3沒有被執行。

結論:InboundHandler是經過fire事件決定是否要執行下一個InboundHandler,若是哪一個InboundHandler沒有調用fire事件,那麼日後的Pipeline就斷掉了。

實驗二:InboundHandler和OutboundHandler的執行順序是什麼?

加入Pipeline的ChannelHandler的順序如上圖所示,那麼最後執行的順序如何呢?執行結果以下:

由上圖可知,執行順序爲:

InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

因此,咱們獲得如下幾個結論:

一、InboundHandler是按照Pipleline的加載順序,順序執行。

二、OutboundHandler是按照Pipeline的加載順序,逆序執行。

實驗三:若是把OutboundHandler放在InboundHandler的後面,OutboundHandler會執行嗎?

執行結果以下:

因而可知,OutboundHandler沒有執行,爲何呢?由於Pipleline是執行完全部有效的InboundHandler,再返回執行在最後一個InboundHandler以前的OutboundHandler。注意,有效的InboundHandler是指fire事件觸達到的InboundHandler,若是某個InboundHandler沒有調用fire事件,後面的InboundHandler都是無效的InboundHandler。爲了印證這一點,咱們繼續作一個實驗,咱們把其中一個OutboundHandler放在最後一個有效的InboundHandler以前,看看這惟一的一個OutboundHandler是否會執行,其餘OutboundHandler是否不會執行。

執行結果以下:

因而可知,只執行了OutboundHandler1,其餘OutboundHandler沒有被執行。

因此,咱們獲得如下幾個結論:

一、有效的InboundHandler是指經過fire事件能觸達到的最後一個InboundHander。

二、若是想讓全部的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler以前。

三、推薦的作法是經過addFirst加載全部OutboundHandler,再經過addLast加載全部InboundHandler。

實驗四:若是其中一個OutboundHandler沒有執行write方法,那麼消息會不會發送出去?

咱們把OutboundHandler2的write方法注掉

1     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
2         System.out.println("進入 EchoOutboundHandler3.write");
3 
4         //ctx.write(msg);
5 
6         System.out.println("退出 EchoOutboundHandler3.write");
7     }

執行結果以下:

能夠看到,OutboundHandler3並無被執行到,另外,客戶端也沒有收到發送的消息。

因此,咱們獲得如下幾個結論:

一、OutboundHandler是經過write方法實現Pipeline的串聯的。

二、若是OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有調用write方法,最終消息將不會發送出去。

實驗五:ctx.writeAndFlush 的OutboundHandler的執行順序是什麼?

咱們設定ChannelHandler在Pipeline中的加載順序以下:

OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3

在InboundHander2中調用ctx.writeAndFlush:

 1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 2         System.out.println("進入 EchoInboundHandler2.channelRead");
 3 
 4         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
 5         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);
 6         ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
 7         //ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));
 8         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
 9 
10         System.out.println("退出 EchoInboundHandler2 channelRead");
11     }

執行結果以下:

由上圖可知,依次執行了OutboundHandler2和OutboundHandler3,爲何會這樣呢?由於ctx.writeAndFlush是從當前的ChannelHandler開始,向前依次執行OutboundHandler的write方法,因此分別執行了OutboundHandler2和OutboundHandler3:

OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3

因此,咱們獲得以下結論:

一、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。

二、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。

實驗六:ctx.channel().writeAndFlush 的OutboundHandler的執行順序是什麼?

仍是實驗五的代碼,不一樣之處只是把ctx.writeAndFlush修改成ctx.channel().writeAndFlush。

 1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 2         System.out.println("進入 EchoInboundHandler2.channelRead");
 3 
 4         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
 5         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);
 6         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
 7         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));
 8         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
 9 
10         System.out.println("退出 EchoInboundHandler2 channelRead");
11     }

執行結果以下:

由上圖可知,全部OutboundHandler都執行了,由此咱們獲得結論:

一、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其餘OutboundHandler,即便最後一個ChannelHandler是OutboundHandler,在InboundHandler以前,也會執行該OutbondHandler。

二、千萬不要在OutboundHandler的write方法裏執行ctx.channel().writeAndFlush,不然就死循環了。

 3、總結

一、InboundHandler是經過fire事件決定是否要執行下一個InboundHandler,若是哪一個InboundHandler沒有調用fire事件,那麼日後的Pipeline就斷掉了。二、InboundHandler是按照Pipleline的加載順序,順序執行。三、OutboundHandler是按照Pipeline的加載順序,逆序執行。四、有效的InboundHandler是指經過fire事件能觸達到的最後一個InboundHander。五、若是想讓全部的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler以前。六、推薦的作法是經過addFirst加載全部OutboundHandler,再經過addLast加載全部InboundHandler。七、OutboundHandler是經過write方法實現Pipeline的串聯的。八、若是OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有調用write方法,最終消息將不會發送出去。九、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。十、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。十一、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其餘OutboundHandler,即便最後一個ChannelHandler是OutboundHandler,在InboundHandler以前,也會執行該OutbondHandler。十二、千萬不要在OutboundHandler的write方法裏執行ctx.channel().writeAndFlush,不然就死循環了。

相關文章
相關標籤/搜索