1 public class TimeClientHandler extends ChannelHandlerAdapter { 2 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class); 4 5 private int counter; 6 7 private byte[] req; 8 9 public TimeClientHandler() { 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 11 } 12 13 @Override 14 public void channelActive(ChannelHandlerContext ctx) throws Exception { 15 ByteBuf message = null; 16 for (int i = 0; i < 100; i++) { 17 message = Unpooled.buffer(req.length); 18 message.writeBytes(req); 19 ctx.writeAndFlush(message); 20 } 21 } 22 23 @Override 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 25 ByteBuf buf = (ByteBuf)msg; 26 byte[] req = new byte[buf.readableBytes()]; 27 buf.readBytes(req); 28 29 String body = new String(req, "UTF-8"); 30 System.out.println("Now is:" + body + "; the counter is:" + ++counter); 31 } 32 33 @Override 34 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 35 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage()); 36 ctx.close(); 37 } 38 39 }
TimeClientHandler的變化是,以前是發送一次"QUERY TIME ORDER"到服務端,如今變爲發送100次"QUERY TIME ORDER"+標準換行符到服務端,並在客戶端增長一個計數器,記錄從服務端收到的響應次數。
1 public class TimeServerHandler extends ChannelHandlerAdapter { 2 3 private int counter; 4 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 ByteBuf buf = (ByteBuf)msg; 8 byte[] req = new byte[buf.readableBytes()]; 9 buf.readBytes(req); 10 11 String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); 12 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter); 13 14 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 15 currentTime = currentTime + System.getProperty("line.separator"); 16 17 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 18 ctx.writeAndFlush(resp); 19 } 20 21 @Override 22 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 23 ctx.close(); 24 } 25 26 }
按照設計,服務端應該會打印出100次"Time time server...",客戶端應當會打印出100次"Now is ...",由於客戶端向服務端發送了100次"QUERY TIME ORDER"的請求,實際運行起來呢?先看一下服務端的打印:
The time server receive order:QUERY TIME ORDER QUERY TIME ORDER ...省略,這裏有55個 QUERY TIME ORD; the counter is:1 The time server receive order: ...省略,這裏有42個 QUERY TIME ORDER; the counter is:2
Now is:BAD ORDER BAD ORDER ; the counter is:1
由於服務端只收到了2條消息,所以客戶端也只會收到2條消息,由於服務端兩次收到的內容都不知足"QUERY TIME ORDER",所以返回"BAD ORDER"到客戶端,可是爲何客戶端的counter=1呢?回過頭來仔細想一想,所以服務端發送給客戶端的消息也發生了粘包。所以這裏簡單得出一個結論:粘包/拆包不只僅發生在客戶端給服務端發送數據,服務端回數據給客戶端一樣有可能發生粘包/拆包。
爲了解決TCP粘包/拆包致使的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,針對上面發送"QUERY TIME ORDER"+標準換行符的這種場景,簡單使用LineBasedFrameDecoder就能夠解決上面發生的粘包問題。
1 public class TimeServer { 2 3 public void bind(int port) throws Exception { 4 // NIO線程組 5 EventLoopGroup bossGroup = new NioEventLoopGroup(); 6 EventLoopGroup workerGroup = new NioEventLoopGroup(); 7 8 try { 9 ServerBootstrap b = new ServerBootstrap(); 10 b.group(bossGroup, workerGroup) 11 .channel(NioServerSocketChannel.class) 12 .option(ChannelOption.SO_BACKLOG, 1024) 13 .childHandler(new ChildChannelHandler()); 14 15 // 綁定端口,同步等待成功 16 ChannelFuture f = b.bind(port).sync(); 17 // 等待服務端監聽端口關閉 18 f.channel().closeFuture().sync(); 19 } finally { 20 // 優雅退出,釋放線程池資源 21 bossGroup.shutdownGracefully(); 22 workerGroup.shutdownGracefully(); 23 } 24 } 25 26 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 27 @Override 28 protected void initChannel(SocketChannel arg0) throws Exception { 29 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 30 arg0.pipeline().addLast(new StringDecoder()); 31 arg0.pipeline().addLast(new TimeServerHandler()); 32 } 33 } 34 35 }
1 public class TimeServerHandler extends ChannelHandlerAdapter { 2 3 private int counter; 4 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 String body = (String)msg; 8 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter); 9 10 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 11 currentTime = currentTime + System.getProperty("line.separator"); 12 13 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 14 ctx.writeAndFlush(resp); 15 } 16 17 @Override 18 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 19 ctx.close(); 20 } 21 22 }
1 public class TimeClient { 2 3 public void connect(int port, String host) throws Exception { 4 EventLoopGroup group = new NioEventLoopGroup(); 5 try { 6 Bootstrap b = new Bootstrap(); 7 8 b.group(group) 9 .channel(NioSocketChannel.class) 10 .option(ChannelOption.TCP_NODELAY, true) 11 .handler(new ChannelInitializer<SocketChannel>() { 12 protected void initChannel(SocketChannel ch) throws Exception { 13 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 14 ch.pipeline().addLast(new StringDecoder()); 15 ch.pipeline().addLast(new TimeClientHandler()); 16 }; 17 }); 18 19 // 發起異步鏈接操做 20 ChannelFuture f = b.connect(host, port).sync(); 21 // 等待客戶端鏈接關閉 22 f.channel().closeFuture().sync(); 23 } finally { 24 // 優雅退出,釋放NIO線程組 25 group.shutdownGracefully(); 26 } 27 } 28 29 }
1 public class TimeClientHandler extends ChannelHandlerAdapter { 2 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class); 4 5 private int counter; 6 7 private byte[] req; 8 9 public TimeClientHandler() { 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 11 } 12 13 @Override 14 public void channelActive(ChannelHandlerContext ctx) throws Exception { 15 ByteBuf message = null; 16 for (int i = 0; i < 100; i++) { 17 message = Unpooled.buffer(req.length); 18 message.writeBytes(req); 19 ctx.writeAndFlush(message); 20 } 21 } 22 23 @Override 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 25 String body = (String)msg; 26 System.out.println("Now is:" + body + "; the counter is:" + ++counter); 27 } 28 29 @Override 30 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 31 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage()); 32 ctx.close(); 33 } 34 35 }
The time server receive order:QUERY TIME ORDER; the counter is:1 The time server receive order:QUERY TIME ORDER; the counter is:2 The time server receive order:QUERY TIME ORDER; the counter is:3 The time server receive order:QUERY TIME ORDER; the counter is:4 The time server receive order:QUERY TIME ORDER; the counter is:5 ... The time server receive order:QUERY TIME ORDER; the counter is:98 The time server receive order:QUERY TIME ORDER; the counter is:99 The time server receive order:QUERY TIME ORDER; the counter is:100
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:1 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:2 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:3 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:4 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:5 ... Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:98 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:99 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:100