【轉】Netty之解決TCP粘包拆包(自定義協議)

一、什麼是粘包/拆包html

       通常所謂的TCP粘包是在一次接收數據不能徹底地體現一個完整的消息數據。TCP通信爲什麼存在粘包呢?主要緣由是TCP是以流的方式來處理數據,再加上網絡上MTU的每每小於在應用處理的消息數據,因此就會引起一次接收的數據沒法知足消息的須要,致使粘包的存在。處理粘包的惟一方法就是制定應用層的數據通信協議,經過協議來規範現有接收的數據是否知足消息數據的須要。java

二、解決辦法編程

     2.一、消息定長,報文大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即便粘包了經過接收方編程實現獲取定長報文也能區分。bootstrap

     2.二、包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符做爲報文分隔符,接收方經過特殊分隔符切分報文區分。數組

     2.三、將消息分爲消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段服務器

三、自定義協議,來實現TCP的粘包/拆包問題網絡

      3.0  自定義協議,開始標記           異步

              

      3.1  自定義協議的介紹socket

             

      3.2  自定義協議的類的封裝tcp

             

      3.3  自定義協議的編碼器

             

      3.4  自定義協議的解碼器

          

四、協議相關的實現

      4.1  協議的封裝

 

[java]  view plain  copy
 
 print?
  1. import java.util.Arrays;  
  2.   
  3. /** 
  4.  * <pre> 
  5.  * 本身定義的協議 
  6.  *  數據包格式 
  7.  * +——----——+——-----——+——----——+ 
  8.  * |協議開始標誌|  長度             |   數據       | 
  9.  * +——----——+——-----——+——----——+ 
  10.  * 1.協議開始標誌head_data,爲int類型的數據,16進製表示爲0X76 
  11.  * 2.傳輸數據的長度contentLength,int類型 
  12.  * 3.要傳輸的數據 
  13.  * </pre> 
  14.  */  
  15. public class SmartCarProtocol {  
  16.     /** 
  17.      * 消息的開頭的信息標誌 
  18.      */  
  19.     private int head_data = ConstantValue.HEAD_DATA;  
  20.     /** 
  21.      * 消息的長度 
  22.      */  
  23.     private int contentLength;  
  24.     /** 
  25.      * 消息的內容 
  26.      */  
  27.     private byte[] content;  
  28.   
  29.     /** 
  30.      * 用於初始化,SmartCarProtocol 
  31.      *  
  32.      * @param contentLength 
  33.      *            協議裏面,消息數據的長度 
  34.      * @param content 
  35.      *            協議裏面,消息的數據 
  36.      */  
  37.     public SmartCarProtocol(int contentLength, byte[] content) {  
  38.         this.contentLength = contentLength;  
  39.         this.content = content;  
  40.     }  
  41.   
  42.     public int getHead_data() {  
  43.         return head_data;  
  44.     }  
  45.   
  46.     public int getContentLength() {  
  47.         return contentLength;  
  48.     }  
  49.   
  50.     public void setContentLength(int contentLength) {  
  51.         this.contentLength = contentLength;  
  52.     }  
  53.   
  54.     public byte[] getContent() {  
  55.         return content;  
  56.     }  
  57.   
  58.     public void setContent(byte[] content) {  
  59.         this.content = content;  
  60.     }  
  61.   
  62.     @Override  
  63.     public String toString() {  
  64.         return "SmartCarProtocol [head_data=" + head_data + ", contentLength="  
  65.                 + contentLength + ", content=" + Arrays.toString(content) + "]";  
  66.     }  
  67.   
  68. }  

 

      4.2  協議的編碼器

 

