粘包和拆包是TCP網絡編程中不可避免的,不管是服務端仍是客戶端,當咱們讀取或者發送消息的時候,都須要考慮TCP底層的粘包/拆包機制。編程
TCP是個「流」協議,所謂流,就是沒有界限的一串數據。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。markdown
如圖所示,假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,故可能存在如下4種狀況。網絡
若是此時服務端TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能會發生第五種可能,即服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包。app
數據從發送方到接收方須要通過操做系統的緩衝區,而形成粘包和拆包的主要緣由就在這個緩衝區上。粘包能夠理解爲緩衝區數據堆積,致使多個請求數據粘在一塊兒,而拆包能夠理解爲發送的數據大於緩衝區,進行拆分處理。tcp
詳細來講,形成粘包和拆包的緣由主要有如下三個:ide
因爲底層的TCP沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,能夠概括以下。oop
針對上一小節描述的粘包和拆包的解決方案,對於拆包問題比較簡單,用戶能夠本身定義本身的編碼器進行處理,Netty並無提供相應的組件。對於粘包的問題,因爲拆包比較複雜,代碼比較處理比較繁瑣,Netty提供了4種解碼器來解決,分別以下:ui
以上解碼器在使用時只須要添加到Netty的責任鏈中便可,大多數狀況下這4種解碼器均可以知足了,固然除了以上4種解碼器,用戶也能夠自定義本身的解碼器進行處理。具體能夠參考如下代碼示例:編碼
// Server主程序 public class XNettyServer { public static void main(String[] args) throws Exception { // accept 處理鏈接的線程池 NioEventLoopGroup acceptGroup = new NioEventLoopGroup(); // read io 處理數據的線程池 NioEventLoopGroup readGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(acceptGroup, readGroup) .channel(NioServerSocketChannel.class) .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 增長解碼器 pipeline.addLast(new XDecoder()); // 打印出內容 handdler pipeline.addLast(new XHandler()); } }); System.out.println("啓動成功,端口 7777"); serverBootstrap.bind(7777).sync().channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); readGroup.shutdownGracefully(); } } } // 解碼器 public class XDecoder extends ByteToMessageDecoder { static final int PACKET_SIZE = 220; // 用來臨時保留沒有處理過的請求報文 ByteBuf tempMsg = Unpooled.buffer(); /** * @param ctx * @param in 請求的數據 * @param out 將粘在一塊兒的報文拆分後的結果保留起來 * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println(Thread.currentThread() + "收到了一次數據包,長度是:" + in.readableBytes()); // 合併報文 ByteBuf message = null; int tmpMsgSize = tempMsg.readableBytes(); // 若是暫存有上一次餘下的請求報文,則合併 if (tmpMsgSize > 0) { message = Unpooled.buffer(); message.writeBytes(tempMsg); message.writeBytes(in); System.out.println("合併:上一數據包餘下的長度爲:" + tmpMsgSize + ",合併後長度爲:" + message.readableBytes()); } else { message = in; } int size = message.readableBytes(); int counter = size / PACKET_SIZE; for (int i = 0; i < counter; i++) { byte[] request = new byte[PACKET_SIZE]; // 每次從總的消息中讀取220個字節的數據 message.readBytes(request); // 將拆分後的結果放入out列表中,交由後面的業務邏輯去處理 out.add(Unpooled.copiedBuffer(request)); } // 多餘的報文存起來 // 第一個報文: i+ 暫存 // 第二個報文: 1 與第一次 size = message.readableBytes(); if (size != 0) { System.out.println("多餘的數據長度:" + size); // 剩下來的數據放到tempMsg暫存 tempMsg.clear(); tempMsg.writeBytes(message.readBytes(size)); } } } // 處理器 public class XHandler extends ChannelInboundHandlerAdapter { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] content = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(content); System.out.println(Thread.currentThread() + ": 最終打印" + new String(content)); ((ByteBuf) msg).release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }