接Netty整理 java
如今咱們來驗證一下channel的生命週期。web
咱們將EchoServerHandler修改以下,增長所有的監聽事件,並打印事件方法名稱。算法
/** * 事件處理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 監聽讀取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); } /** * 監聽讀取完畢事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 監聽異常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 將channel註冊到EventLoop的Selector多路複用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未註冊到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有鏈接,變爲活躍狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 沒有鏈接,非活躍狀態 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
啓動EchoServer,打開telnet鏈接到端口,咱們能夠看到bootstrap
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.數組
整個過程爲鏈接,發送字符串sdfs,退出鏈接promise
服務端日誌爲緩存
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-10-01 05:33:36.960 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-10-01 05:33:54.439 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : sdfs服務器
2019-10-01 05:33:54.442 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.527 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-10-01 05:34:22.529 INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregisteredwebsocket
整個生命週期正如前面寫到同樣網絡
Channel的生命週期爲:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered
ChannelPipeline:
比如廠裏的流水線同樣,能夠在上面添加多個ChannelHanler,也可當作是一串 ChannelHandler 實例,攔截穿過 Channel 的輸入輸出 event, ChannelPipeline 實現了攔截器的一種高級形式,使得用戶能夠對事件的處理以及ChannelHanler之間交互得到徹底的控制權。
咱們來看一下ChannelPipeline的源碼
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> { /** * 在管道的首位置添加一個channelhandler */ ChannelPipeline addFirst(String name, ChannelHandler handler); /** * 同上,多了一個線程池參數 */ ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在管道的最末端添加一個channelhandler */ ChannelPipeline addLast(String name, ChannelHandler handler); /** * 同上,多了一個線程池參數 */ ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); /** * 在一個管道中已存在的channelhandler以前插入另一個channelhandler */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); /** * 同上,多了一個線程池參數 */ ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在管道已有多一個channelhandler以後插入另一個channelhandler */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); /** * 同上,多了一個線程池參數 */ ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * 在該管道的首位置放入一組channelhandler * */ ChannelPipeline addFirst(ChannelHandler... handlers); /** * 同上,多了一個線程池參數 * */ ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); /** * 在管道的最末端放入一組channelhandler * */ ChannelPipeline addLast(ChannelHandler... handlers); /** * 同上,多了一個線程池參數 * */ ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); /** * 從管道中移除一個channelhandler */ ChannelPipeline remove(ChannelHandler handler); /** * 根據名字在管道中移除一個channelhandler */ ChannelHandler remove(String name); /** * 根據類名在管道中移除一個channelhandler */ <T extends ChannelHandler> T remove(Class<T> handlerType); /** * 移除管道中首個channelhandler */ ChannelHandler removeFirst(); /** * 移除管道中末個channelhandler */ ChannelHandler removeLast(); /** * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字 */ ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字 */ ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); /** * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字 */ <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler); /** * 返回管道中首個channelhandler */ ChannelHandler first(); /** * 獲取第一個管道處理器上下文 */ ChannelHandlerContext firstContext(); /** * 獲取管道中最後一個channelhandler */ ChannelHandler last(); /** * 獲取管道中最後一個管道處理器上下文 */ ChannelHandlerContext lastContext(); /** * 根據名字獲取管道中的一個channelhandler */ ChannelHandler get(String name); /** * 根據類獲取一個channelhandler */ <T extends ChannelHandler> T get(Class<T> handlerType); /** * 根據channelhandler獲取一個管道處理器上下文 */ ChannelHandlerContext context(ChannelHandler handler); /** * 根據名字獲取一個管道處理器上下文 */ ChannelHandlerContext context(String name); /** * 根據一個channelhandler的類名獲取一個管道處理器上下文 */ ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType); /** * 返回一個管道 */ Channel channel(); /** * 返回管道中的channelhandler的名稱列表 */ List<String> names(); /** * Converts this pipeline into an ordered {@link Map} whose keys are * handler names and whose values are handlers. */ Map<String, ChannelHandler> toMap(); @Override ChannelPipeline fireChannelRegistered(); @Override ChannelPipeline fireChannelUnregistered(); @Override ChannelPipeline fireChannelActive(); @Override ChannelPipeline fireChannelInactive(); @Override ChannelPipeline fireExceptionCaught(Throwable cause); @Override ChannelPipeline fireUserEventTriggered(Object event); @Override ChannelPipeline fireChannelRead(Object msg); @Override ChannelPipeline fireChannelReadComplete(); @Override ChannelPipeline fireChannelWritabilityChanged(); @Override ChannelPipeline flush(); }
ChannelHandlerContext模塊的做用和分析
咱們在ChannelHandler的方法中會看到有一個參數。如
/** * 監聽讀取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(data); }
這個ChannelHandlerContext就是一個處理器的上下文。
一、ChannelHandlerContext是鏈接ChannelHandler和ChannelPipeline的橋樑
ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,比如調用write方法,
Channel、ChannelPipeline、ChannelHandlerContext 均可以調用此方法,前二者都會在整個管道流裏傳播,而ChannelHandlerContext就只會在後續的Handler裏面傳播
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //第一種,會在整個管道中傳播 Channel channel = ctx.channel(); channel.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); //第二種,會在整個管道中傳播 ChannelPipeline pipeline = ctx.pipeline(); pipeline.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); //第三種,只會在後續的channelhandler中傳播 ctx.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); // ByteBuf data = (ByteBuf) msg; // log.info(data.toString(CharsetUtil.UTF_8)); // ctx.writeAndFlush(data); }
二、AbstractChannelHandlerContext類
雙向鏈表結構,next/prev分別是後繼節點,和前驅節點
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;
三、DefaultChannelHandlerContext 是實現類,可是大部分都是父類那邊完成,這個只是簡單的實現一些方法
主要就是判斷Handler的類型
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
ChannelInboundHandler之間的傳遞,主要經過調用ctx裏面的FireXXX()方法來實現下個handler的調用
咱們修改一下EchoServerHandler,再增長一個EchoServerHandler2
/** * 事件處理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 監聽讀取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // //第一種,會在整個管道中傳播 // Channel channel = ctx.channel(); // channel.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); // //第二種,會在整個管道中傳播 // ChannelPipeline pipeline = ctx.pipeline(); // pipeline.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); // //第三種,只會在後續的channelhandler中傳播 // ctx.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8)); ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第一個handler", CharsetUtil.UTF_8)); ctx.fireChannelRead(msg); } /** * 監聽讀取完畢事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 監聽異常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 將channel註冊到EventLoop的Selector多路複用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未註冊到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有鏈接,變爲活躍狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 沒有鏈接,非活躍狀態 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
@Slf4j public class EchoServerHandler2 extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("我是第二個handler", CharsetUtil.UTF_8)); } /** * 監聽讀取完畢事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 監聽異常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 將channel註冊到EventLoop的Selector多路複用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未註冊到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有鏈接,變爲活躍狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 沒有鏈接,非活躍狀態 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } }
將EchoServerHandler2添加到管道中
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 socketChannel.pipeline().addLast(new EchoServerHandler()); socketChannel.pipeline().addLast(new EchoServerHandler2()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
咱們能夠看到EchoServerHandler2是添加到EchoServerHandler後面的。
啓動EchoServer,打開telnet
admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
kdfaf
我是第一個handler我是第二個handler^]
telnet> quit
Connection closed.
服務端的日誌爲
2019-09-29 22:42:42.626 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-09-29 22:42:42.627 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-09-29 22:42:48.108 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler2 : kdfaf
2019-09-29 22:42:48.114 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.799 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelInactive
2019-09-29 22:44:09.802 INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
多個入站出站ChannelHandler的執行順序
一、通常的項目中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?
InboundHandler順序執行,OutboundHandler逆序執行
問題:ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler2());
或者:
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
如今咱們以實際代碼來驗證一下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); //會傳遞到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); //在此結束調用,不會在管道中繼續傳遞 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class OutboundHandler1 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutBoundHandler1 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutBoundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j public class OutboundHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("OutboundHaneler2 write : " + data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
啓動EchoServer,打開telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
dsfs
InboundHandler2 InboundHandler1 dsfs
服務端日誌
2019-10-02 09:51:44.865 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服務端收到數據:dsfs
2019-10-02 09:51:44.869 INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 dsfs
咱們能夠看到此時,事件在InboundHandler中就結束了,並無傳遞到OutboundHandler中。這是由於OutboundHandler在InboundHandler以後纔開始監聽,InboundHandler在處理中,並無監聽寫出站事件,因此不會執行到OutboundHandler之中的代碼。
可是若是把監聽事件的順序調整一下,在入站以前就開始監聽出站事件。
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
運行EchoServer,打開telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sfdfg
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
^]
telnet> quit
Connection closed.
從右到左爲依次執行順序,先是InboundHandler1->InboundHandler2->OutboundHandler2->OutboundHandler1
服務端日誌爲
2019-10-02 10:27:03.683 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服務端收到數據:sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sfdfg
2019-10-02 10:27:03.686 INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
由於沒有監聽活躍,斷開事件,因此不會打印相關日誌。這裏之因此OutboundHandler會執行是由於在入站以前就開始監聽寫出事件,而InboundHandler會先執行是由於只有接收到客戶端的消息的時候纔會進行寫出操做,這個時候纔會被OutboundHandler監聽到,進行相關操做,可是因爲OutboundHandler是從後往前執行,因此會先執行OutboundHandler2,再執行OutboundHandler1。而整個數據傳輸是貫穿管道始終的。
執行順序是:
InboundHandler1 channelRead
InboundHandler2 channelRead
OutboundHandler2 write
OutboundHandler1 write
如今把InboundHandler1提到最前又是什麼狀況呢?
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 // socketChannel.pipeline().addLast(new EchoServerHandler()); // socketChannel.pipeline().addLast(new EchoServerHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler2()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
由分析可知,OutboundHandler出站是不會監聽InboudHandler1的寫出事件的,但能夠監聽到InboundHandler2的寫出事件。
咱們修改一下兩個InboundHandler來加以驗證。
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //會傳遞到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
@Slf4j public class InboundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler2 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); //在此結束調用,不會在管道中繼續傳遞 ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 alone \n".getBytes())); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
重啓EchoServer,打開telnet
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfljioldf
InboundHandler1 sdfljioldf //此處InboundHandler1只是簡單的輸出了,沒有被OutboundHandler監聽。
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone //此處InboundHandler2被OutboundHandler監聽,因此會追加輸出OutBoundHandler1 OutboundHandler2
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf //此處也是InboundHandler2被OutboundHandler監聽輸出的。
服務端日誌
2019-10-02 11:04:09.345 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服務端收到數據:sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sdfljioldf
2019-10-02 11:04:09.351 INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf
跟咱們分析的同樣。
但若是咱們修改一下InboundHandler1的代碼以下
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //會傳遞到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
或者
@Slf4j public class InboundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8)); ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); //會傳遞到inbounhandler2中 ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
如今不管InboundHandler1的位置放前、放後,是否被OutboundHandler監聽,它都會流通整個管道。
admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler1 alone ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
服務端日誌爲
2019-10-02 11:18:10.761 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1 : InboundHandler1 channelRead 服務端收到數據:ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.764 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler1 alone ljlfrg
2019-10-02 11:18:10.767 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2 : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2 : OutboundHaneler2 write : InboundHandler2 InboundHandler1 ljlfrg
2019-10-02 11:18:10.768 INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1 : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg
結論:
1)InboundHandler順序執行,OutboundHandler逆序執行
2)InboundHandler之間傳遞數據,經過ctx.fireChannelRead(msg)
3)InboundHandler經過ctx.write(msg),則會傳遞到outboundHandler
4) 使用ctx.write(msg)傳遞消息,Inbound須要放在結尾,在Outbound以後,否則outboundhandler會不執行;
可是使用channel.write(msg)、pipline.write(msg)狀況會不一致,都會執行
5) outBound和Inbound誰先執行,針對客戶端和服務端而言,客戶端是發起請求再接受數據,先outbound再inbound,服務端則相反
七、Netty異步操做模塊ChannelFuture
Netty中的全部I/O操做都是異步的,這意味着任何I/O調用都會當即返回,而ChannelFuture會提供有關的信息I/O操做的結果或狀態。
ChannelFuture的使用主要是在EchoServer中
//綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync();
1)ChannelFuture狀態:
未完成:當I/O操做開始時,將建立一個新的對象,新的最初是未完成的 - 它既沒有成功,也沒有被取消,由於I/O操做還沒有完成。
已完成:當I/O操做完成,無論是成功、失敗仍是取消,Future都是標記爲已完成的, 失敗的時候也有具體的信息,例如緣由失敗,但請注意,即便失敗和取消屬於完成狀態。
ChannelFuture的四種狀態
* +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
注意:不要在IO線程內調用future對象的sync或者await方法
不能在channelHandler中調用sync或者await方法
2)ChannelPromise:繼承於ChannelFuture,進一步拓展用於設置IO操做的結果
其中的繼承關係圖如上所示,第一個Future是JDK自帶的,第二個Future是netty中增長的。關於future通常的使用方法能夠參考Fork/Join框架原理和使用探祕
Netty編寫的網絡數據傳輸中的編碼和解碼
前面說的:高性能RPC框架的3個要素:IO模型、數據協議、線程模型
最開始接觸的編碼碼:java序列化/反序列化(就是編解碼)、url編碼、base64編解碼
爲啥jdk有編解碼,還要netty本身開發編解碼?
java自帶序列化的缺點
1)沒法跨語言
2) 序列化後的碼流太大,也就是數據包太大
3) 序列化和反序列化性能比較差
業界裏面也有其餘編碼框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等,關於Kyro的使用方法能夠參考淺析克隆 。
Netty裏面的編解碼:
解碼器:負責處理「入站 InboundHandler」數據
編碼器:負責「出站 OutboundHandler」 數據
Netty裏面提供默認的編解碼器,也支持自定義編解碼器
Encoder:編碼器
Decoder:解碼器
Codec:編解碼器
Netty的解碼器Decoder和使用場景
Decoder對應的就是ChannelInboundHandler,主要就是字節數組轉換爲消息對象,在咱們以前的樣例中,都是處理一些簡單的字符串來進行消息傳遞,但若是咱們須要轉換的是Java的對象的話,則須要使用到Decoder。
主要是兩個方法
decode (經常使用)
decodeLast (用於最後的幾個字節處理,也就是channel關閉的時候,產生的最後一個消息)
抽象解碼器
1)ByteToMessageDecoder
用於將字節轉爲消息,須要檢查緩衝區是否有足夠的字節,經過源碼可知,ByteToMessageDecoder實際上就是一個ChannelInboundHandler。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
2)ReplayingDecoder
繼承ByteToMessageDecoder,不須要檢查緩衝區是否有足夠的字節,可是ReplayingDecoder速度略慢於ByteToMessageDecoder,不是全部的ByteBuf都支持
選擇:項目複雜性高則使用ReplayingDecoder,不然使用 ByteToMessageDecoder
解碼器具體的實現,用的比較多的是(更可能是爲了解決TCP底層的粘包和拆包問題)
DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
LineBasedFrameDecoder: 以換行符爲結束標誌的解碼器
FixedLengthFrameDecoder:固定長度解碼器
LengthFieldBasedFrameDecoder:message = header+body, 基於長度解碼的通用解碼器
StringDecoder:文本解碼器,將接收到的對象轉化爲字符串,通常會與上面的進行配合,而後在後面添加業務handle
Netty編碼器Encoder
Encoder對應的就是ChannelOutboundHandler,消息對象轉換爲字節數組
Netty自己未提供和解碼同樣的編碼器,是由於場景不一樣,二者非對等的
1)MessageToByteEncoder
消息轉爲字節數組,調用write方法,會先判斷當前編碼器是否支持須要發送的消息類型,若是不支持,則透傳;其自己其實就是一個ChannelOutboundHandler。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
2)MessageToMessageEncoder
用於從一種消息編碼爲另一種消息(例如POJO到POJO)
Netty編解碼器類Codec
組合解碼器和編碼器,以此提供對於字節和消息都相同的操做(通常不經常使用)
優勢:成對出現,編解碼都是在一個類裏面完成
缺點:耦合在一塊兒,拓展性不佳
Codec:組合編解碼
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解碼
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:編碼
1)ByteToMessageEncoder
2)MessageToMessageEncoder
什麼是TCP粘包拆包
1)TCP拆包: 一個完整的包可能會被TCP拆分爲多個包進行發送
2)TCP粘包: 把多個小的包封裝成一個大的數據包發送, client發送的若干數據包 Server接收時粘成一包
發送方和接收方均可能出現這個緣由
發送方的緣由:TCP默認會使用Nagle算法
接收方的緣由: TCP接收到數據放置緩存中,應用程序從緩存中讀取
UDP: 是沒有粘包和拆包的問題,有邊界協議
二、TCP半包讀寫常看法決方案
簡介:講解TCP半包讀寫常見的解決辦法
發送方:能夠關閉Nagle算法
接受方: TCP是無界的數據流,並無處理粘包現象的機制, 且協議自己沒法避免粘包,半包讀寫的發生須要在應用層進行處理
應用層解決半包讀寫的辦法
1)設置定長消息 (10字符)
xdclass000xdclass000xdclass000xdclass000
2)設置消息的邊界 ($$ 切割)
sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
3)使用帶消息頭的協議,消息頭存儲消息開始標識及消息的長度信息
Header+Body
三、Netty自帶解決TCP半包讀寫方案
DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
LineBasedFrameDecoder: 以換行符爲結束標誌的解碼器
FixedLengthFrameDecoder:固定長度解碼器
LengthFieldBasedFrameDecoder:message = header+body, 基於長度解碼的通用解碼器
如今咱們來作一個實驗,若是不使用解碼器,會產生什麼樣的效果。
服務端的入站事件處理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; byte[] bytes = new byte[data.readableBytes()]; //將收到到字節流放入內存字節數組 data.readBytes(bytes); //將字節數組轉成字符串,並截取該字符串從0到換行符(不包含換行符)到子串 //System.getProperty("line.separator")爲獲取操做系統的換行符,每種操做系統可能各不相同 String body = new String(bytes, CharsetUtil.UTF_8) .substring(0,bytes.length - System.getProperty("line.separator").length()); log.info("服務端收到消息內容爲:" + body + "收到消息次數:" + ++counter); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客戶端入站事件處理器
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; //定義一段包含換行符的字符串,並轉化爲字節數組 byte[] req = ("guanjian.net" + System.getProperty("line.separator")).getBytes(); //連續發送10條該字符串到服務端 for (int i = 0;i < 10;i++) { //申請一段內存緩衝區 msg = Unpooled.buffer(req.length); //將字節數組寫入緩衝區 msg.writeBytes(req); //發送該字符串到服務端 ctx.writeAndFlush(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
分別啓動服務端和客戶端
咱們能夠看到服務端日誌爲
2019-10-05 08:41:11.733 INFO 614 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net收到消息次數:1
雖然客戶端是分了10次發送到數據,可是服務端倒是隻接收了一次,收到消息次數1,這明顯是一次粘包。
如今咱們給客戶端加上.option(ChannelOption.TCP_NODELAY,true)來看一下是什麼狀況
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客戶端處理線程組(其實就是一個線程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客戶端netty啓動對象 Bootstrap bootstrap = new Bootstrap(); //將客戶端線程組添加到啓動對象中 bootstrap.group(group) //給啓動對象添加Socket管道 .channel(NioSocketChannel.class) //主動鏈接到遠程服務器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); //鏈接到服務端,connect是異步鏈接,在調用同步等待sync,等待鏈接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客戶端通道關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 group.shutdownGracefully(); } } }
服務端日誌爲
2019-10-05 08:45:52.654 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:1
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:2
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:3
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:4
2019-10-05 08:45:52.655 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:5
2019-10-05 08:45:52.656 INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:6
咱們能夠看到,雖然加了.option(ChannelOption.TCP_NODELAY,true),但並不能保證它不產生粘包,有些包包含了兩條字符串,收到的消息也就不是10次了。
如今咱們把換行解碼器LineBasedFrameDecoder放入
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //這個1024是在1024個字節內去尋找換行符,若是在1024個字節內沒有找到換行符,就會報錯 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
再次運行,服務端日誌
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:1
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:2
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:3
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:4
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:5
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:6
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:7
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:8
2019-10-05 09:04:15.693 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:9
2019-10-05 09:04:15.694 INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.ne收到消息次數:10
LineBasedFrameDecoder是繼承於ByteToMessageDecoder,是一個ChannelInboundHandler
public class LineBasedFrameDecoder extends ByteToMessageDecoder
由上面可見,粘包通過LineBasedFrameDecoder處理,再逐條發往下一個ChannelInboundHandler,而不是從客戶端逐條發送過來的。
若是咱們以爲ServerHandler的channelRead方法太麻煩了,還要由字節數組轉成字符串,那咱們直接將收到的信息強制轉成字符串會怎麼樣呢?
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; log.info("服務端收到消息內容爲:" + body + "收到消息次數:" + ++counter); }
啓動後服務端報錯以下(部分)
java.lang.ClassCastException: io.netty.buffer.PooledSlicedByteBuf cannot be cast to java.lang.String
at com.guanjian.websocket.netty.packet.ServerHandler.channelRead(ServerHandler.java:29)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
說明此處沒法將收到的信息直接強制轉換成字符串,這個時候咱們能夠加入字符串解碼器StringDecoder.
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //這個1024是在1024個字節內去尋找換行符,若是在1024個字節內沒有找到換行符,就會報錯 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); //字符串解碼器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
啓動運行,服務端日誌爲
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:1
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:2
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:3
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:4
2019-10-05 10:32:13.774 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:5
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:6
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:7
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:8
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:9
2019-10-05 10:32:13.775 INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:guanjian.net收到消息次數:10
使用解碼器LineBasedFrameDecoder解決半包讀寫問題
1)LineBaseFrameDecoder 以換行符爲結束標誌的解碼器 ,構造函數裏面的數字表示最長遍歷的幀數
2)StringDecoder解碼器將對象轉成字符串
如今咱們修改一下客戶端處理器ClientHandler。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"; ByteBuf buf = null; //申請一段內存緩衝區 buf = Unpooled.buffer(message.getBytes().length); //將字節數組寫入緩衝區 buf.writeBytes(message.getBytes()); //發送字節數組到服務端 ctx.writeAndFlush(buf); }
咱們能夠看到message字符串中都帶有&_字符,咱們將在服務端以該字符爲分隔符進行解碼。
修改EchoServer到代碼以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //使用指定消息分隔符解碼器進行解碼 //1024的意思是在1024個字節內查找&_,若是找不到就會拋出異常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); //字符串解碼器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
分別啓動服務端,客戶端。服務端到日誌以下
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:Starting EchoclientApplication 收到消息次數:1
2019-10-06 12:55:36.078 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 收到消息次數:2
2019-10-06 12:55:36.079 INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次數:3
對比message
"Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)"
咱們會發現最後一句/Users/admin/Downloads/nettyecho)被丟棄了,因此要所有拿到咱們須要的消息,咱們須要在最後一段字符串中加入&_
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Starting EchoclientApplication &_" + "on admindeMBP.lan with PID 741 &_" + "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" + "/Users/admin/Downloads/nettyecho)&_"; ByteBuf buf = null; //申請一段內存緩衝區 buf = Unpooled.buffer(message.getBytes().length); //將字節數組寫入緩衝區 buf.writeBytes(message.getBytes()); //發送字節數組到服務端 ctx.writeAndFlush(buf); }
再次啓動客戶端,服務端日誌爲
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:Starting EchoclientApplication 收到消息次數:1
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 收到消息次數:2
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次數:3
2019-10-06 13:18:51.705 INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:/Users/admin/Downloads/nettyecho)收到消息次數:4
其實DelimiterBasedFrameDecoder有不少個構造器。咱們這裏使用的兩參構造器其實調用到是一個三參構造器。
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter); }
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, true, delimiter); }
而三參構造器實際上是調用了一個四參構造器
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf delimiter) { this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] { delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())}); }
咱們來講明一下這個四參構造器每一個參數到含義。
maxFrameLength:
表示一行最大的長度,若是超過這個長度依然沒有檢測自定義分隔符,將會拋出TooLongFrameException
stripDelimiter:
解碼後的消息是否去除掉分隔符(true去掉分隔符,flase保留分隔符)
failFast:
若是爲true,則超出maxLength後當即拋出TooLongFrameException,不進行繼續解碼
若是爲false,則等到完整的消息被解碼後,再拋出TooLongFrameException異常
delimiters:
分隔符,ByteBuf類型
如今咱們來看一下第二個參數到含義。
修改EchoServer以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //使用指定消息分隔符解碼器進行解碼,1024爲在1024個字節內查找分隔符(能夠本身任意定義),若是 //找不到會拋出異常 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false,true,delimiter)); //字符串解碼器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
從新啓動服務端,客戶端。服務端到日誌以下
2019-10-06 13:33:24.806 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:Starting EchoclientApplication &_收到消息次數:1
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 &_收到消息次數:2
2019-10-06 13:33:24.807 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_收到消息次數:3
2019-10-06 13:33:24.808 INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler : 服務端收到消息內容爲:/Users/admin/Downloads/nettyecho)&_收到消息次數:4
自定義長度半包讀寫器LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder也有不少構造器,通常咱們使用的是一個五參構造器
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
固然它調用的是一個六參構造器。
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this( ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); }
這六個參數的含義以下
maxFrameLength 數據包的最大長度
lengthFieldOffset 長度字段的偏移位,長度字段開始的地方,意思是跳過指定長度個字節以後的纔是消息體字段
lengthFieldLength 長度字段佔的字節數, 幀數據長度的字段自己的長度
lengthAdjustment
通常 Header + Body,添加到長度字段的補償值,若是爲負數,開發人員認爲這個 Header的長度字段是整個消息包的長度,則Netty應該減去對應的數字
initialBytesToStrip 從解碼幀中第一次去除的字節數, 獲取完一個完整的數據包以後,忽略前面的指定位數的長度字節,應用解碼器拿到的就是不帶長度域的數據包
failFast 是否快速失敗
六參構造器調用到是一個七參構造器(不討論)。
如今咱們來設置一個存儲消息內容到消息類。(服務端和客戶端相同)
public class CCMessageHeader { @Getter @Setter private byte[] messageFlag = new byte[2]; //消息長度偏移量 @Getter @Setter private int length; //消息長度 @Getter @Setter private int type; //消息類型 @Getter private String data; //消息內容 public CCMessageHeader() { //170,這兩個數沒有實際意義,只用來描述消息長度偏移量而填充進去的 messageFlag[0] = (byte) 0xaa; //187 messageFlag[1] = (byte) 0xbb; } public void setData(String data) { this.data = data; //消息體的長度爲字符串data的長度加4,4爲type整形的長度 this.length = data.length() + 4; } }
服務端入站處理器
@Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //若是字節流到可讀字節數小於等於0,轉給下一個InboundHandler處理 if (body.readableBytes() <= 0) { ctx.fireChannelRead(msg); } //初始化一個接收消息對象 CCMessageHeader recHd = new CCMessageHeader(); //字節流讀取到的第一個整形賦給nLength int nLength = body.readInt(); //字節流讀取到到第二個整形賦給nType int nType = body.readInt(); //獲取字節流的可讀字節數,並分配一個該大小的字節數組 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //將字節流讀入字節數組 body.readBytes(aa); //將該字節數組轉成字符串 String myMsg = new String(aa, CharsetUtil.UTF_8); log.info("收到 " + ++counter + "次消息:[" + myMsg + "],類型爲[" + nType + "]"); //初始化一個發送消息對象 CCMessageHeader hd = new CCMessageHeader(); hd.setType(2); hd.setData("server data..."); //申請一段直接緩衝空間 ByteBuf echo = Unpooled.directBuffer(); //將該發送消息對象的各屬性寫入到緩衝空間中 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(),CharsetUtil.UTF_8); //將該發送消息對象以字節流到形式發送到客戶端 ctx.writeAndFlush(echo); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoServer以下
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //65535表示對自定義長度解碼器對最大處理長度, // 第二個2表示長度信息從第二個字節後開始獲取, //其實這個2是messageFlag的長度,即獲取長度字段的偏移量。 //4表示長度字段佔4個字節,即private int length的字節數,一個整數佔4個字節 //0表示添加到長度字段的補償值,這裏不須要補償 //最後一個2表示獲取咱們的消息體,要去掉2個長度字段的字節數,由於除了length字段還有一個 //type的整形字段,因此是2才能拿到data字段,即顯示消息體字段,但其實消息體包含了type字段的 socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
客戶端入站處理器
@Slf4j public class ClientHandler2 extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //初始化一個發送消息對象 CCMessageHeader hd = new CCMessageHeader(); //設置類型爲1 hd.setType(1); for (int i = 0;i < 10;i++) { String strData = String.format("client data %d...",i + 1); //設置消息體 hd.setData(strData); //申請一段直接緩衝區 ByteBuf echo = Unpooled.directBuffer(); //將消息對象的各個屬性寫入該緩衝區 echo.writeBytes(hd.getMessageFlag()); echo.writeInt(hd.getLength()); echo.writeInt(hd.getType()); echo.writeCharSequence(hd.getData(), CharsetUtil.UTF_8); //將該緩衝區的字節流發送到服務端 ctx.writeAndFlush(echo); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf body = (ByteBuf) msg; //初始化接收消息對象 CCMessageHeader recHd = new CCMessageHeader(); //獲取第一個整形賦給nLength,即消息長度 int nLength = body.readInt(); //獲取第二個整形賦給nType int nType = body.readInt(); //獲取整個字節流的可讀字節數,並以此創建一個字節數組 int nDataSize = body.readableBytes(); byte[] aa = new byte[nDataSize]; //將字節流讀入該字節數組 body.readBytes(aa); //將該字節數組轉換爲字符串 String myMsg = new String(aa,CharsetUtil.UTF_8); log.info("收到" + ++counter + "次消息[" + myMsg + "],類型爲:[" + nType + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoClient代碼以下
@AllArgsConstructor public class EchoClient { private String host; private int port; public void run() throws InterruptedException { //客戶端處理線程組(其實就是一個線程池) EventLoopGroup group = new NioEventLoopGroup(); try { //客戶端netty啓動對象 Bootstrap bootstrap = new Bootstrap(); //將客戶端線程組添加到啓動對象中 bootstrap.group(group) //給啓動對象添加Socket管道 .channel(NioSocketChannel.class) //主動鏈接到遠程服務器IP端口 .remoteAddress(new InetSocketAddress(host,port)) .option(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2)); socketChannel.pipeline().addLast(new ClientHandler2()); } }); //鏈接到服務端,connect是異步鏈接,在調用同步等待sync,等待鏈接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞直到客戶端通道關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 group.shutdownGracefully(); } } }
運行服務端,客戶端。服務端日誌以下
2019-10-06 15:27:54.601 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 1次消息:[client data 1...],類型爲[1]
2019-10-06 15:27:54.604 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 2次消息:[client data 2...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 3次消息:[client data 3...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 4次消息:[client data 4...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 5次消息:[client data 5...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 6次消息:[client data 6...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 7次消息:[client data 7...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 8次消息:[client data 8...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 9次消息:[client data 9...],類型爲[1]
2019-10-06 15:27:54.605 INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler : 收到 10次消息:[client data 10...],類型爲[1]
客戶端日誌以下
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到1次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到2次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.611 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到3次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到4次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到5次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到6次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到7次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到8次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到9次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612 INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2 : 收到10次消息[server data...],類型爲:[2]
這樣就分別完成了自定義長度的半包解碼