[java]  view plain  copy
 
 print?
  1. /** 
  2.  * <pre> 
  3.  * 本身定義的協議 
  4.  *  數據包格式 
  5.  * +——----——+——-----——+——----——+ 
  6.  * |協議開始標誌|  長度             |   數據       | 
  7.  * +——----——+——-----——+——----——+ 
  8.  * 1.協議開始標誌head_data,爲int類型的數據,16進製表示爲0X76 
  9.  * 2.傳輸數據的長度contentLength,int類型 
  10.  * 3.要傳輸的數據 
  11.  * </pre> 
  12.  */  
  13. public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {  
  14.   
  15.     @Override  
  16.     protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,  
  17.             ByteBuf out) throws Exception {  
  18.         // 寫入消息SmartCar的具體內容  
  19.         // 1.寫入消息的開頭的信息標誌(int類型)  
  20.         out.writeInt(msg.getHead_data());  
  21.         // 2.寫入消息的長度(int 類型)  
  22.         out.writeInt(msg.getContentLength());  
  23.         // 3.寫入消息的內容(byte[]類型)  
  24.         out.writeBytes(msg.getContent());  
  25.     }  
  26. }  

 

      4.3  協議的解碼器

 

[java]  view plain  copy
 
 print?
  1. import java.util.List;  
  2. import io.netty.buffer.ByteBuf;  
  3. import io.netty.channel.ChannelHandlerContext;  
  4. import io.netty.handler.codec.ByteToMessageDecoder;  
  5.   
  6. /** 
  7.  * <pre> 
  8.  * 本身定義的協議 
  9.  *  數據包格式 
  10.  * +——----——+——-----——+——----——+ 
  11.  * |協議開始標誌|  長度             |   數據       | 
  12.  * +——----——+——-----——+——----——+ 
  13.  * 1.協議開始標誌head_data,爲int類型的數據,16進製表示爲0X76 
  14.  * 2.傳輸數據的長度contentLength,int類型 
  15.  * 3.要傳輸的數據,長度不該該超過2048,防止socket流的攻擊 
  16.  * </pre> 
  17.  */  
  18. public class SmartCarDecoder extends ByteToMessageDecoder {  
  19.   
  20.     /** 
  21.      * <pre> 
  22.      * 協議開始的標準head_data,int類型,佔據4個字節. 
  23.      * 表示數據的長度contentLength,int類型,佔據4個字節. 
  24.      * </pre> 
  25.      */  
  26.     public final int BASE_LENGTH = 4 + 4;  
  27.   
  28.     @Override  
  29.     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,  
  30.             List<Object> out) throws Exception {  
  31.         // 可讀長度必須大於基本長度  
  32.         if (buffer.readableBytes() >= BASE_LENGTH) {  
  33.             // 防止socket字節流攻擊  
  34.             // 防止,客戶端傳來的數據過大  
  35.             // 由於,太大的數據,是不合理的  
  36.             if (buffer.readableBytes() > 2048) {  
  37.                 buffer.skipBytes(buffer.readableBytes());  
  38.             }  
  39.   
  40.             // 記錄包頭開始的index  
  41.             int beginReader;  
  42.   
  43.             while (true) {  
  44.                 // 獲取包頭開始的index  
  45.                 beginReader = buffer.readerIndex();  
  46.                 // 標記包頭開始的index  
  47.                 buffer.markReaderIndex();  
  48.                 // 讀到了協議的開始標誌,結束while循環  
  49.                 if (buffer.readInt() == ConstantValue.HEAD_DATA) {  
  50.                     break;  
  51.                 }  
  52.   
  53.                 // 未讀到包頭,略過一個字節  
  54.                 // 每次略過,一個字節,去讀取,包頭信息的開始標記  
  55.                 buffer.resetReaderIndex();  
  56.                 buffer.readByte();  
  57.   
  58.                 // 當略過,一個字節以後,  
  59.                 // 數據包的長度,又變得不知足  
  60.                 // 此時,應該結束。等待後面的數據到達  
  61.                 if (buffer.readableBytes() < BASE_LENGTH) {  
  62.                     return;  
  63.                 }  
  64.             }  
  65.   
  66.             // 消息的長度  
  67.   
  68.             int length = buffer.readInt();  
  69.             // 判斷請求數據包數據是否到齊  
  70.             if (buffer.readableBytes() < length) {  
  71.                 // 還原讀指針  
  72.                 buffer.readerIndex(beginReader);  
  73.                 return;  
  74.             }  
  75.   
  76.             // 讀取data數據  
  77.             byte[] data = new byte[length];  
  78.             buffer.readBytes(data);  
  79.   
  80.             SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);  
  81.             out.add(protocol);  
  82.         }  
  83.     }  
  84.   
  85. }  

 

      4.4  服務端加入協議的編/解碼器

            

      4.5  客戶端加入協議的編/解碼器

          

