假設客戶端分別發送了兩個數據包D1和D2給服務器,因爲服務器端一次讀取到的字節數是不肯定的,因此可能發生四種狀況:java
一、服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包。bootstrap
二、服務端一次接收到了兩個數據包,D1和D2粘合在一塊兒,被稱爲TCP粘包。服務器
三、服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部份內容,第二次讀取到了D2包的剩餘內容,這被稱爲TCP拆包。網絡
四、服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部份內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。併發
若是此時服務端TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能會發生第五種可能,即服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包。 異步
那麼在Netty中可以使用LineBasedFrameDecoder和StringDecodersocket
LineBasedFrameDecoder的工做原理是一次遍歷ByteBuf中的可讀字節,判斷看是否有"\n"或者"\r\n",若是有,就以此位置爲結束位置,從可讀索引到結束位置區間的字節就組成了一行。它是以換行符爲結束標誌的解碼器,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。ide
StringDecoder將接收到的對象轉換成字符串,而後繼續調用後面的handler。oop
利用LineBasedFrameDecoder解決TCP粘包問題:編碼
1 package netty; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 15 public class TimeServer { 16 17 public void bind(int port) throws Exception{ 18 //配置服務端的NIO線程組 19 EventLoopGroup bossGroup = new NioEventLoopGroup(); 20 EventLoopGroup workerGroup = new NioEventLoopGroup(); 21 try{ 22 ServerBootstrap b = new ServerBootstrap(); 23 b.group(bossGroup,workerGroup) 24 .channel(NioServerSocketChannel.class) 25 .option(ChannelOption.SO_BACKLOG, 1024) 26 .childHandler(new ChildChannelHandler()); 27 //綁定端口,同步等待成功 28 ChannelFuture f = b.bind(port).sync(); 29 //等待服務器監聽端口關閉 30 f.channel().closeFuture().sync(); 31 }finally{ 32 //優雅退出,釋放線程池資源 33 bossGroup.shutdownGracefully(); 34 workerGroup.shutdownGracefully(); 35 } 36 } 37 38 private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ 39 @Override 40 protected void initChannel(SocketChannel arg0) throws Exception{ 41 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 42 arg0.pipeline().addLast(new StringDecoder()); 43 arg0.pipeline().addLast(new TimeServerHandler()); 44 } 45 } 46 47 public static void main(String args[]) throws Exception{ 48 int port = 10001; 49 if(args != null && args.length > 0){ 50 try{ 51 port = Integer.valueOf(args[0]); 52 }catch(NumberFormatException e){ 53 //採用默認值 54 } 55 } 56 new TimeServer().bind(port); 57 } 58 }
TimeServerHandler, msg是刪除回車換行符後的請求消息,不須要額外考慮處理半包問題,也不須要對請求消息進行編碼:
1 import java.io.IOException; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelHandlerAdapter; 5 import io.netty.channel.ChannelHandlerContext; 6 7 public class TimeServerHandler extends ChannelHandlerAdapter{ 8 9 private int counter; 10 11 public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{ 12 13 String body = (String)msg; 14 System.out.println("The time server receive order:" + body + "; the counter is :" + ++counter); 15 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)? new java.util.Date( 16 System.currentTimeMillis()).toString() : "BAD ORDER"; 17 currentTime = currentTime + System.getProperty("line.separator"); 18 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 19 ctx.writeAndFlush(resp); 20 } 21 22 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ 23 ctx.flush(); 24 } 25 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 28 ctx.close(); 29 } 30 }
TimeClient,在TimeClientHandler以前新增lineBasedFrameDecoder和StringDecoder解碼器:
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 public class TimeClient { 13 14 public void connect(int port,String host) throws Exception{ 15 //建立客戶端處理I/O讀寫的NioEventLoopGroup Group線程組 16 EventLoopGroup group = new NioEventLoopGroup(); 17 try{ 18 //建立客戶端輔助啓動類Bootstrap 19 Bootstrap b = new Bootstrap(); 20 b.group(group).channel(NioSocketChannel.class) 21 .option(ChannelOption.TCP_NODELAY, true) 22 .handler(new ChannelInitializer<SocketChannel>(){ 23 //將ChannelHandler設置到ChannelPipleline中,用於處理網絡I/O事件 24 @Override 25 protected void initChannel(SocketChannel ch) throws Exception { 26 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 27 ch.pipeline().addLast(new StringDecoder()); 28 ch.pipeline().addLast(new TimeClientHandler()); 29 } 30 }); 31 //發起異步鏈接操做,而後調用同步方法等待鏈接成功。 32 ChannelFuture f = b.connect(host,port).sync(); 33 34 //等待客戶端鏈路關閉 35 f.channel().closeFuture().sync(); 36 }finally{ 37 //優雅退出,釋放NIO線程組 38 group.shutdownGracefully(); 39 } 40 } 41 42 public static void main(String[] args) throws Exception{ 43 int port = 10001; 44 if(args != null && args.length > 0){ 45 try{ 46 port = Integer.valueOf(args[0]); 47 }catch(NumberFormatException e){ 48 //採用默認值 49 } 50 } 51 new TimeClient().connect(port, "0.0.0.0"); 52 } 53 54 }
TimeClientHandler,拿到的msg已是解碼成字符串以後的應答消息:
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 import java.util.logging.Logger; 7 8 public class TimeClientHandler extends ChannelHandlerAdapter{ 9 10 private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName()); 11 12 private int counter; 13 14 private byte[]req; 15 16 public TimeClientHandler(){ 17 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 18 } 19 20 //當客戶端與服務端TCP鏈路簡歷成功後,Netty的NIO線程會調用該方法,發送查詢時間的指令給服務器 21 public void channelActive(ChannelHandlerContext ctx){ 22 //將請求消息發送給服務端 23 ByteBuf message = null; 24 for(int i = 0;i<100;i++){ 25 message = Unpooled.buffer(req.length); 26 message.writeBytes(req); 27 ctx.writeAndFlush(message); 28 } 29 } 30 31 //當服務器返回應答消息時,該方法被調用 32 public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{ 33 String body = (String)msg; 34 System.out.println("Now is:" + body + "; the counter is :" + ++counter); 35 } 36 37 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 38 39 //釋放資源 40 logger.warning("Unexpected exception from downstream :" + cause.getMessage()); 41 ctx.close(); 42 } 43 }
運行結果:
發現。。就木有粘包或拆包的問題啦~~~~
機緣巧合,同事也一塊兒實現了Scala版~
clientHandler:
1 package main.nettyscala 2 3 import io.netty.buffer.{ByteBuf, Unpooled} 4 import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter} 5 6 /** 7 * Created by root on 2016/11/18. 8 */ 9 class ClientHandler extends ChannelInboundHandlerAdapter { 10 override def channelActive(ctx: ChannelHandlerContext): Unit = { 11 println("channelActive") 12 //val content = "hello server" 13 val content = Console.readLine() 14 ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8"))) 15 //發送case class 不在發送字符串了,封裝一個字符串 16 // ctx.writeAndFlush(RegisterMsg("hello server")) 17 } 18 19 override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { 20 println("channelRead") 21 val byteBuf = msg.asInstanceOf[ByteBuf] 22 val bytes = new Array[Byte](byteBuf.readableBytes()) 23 byteBuf.readBytes(bytes) 24 val message = new String(bytes, "UTF-8") 25 println(message) 26 } 27 28 override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { 29 println("channeReadComplete") 30 ctx.flush() 31 } 32 //發送異常時關閉 33 override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { 34 println("exceptionCaught") 35 ctx.close() 36 } 37 38 }
NettyClient:
1 package main.nettyscala 2 3 import io.netty.bootstrap.Bootstrap 4 import io.netty.channel.ChannelInitializer 5 import io.netty.channel.nio.NioEventLoopGroup 6 import io.netty.channel.socket.SocketChannel 7 import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel} 8 import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder} 9 10 11 object NettyClient { 12 def main(args: Array[String]) { 13 val host = args(0) 14 val port = args(1).toInt 15 val client = new NettyClient 16 client.connect(host, port) 17 } 18 } 19 20 class NettyClient { 21 def connect(host: String, port: Int): Unit = { 22 //建立客戶端NIO線程組 23 val eventGroup = new NioEventLoopGroup 24 //建立客戶端輔助啓動類 25 val bootstrap = new Bootstrap 26 try { 27 //將NIO線程組傳入到Bootstrap 28 bootstrap.group(eventGroup) 29 //建立NioSocketChannel 30 .channel(classOf[NioSocketChannel]) 31 //綁定I/O事件處理類 32 .handler(new ChannelInitializer[SocketChannel] { 33 override def initChannel(ch: SocketChannel): Unit = { 34 ch.pipeline().addLast( 35 // new ObjectEncoder, 36 // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), 37 new ClientHandler 38 ) 39 } 40 }) 41 //發起異步鏈接操做 42 val channelFuture = bootstrap.connect(host, port).sync() 43 //等待服務關閉 44 channelFuture.channel().closeFuture().sync() 45 } finally { 46 //優雅的退出,釋放線程池資源 47 eventGroup.shutdownGracefully() 48 } 49 } 50 }
NettyServer:
1 package main.nettyscala 2 3 /** 4 * Created by root on 12/8/16. 5 */ 6 import io.netty.bootstrap.ServerBootstrap 7 import io.netty.channel.ChannelInitializer 8 import io.netty.channel.nio.NioEventLoopGroup 9 import io.netty.channel.socket.SocketChannel 10 import io.netty.channel.socket.nio.NioServerSocketChannel 11 import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder} 12 13 14 object NettyServer { 15 def main(args: Array[String]) { 16 val host = args(0) 17 val port = args(1).toInt 18 val server = new NettyServer 19 server.bind(host, port) 20 } 21 } 22 class NettyServer { 23 def bind(host: String, port: Int): Unit = { 24 //配置服務端線程池組 25 //用於服務器接收客戶端鏈接 26 val bossGroup = new NioEventLoopGroup() 27 //用戶進行SocketChannel的網絡讀寫 28 val workerGroup = new NioEventLoopGroup() 29 30 try { 31 //是Netty用戶啓動NIO服務端的輔助啓動類,下降服務端的開發複雜度 32 val bootstrap = new ServerBootstrap() 33 //將兩個NIO線程組做爲參數傳入到ServerBootstrap 34 bootstrap.group(bossGroup, workerGroup) 35 //建立NioServerSocketChannel 36 .channel(classOf[NioServerSocketChannel]) 37 //綁定I/O事件處理類 38 .childHandler(new ChannelInitializer[SocketChannel] { 39 override def initChannel(ch: SocketChannel): Unit = { 40 ch.pipeline().addLast( 41 // new ObjectEncoder, 42 // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), 43 new ServerHandler 44 ) 45 } 46 }) 47 //綁定端口,調用sync方法等待綁定操做完成 48 val channelFuture = bootstrap.bind(host, port).sync() 49 //等待服務關閉 50 channelFuture.channel().closeFuture().sync() 51 } finally { 52 //優雅的退出,釋放線程池資源 53 bossGroup.shutdownGracefully() 54 workerGroup.shutdownGracefully() 55 } 56 } 57 }
ServerHandler:
1 package main.nettyscala 2 3 /** 4 * Created by root on 12/8/16. 5 */ 6 import io.netty.buffer.{Unpooled, ByteBuf} 7 import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} 8 9 /** 10 * Created by root on 2016/11/18. 11 */ 12 class ServerHandler extends ChannelInboundHandlerAdapter { 13 /** 14 * 有客戶端創建鏈接後調用 15 */ 16 override def channelActive(ctx: ChannelHandlerContext): Unit = { 17 println("channelActive invoked") 18 } 19 20 /** 21 * 接受客戶端發送來的消息 22 */ 23 override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { 24 println("channelRead invoked") 25 val byteBuf = msg.asInstanceOf[ByteBuf] 26 val bytes = new Array[Byte](byteBuf.readableBytes()) 27 byteBuf.readBytes(bytes) 28 val message = new String(bytes, "UTF-8") 29 println(message) 30 val back = "received message: " + message 31 val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8")) 32 println(msg) 33 ctx.write(resp) 34 } 35 36 /** 37 * 將消息對列中的數據寫入到SocketChanne併發送給對方 38 */ 39 override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { 40 println("channekReadComplete invoked") 41 ctx.flush() 42 } 43 44 45 }
RegisterMsg:
1 package main.nettyscala 2 3 /** 4 * Created by root on 12/8/16. 5 */ 6 case class RegisterMsg(content: String) extends Serializable
運行結果: