在前面的時間服務器例程中,咱們屢次強調並無考慮讀半包問題,這在功能測試時每每沒有問題,可是一旦壓力上來,或者發送大報文以後,就會存在粘包/拆包問題。若是代碼沒有考慮,每每就會出現解碼錯位或者錯誤,致使程序不能正常工做。以Netty 入門示例爲例。
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
每讀到一條消息後,就計一次數,而後發送應答消息給客戶端。按照設計,服務端接收到的消息總數應該跟客戶端發送的消息總數相同,並且請求消息刪除回車換行符後應該爲"QUERY TIME ORDER"。
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 ctx.close(); } }
The time server receive order : QUERY TIME ORDER
QUERY TIME ORDER ; the counter is : 1
The time server receive order :
QUERY TIME ORDER ; the counter is : 2
服務端運行結果代表它只接收到了兩條消息,第一條包含57條「QUERY TIME ORDER」指令,第二條包含了43條「QUERY TIME ORDER」指令,總數正好是100條。咱們期待的是收到100條消息,每條包含一條「QUERY TIME ORDER」指令。這說明發生了TCP粘包。
Now is : BAD ORDER
; the counter is : 1
按照設計初衷,客戶端應該收到100條當前系統時間的消息,但實際上只收到了一條。這不難理解,由於服務端只收到了2條請求消息,因此實際服務端只發送了2條應答,因爲請求消息不知足查詢條件,因此返回了2條「BAD ORDER」應答消息。可是實際上客戶端只收到了一條包含2條「BAD ORDER」指令的消息,說明服務端返回的應答消息也發生了粘包。因爲上面的例程沒有考慮TCP的粘包/拆包,因此當發生TCP粘包時,咱們的程序就不能正常工做。
爲了解決TCP粘包/拆包致使的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,只要能熟練掌握這些類庫的使用,TCP粘包問題今後會變得很是容易,你甚至不須要關心它們,這也是其餘NIO框架和JDK原生的NIO API所沒法匹敵的。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(Channel arg0) throws Exception { arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new TimeServer().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 發起異步鏈接操做 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new TimeClient().connect(port, ""); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")) .getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 ctx.close(); } }
The time server receive order : QUERY TIME ORDER ; the counter is : 1
The time server receive order : QUERY TIME ORDER ; the counter is : 100
Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 1
Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 100
StringDecoder的功能很是簡單,就是將接收到的對象轉換成字符串,而後繼續調用後面的handler。LineBasedFrameDecoder + StringDecoder組合就是按行切換的文本解碼器,它被設計用來支持TCP的粘包和拆包。