五、服務端的實現

 

[java]  view plain  copy
 
 print?
  1. import io.netty.bootstrap.ServerBootstrap;  
  2. import io.netty.channel.ChannelFuture;  
  3. import io.netty.channel.ChannelInitializer;  
  4. import io.netty.channel.ChannelOption;  
  5. import io.netty.channel.EventLoopGroup;  
  6. import io.netty.channel.nio.NioEventLoopGroup;  
  7. import io.netty.channel.socket.SocketChannel;  
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  9. import io.netty.handler.logging.LogLevel;  
  10. import io.netty.handler.logging.LoggingHandler;  
  11.   
  12. public class Server {  
  13.   
  14.     public Server() {  
  15.     }  
  16.   
  17.     public void bind(int port) throws Exception {  
  18.         // 配置NIO線程組  
  19.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  20.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  21.         try {  
  22.             // 服務器輔助啓動類配置  
  23.             ServerBootstrap b = new ServerBootstrap();  
  24.             b.group(bossGroup, workerGroup)  
  25.                     .channel(NioServerSocketChannel.class)  
  26.                     .handler(new LoggingHandler(LogLevel.INFO))  
  27.                     .childHandler(new ChildChannelHandler())//  
  28.                     .option(ChannelOption.SO_BACKLOG, 1024) // 設置tcp緩衝區 // (5)  
  29.                     .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)  
  30.             // 綁定端口 同步等待綁定成功  
  31.             ChannelFuture f = b.bind(port).sync(); // (7)  
  32.             // 等到服務端監聽端口關閉  
  33.             f.channel().closeFuture().sync();  
  34.         } finally {  
  35.             // 優雅釋放線程資源  
  36.             workerGroup.shutdownGracefully();  
  37.             bossGroup.shutdownGracefully();  
  38.         }  
  39.     }  
  40.   
  41.     /** 
  42.      * 網絡事件處理器 
  43.      */  
  44.     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
  45.         @Override  
  46.         protected void initChannel(SocketChannel ch) throws Exception {  
  47.             // 添加自定義協議的編解碼工具  
  48.             ch.pipeline().addLast(new SmartCarEncoder());  
  49.             ch.pipeline().addLast(new SmartCarDecoder());  
  50.             // 處理網絡IO  
  51.             ch.pipeline().addLast(new ServerHandler());  
  52.         }  
  53.     }  
  54.   
  55.     public static void main(String[] args) throws Exception {  
  56.         new Server().bind(9999);  
  57.     }  
  58. }  

六、服務端Handler的實現

 

 

[java]  view plain  copy
 
 print?
  1. import io.netty.channel.ChannelHandlerAdapter;  
  2. import io.netty.channel.ChannelHandlerContext;  
  3.   
  4. public class ServerHandler extends ChannelHandlerAdapter {  
  5.     // 用於獲取客戶端發送的信息  
  6.     @Override  
  7.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
  8.             throws Exception {  
  9.         // 用於獲取客戶端發來的數據信息  
  10.         SmartCarProtocol body = (SmartCarProtocol) msg;  
  11.         System.out.println("Server接受的客戶端的信息 :" + body.toString());  
  12.   
  13.         // 會寫數據給客戶端  
  14.         String str = "Hi I am Server ...";  
  15.         SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,  
  16.                 str.getBytes());  
  17.         // 當服務端完成寫操做後,關閉與客戶端的鏈接  
  18.         ctx.writeAndFlush(response);  
  19.         // .addListener(ChannelFutureListener.CLOSE);  
  20.   
  21.         // 當有寫操做時,不須要手動釋放msg的引用  
  22.         // 當只有讀操做時,才須要手動釋放msg的引用  
  23.     }  
  24.   
  25.     @Override  
  26.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
  27.             throws Exception {  
  28.         // cause.printStackTrace();  
  29.         ctx.close();  
  30.     }  
  31. }  

七、客戶端的實現

 

 

