什麼是TCP拆包、粘包?
java
在網絡通訊中,數據在底層都是以字節流形式在流動,那麼發送方和接受方理應有一個約定(協議),只有這樣接受方纔知道須要接受多少數據,哪些數據須要在一塊兒處理;若是沒有這個約定,就會出現本應該一塊兒處理的數據,被TCP劃分爲多個包發給接收方進行處理,以下圖:網絡
看一個TCP拆包、粘包的實例app
客戶端Handler:框架
服務端Handler:socket
運行結果:ide
上面的程序本意是CLIENT發送3次消息給SERVER,SERVER端理應處理3次,但是結果SERVER卻將3條消息一次處理了。oop
那麼如何解決TCP拆包、粘包問題呢?其實思路不外乎有3種:ui
第一種:發定長數據this
接收方拿固定長度的數據,發送方發送固定長度的數據便可。可是這樣的缺點也是顯而易見的:若是發送方的數據長度不足,須要補位,浪費空間。編碼
第二種:在包尾部增長特殊字符進行分割
發送方發送數據時,增長特殊字符;在接收方以特殊字符爲準進行分割
第三種:自定義協議
相似於HTTP協議中的HEAD信息,好比咱們也能夠在HEAD中,告訴接收方數據的元信息(數據類型、數據長度等)
Netty如何解決TCP拆包、粘包問題?
在《Java通訊實戰:編寫自定義通訊協議實現FTP服務》中,涉及到了JAVA SOCKET這方面的處理,你們能夠參考。接下來,咱們來看Netty這個框架是如何幫助咱們解決這個問題的。本篇博客的代碼在《Netty實踐(一):輕鬆入門》基礎上進行。
方式一:定長消息
Server啓動類:
Client Handler:
運行結果:
利用FixedLengthFrameDecoder,加入到管道流處理中,長度夠了接收方纔能收到。
方式二:自定義分隔符
Server啓動類:
Client Handler:
運行結果:
方式三:自定義協議
下面咱們將簡單實現一個自定義協議:
HEAD信息中包含:數據長度、數據版本
數據內容
MyHead
public class MyHead { //數據長度 private int length; //數據版本 private int version; public MyHead(int length, int version) { this.length = length; this.version = version; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; } }
MyMessage
public class MyMessage { //消息head private MyHead head; //消息body private String content; public MyMessage(MyHead head, String content) { this.head = head; this.content = content; } public MyHead getHead() { return head; } public void setHead(MyHead head) { this.head = head; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return String.format("[length=%d,version=%d,content=%s]",head.getLength(),head.getVersion(),content); } }
編碼器
/** * Created by Administrator on 17-1-9. * 編碼器 將自定義消息轉化成ByteBuff */ public class MyEncoder extends MessageToByteEncoder<MyMessage> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception { int length = myMessage.getHead().getLength(); int version = myMessage.getHead().getVersion(); String content = myMessage.getContent(); byteBuf.writeInt(length); byteBuf.writeInt(version); byteBuf.writeBytes(content.getBytes(Charset.forName("UTF-8"))); } }
×××
/** * Created by Administrator on 17-1-9. * ××× 將ByteBuf數據轉化成自定義消息 */ public class MyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int length = byteBuf.readInt(); int version = byteBuf.readInt(); byte[] body = new byte[length]; byteBuf.readBytes(body); String content = new String(body, Charset.forName("UTF-8")); MyMessage myMessage = new MyMessage(new MyHead(length,version),content); list.add(myMessage); } }
Server啓動類
public class Main { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2) int port = 8867; try { ServerBootstrap b = new ServerBootstrap(); // (3) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (4) .childHandler(new ChannelInitializer<SocketChannel>() { // (5) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyEncoder()) .addLast(new MyDecoder()) .addLast(new ServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (6) .childOption(ChannelOption.SO_KEEPALIVE, true); // (7) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (8) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. System.out.println("start server...."); f.channel().closeFuture().sync(); System.out.println("stop server...."); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("exit server...."); } } }
Server Handler
public class ServerHandler extends ChannelHandlerAdapter { //每當從客戶端收到新的數據時,這個方法會在收到消息時被調用 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MyMessage in = (MyMessage) msg; try { // Do something with msg System.out.println("server get :" + in); } finally { //ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放 //or ((ByteBuf)msg).release(); ReferenceCountUtil.release(msg); } } //exceptionCaught()事件處理方法是當出現Throwable對象纔會被調用 //當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
Client啓動類
public class Client { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new MyDecoder()); p.addLast(new MyEncoder()); p.addLast(new ClientHandler()); } }); // Start the client. ChannelFuture f = b.connect("127.0.0.1", 8867).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
Client Handler
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(new MyMessage(new MyHead("abcd".getBytes("UTF-8").length,1),"abcd")); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { // Do something with msg System.out.println("client get :" + in.toString(CharsetUtil.UTF_8)); ctx.close(); } finally { //ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放 //or ((ByteBuf)msg).release(); ReferenceCountUtil.release(msg); } } }
運行結果
到這裏,你會發現Netty處理TCP拆包、粘包問題很簡單,經過編解碼技術支持,讓咱們編寫自定義協議也很方便,在後續的Netty博客中,我將繼續爲你們介紹Netty在實際中的一些應用(好比實現心跳檢測),See You~