Netty解決粘包半包問題

一.什麼是TCP粘包半包

客戶端發送數據包給服務端,因服務端一次讀取到的字節數是不肯定的,有好幾種狀況.java

  1. 服務端分兩次讀取到了兩個獨立的數據包,沒有粘包和拆包;
  2. 服務端一次接收到了兩個數據包,粘合在一塊兒,被稱爲 TCP 粘包;
  3. 服務端分兩次讀取到了兩個數據包, 第一次讀取到了完整的包和另一個包的部份內容,第二次讀取到了另外一個包的剩餘內容, 這被稱爲 TCP 拆包;
  4. 服務端分兩次讀取到了兩個數據包, 第一次讀取到了包的部份內容 , 第二次讀取到了以前未讀完的包剩餘內容和另外一個包,發生了拆包和粘包。
  5. 服務端 TCP 接收滑動窗口很小,數據包比較大, 即服務端分屢次才能將 包接收徹底,發生屢次拆包

二.粘包半包的緣由

1.粘包

TCP協議:自己是 面向鏈接的可靠地協議-三次握手機制。算法

客戶端與服務器會維持一個鏈接(Channel) ,在鏈接不斷開的狀況下, 能夠將多個數據包發往服務器,可是發送的網絡數據包過小, 那麼自己會啓用 Nagle 算法(可配置是否啓用) 對較小的數據包進行合併(所以,TCP 的網絡延遲要 UDP 的高些)而後再發送(超時或者包大小足夠)。bootstrap

服務器在接收到消息(數據流)的時候就沒法區分哪些數據包是客戶端本身分開發送的,這樣產生了粘包;
服務器在接收到數據後,放到緩衝區中,若是消息沒有被及時從緩存區取走,下次在取數據的時候可能就會出現一次取出多個
數據包的狀況,形成粘包現象。緩存


UDP: 自己做爲無鏈接的不可靠的傳輸協議(適合頻繁發送較小的數據包) , 他不會對數據包進行合併發送,直接是一端發送什麼數據, 直接就發出去了, 既然他不會對數據合併, 每個數據包都是完整的(數據+UDP 頭+IP 頭等等發一 次數據封裝一次) 也就沒有粘包了。服務器


2.半包

分包產生的緣由:多是IP分片傳輸致使的, 也多是傳輸過程當中丟失部 分包致使出現的半包, 還有可能就是一個包可能被分紅了兩次傳輸, 在取數據的時候,先取到了一部分(還可能與接收的緩衝區大小有關係) , 總之就是一個數據包被分紅了屢次接收。
更具體的緣由有三個, 分別以下。網絡

  1. 應用程序寫入數據的字節大小大於套接字發送緩衝區的大小
  2. 進行 MSS 大小的 TCP 分段。 MSS 是最大報文段長度的縮寫。 MSS 是 TCP 報文段中的數據字段的最大長度。 數據字段加上 TCP 首部纔等於整個的 TCP 報文段。 因此 MSS 並非

TCP 報文段的最大長度, 而是: MSS=TCP 報文段長度-TCP 首部長度併發

  1. 以太網的 payload 大於 MTU 進行 IP 分片。 MTU 指: 一種通訊協議的某一層上面所能

經過的最大數據包大小。 若是 IP 層有一個數據包要傳, 並且數據的長度比鏈路層的 MTU 大,
那麼 IP 層就會進行分片, 把數據包分紅託乾片, 讓每一片都不超過 MTU。 注意, IP 分片可
以發生在原始發送端主機上, 也能夠發生在中間路由器上。框架


3.解決粘包半包問題異步

因爲底層的 TCP 沒法理解上層的業務數據, 因此在底層是沒法保證數據包不被拆分和重組的, 這個問題只能經過上層的應用協議棧設計來解決。業界的主流協議的解決方案,能夠概括以下。
(1) 在包尾增長分割符, 好比回車換行符進行分割, 例如 FTP 協議;
(2) 消息定長, 例如每一個報文的大小爲固定長度 200 字節, 若是不夠, 空位補空格;
(3) 將消息分爲消息頭和消息體, 消息頭中包含表示消息總長度(或者消息體長度)的字段, 一般設計思路爲消息頭的第一個字段使用 int32 來表示消息的總長度,LengthFieldBasedFrameDecoder。socket

下面列舉一個包尾增長分隔符的例子:

服務端程序:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 入站處理器
 */
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicInteger completeCounter = new AtomicInteger(0);

    /*** 服務端讀取到網絡數據後的處理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    /*** 服務端讀取完成網絡數據後的處理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.fireChannelReadComplete();
        System.out.println("the ReadComplete count is "
                +completeCounter.incrementAndGet());
    }

    /*** 發生異常後的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.net.InetSocketAddress;

/**
 * 服務端
 */
public class DelimiterEchoServer {

    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;

    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("服務器即將啓動");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服務端啓動必須*/
            b.group(group)/*將線程組傳入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO進行網絡傳輸*/
                .localAddress(new InetSocketAddress(PORT))/*指定服務器監聽端口*/
                /*服務端每接收到一個鏈接請求,就會新啓一個socket通訊,也就是channel,
                因此下面這段代碼的做用就是爲這個子channel增長handle*/
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/*異步綁定到服務器,sync()會阻塞直到完成*/
            System.out.println("服務器啓動完成,等待客戶端的鏈接和數據.....");
            f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/
        } finally {
            group.shutdownGracefully().sync();/*優雅關閉線程組*/
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }

}

客戶端程序

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;


/**
 * 入站處理器
 */
public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客戶端讀取到網絡數據後的處理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }

    /*** 客戶端被通知channel活躍後,作事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,Lison,Peter,James,Deer"
                +  DelimiterEchoServer.DELIMITER_SYMBOL;
        for(int i=0;i<10;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 發生異常後的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

/**
 * 客戶端
 */
public class DelimiterEchoClient {

    private final String host;

    public DelimiterEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            final Bootstrap b = new Bootstrap();;/*客戶端啓動必須*/
            b.group(group)/*將線程組傳入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO進行網絡傳輸*/
                    .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要鏈接服務器的ip地址和端口*/
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已鏈接到服務器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter
                    = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new DelimiterEchoClient("127.0.0.1").start();
    }
}

關鍵代碼:
1.創建鏈接後,客戶端給服務端發數據包,每次發送已特殊字符`
@~結尾。
image.png

2.服務端收到數據包後通過DelimiterBasedFrameDecoder即分隔符基礎框架解碼器解碼爲一個個帶有分隔符的數據包。
image.png

3.再到服務端的業務層處理器DelimiterServerHandler
image.png

相關文章
相關標籤/搜索