[java]  view plain  copy
 
 print?
  1. import io.netty.bootstrap.Bootstrap;  
  2. import io.netty.channel.ChannelFuture;  
  3. import io.netty.channel.ChannelInitializer;  
  4. import io.netty.channel.ChannelOption;  
  5. import io.netty.channel.EventLoopGroup;  
  6. import io.netty.channel.nio.NioEventLoopGroup;  
  7. import io.netty.channel.socket.SocketChannel;  
  8. import io.netty.channel.socket.nio.NioSocketChannel;  
  9.   
  10. public class Client {  
  11.   
  12.     /** 
  13.      * 鏈接服務器 
  14.      *  
  15.      * @param port 
  16.      * @param host 
  17.      * @throws Exception 
  18.      */  
  19.     public void connect(int port, String host) throws Exception {  
  20.         // 配置客戶端NIO線程組  
  21.         EventLoopGroup group = new NioEventLoopGroup();  
  22.         try {  
  23.             // 客戶端輔助啓動類 對客戶端配置  
  24.             Bootstrap b = new Bootstrap();  
  25.             b.group(group)//  
  26.                     .channel(NioSocketChannel.class)//  
  27.                     .option(ChannelOption.TCP_NODELAY, true)//  
  28.                     .handler(new MyChannelHandler());//  
  29.             // 異步連接服務器 同步等待連接成功  
  30.             ChannelFuture f = b.connect(host, port).sync();  
  31.   
  32.             // 等待連接關閉  
  33.             f.channel().closeFuture().sync();  
  34.   
  35.         } finally {  
  36.             group.shutdownGracefully();  
  37.             System.out.println("客戶端優雅的釋放了線程資源...");  
  38.         }  
  39.   
  40.     }  
  41.   
  42.     /** 
  43.      * 網絡事件處理器 
  44.      */  
  45.     private class MyChannelHandler extends ChannelInitializer<SocketChannel> {  
  46.         @Override  
  47.         protected void initChannel(SocketChannel ch) throws Exception {  
  48.             // 添加自定義協議的編解碼工具  
  49.             ch.pipeline().addLast(new SmartCarEncoder());  
  50.             ch.pipeline().addLast(new SmartCarDecoder());  
  51.             // 處理網絡IO  
  52.             ch.pipeline().addLast(new ClientHandler());  
  53.         }  
  54.   
  55.     }  
  56.   
  57.     public static void main(String[] args) throws Exception {  
  58.         new Client().connect(9999, "127.0.0.1");  
  59.   
  60.     }  
  61.   
  62. }  

八、客戶端Handler的實現

 

 

[java]  view plain  copy
 
 print?
  1. import io.netty.channel.ChannelHandlerAdapter;  
  2. import io.netty.channel.ChannelHandlerContext;  
  3. import io.netty.util.ReferenceCountUtil;  
  4.   
  5. //用於讀取客戶端發來的信息  
  6. public class ClientHandler extends ChannelHandlerAdapter {  
  7.   
  8.     // 客戶端與服務端,鏈接成功的售後  
  9.     @Override  
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  11.         // 發送SmartCar協議的消息  
  12.         // 要發送的信息  
  13.         String data = "I am client ...";  
  14.         // 得到要發送信息的字節數組  
  15.         byte[] content = data.getBytes();  
  16.         // 要發送信息的長度  
  17.         int contentLength = content.length;  
  18.   
  19.         SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);  
  20.   
  21.         ctx.writeAndFlush(protocol);  
  22.     }  
  23.   
  24.     // 只是讀數據,沒有寫數據的話  
  25.     // 須要本身手動的釋放的消息  
  26.     @Override  
  27.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
  28.             throws Exception {  
  29.         try {  
  30.             // 用於獲取客戶端發來的數據信息  
  31.             SmartCarProtocol body = (SmartCarProtocol) msg;  
  32.             System.out.println("Client接受的客戶端的信息 :" + body.toString());  
  33.   
  34.         } finally {  
  35.             ReferenceCountUtil.release(msg);  
  36.         }  
  37.     }  
  38.   
  39.     @Override  
  40.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
  41.             throws Exception {  
  42.         ctx.close();  
  43.     }  
  44.   
  45. }  

九、參考的博客地址

 

 

[java]  view plain  copy
 
 print?
  1. http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html  
  2. http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html  
相關文章
相關標籤/搜索