Netty實踐(二):TCP拆包、粘包問題

什麼是TCP拆包、粘包?
java

在網絡通訊中,數據在底層都是以字節流形式在流動,那麼發送方和接受方理應有一個約定(協議),只有這樣接受方纔知道須要接受多少數據,哪些數據須要在一塊兒處理;若是沒有這個約定,就會出現本應該一塊兒處理的數據,被TCP劃分爲多個包發給接收方進行處理,以下圖:網絡

wKiom1hyNFiS8DVlAAAzdREUGHM079.png




看一個TCP拆包、粘包的實例app

客戶端Handler:框架

wKioL1hyOHHCXVUFAABCJ7hliZE421.png


服務端Handler:socket

wKiom1hyOLjzpcpNAABcEHVaL9U858.png

運行結果:ide

wKioL1hyOO2hu7BeAAAakzcyQBA400.png

上面的程序本意是CLIENT發送3次消息給SERVER,SERVER端理應處理3次,但是結果SERVER卻將3條消息一次處理了。oop


那麼如何解決TCP拆包、粘包問題呢?其實思路不外乎有3種:ui

第一種:發定長數據this

接收方拿固定長度的數據,發送方發送固定長度的數據便可。可是這樣的缺點也是顯而易見的:若是發送方的數據長度不足,須要補位,浪費空間。編碼

第二種:在包尾部增長特殊字符進行分割

發送方發送數據時,增長特殊字符;在接收方以特殊字符爲準進行分割

第三種:自定義協議

相似於HTTP協議中的HEAD信息,好比咱們也能夠在HEAD中,告訴接收方數據的元信息(數據類型、數據長度等)



Netty如何解決TCP拆包、粘包問題?

《Java通訊實戰:編寫自定義通訊協議實現FTP服務》中,涉及到了JAVA SOCKET這方面的處理,你們能夠參考。接下來,咱們來看Netty這個框架是如何幫助咱們解決這個問題的。本篇博客的代碼在《Netty實踐(一):輕鬆入門》基礎上進行。


方式一:定長消息

Server啓動類:

wKiom1hyOyuijnRKAAA7dsHZoPw259.png


Client Handler:

wKioL1hyO1yxWMODAAA5nDq5z_Y711.png


運行結果:

wKiom1hyPnfygbyyAAAg2qklApM237.pngwKiom1hyPoGxkhVxAAAR_4YedkE745.png

利用FixedLengthFrameDecoder,加入到管道流處理中,長度夠了接收方纔能收到。



方式二:自定義分隔符

Server啓動類:

wKiom1hzioeA7jlPAABKa74vH_8193.png


Client Handler:

wKioL1hzitzSTj-EAABBINFILFU201.png


運行結果:

wKioL1hziw_yhakhAAAdzh0MBAk277.png

wKioL1hzizbSHyUFAAAbdBWJdq8033.png


方式三:自定義協議

下面咱們將簡單實現一個自定義協議:

HEAD信息中包含:數據長度、數據版本

數據內容


MyHead

public class MyHead {

    //數據長度
    private int length;

    //數據版本
    private int version;


    public MyHead(int length, int version) {
        this.length = length;
        this.version = version;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public int getVersion() {
        return version;
    }

    public void setVersion(int version) {
        this.version = version;
    }


}


MyMessage

public class MyMessage {
    //消息head
    private MyHead head;
    //消息body
    private String content;

    public MyMessage(MyHead head, String content) {
        this.head = head;
        this.content = content;
    }

    public MyHead getHead() {
        return head;
    }

    public void setHead(MyHead head) {
        this.head = head;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return String.format("[length=%d,version=%d,content=%s]",head.getLength(),head.getVersion(),content);
    }
}


編碼器

/**
 * Created by Administrator on 17-1-9.
 * 編碼器 將自定義消息轉化成ByteBuff
 */
public class MyEncoder extends MessageToByteEncoder<MyMessage> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {

        int length = myMessage.getHead().getLength();
        int version = myMessage.getHead().getVersion();
        String content = myMessage.getContent();

        byteBuf.writeInt(length);
        byteBuf.writeInt(version);
        byteBuf.writeBytes(content.getBytes(Charset.forName("UTF-8")));

    }
}


×××

/**
 * Created by Administrator on 17-1-9.
 * ×××  將ByteBuf數據轉化成自定義消息
 */
public class MyDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        int length = byteBuf.readInt();
        int version = byteBuf.readInt();

        byte[] body = new byte[length];
        byteBuf.readBytes(body);

        String content = new String(body, Charset.forName("UTF-8"));

        MyMessage myMessage = new MyMessage(new MyHead(length,version),content);

        list.add(myMessage);
    }
}


Server啓動類

public class Main {

    public static void main(String[] args) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2)
        int port = 8867;
        try {
            ServerBootstrap b = new ServerBootstrap(); // (3)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (4)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (5)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    ch.pipeline().addLast(new MyEncoder())
                    .addLast(new MyDecoder())
                    .addLast(new ServerHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)          // (6)
            .childOption(ChannelOption.SO_KEEPALIVE, true); // (7)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (8)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            System.out.println("start server....");
            f.channel().closeFuture().sync();
            System.out.println("stop server....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("exit server....");
        }

    }
}


Server Handler

public class ServerHandler  extends ChannelHandlerAdapter {

    //每當從客戶端收到新的數據時,這個方法會在收到消息時被調用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        MyMessage in = (MyMessage) msg;
        try {
            // Do something with msg
            System.out.println("server get :" + in);

        } finally {
            //ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放
            //or ((ByteBuf)msg).release();
            ReferenceCountUtil.release(msg);
        }

    }

    //exceptionCaught()事件處理方法是當出現Throwable對象纔會被調用
    //當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();

    }

}


Client啓動類

public class Client {

    public static void main(String[] args) {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
              Bootstrap b = new Bootstrap();
              b.group(group)
               .channel(NioSocketChannel.class)
               .handler(new ChannelInitializer<SocketChannel>() {
                   @Override
                   public void initChannel(SocketChannel ch) throws Exception {
                       ChannelPipeline p = ch.pipeline();
                       p.addLast(new MyDecoder());
                       p.addLast(new MyEncoder());
                       p.addLast(new ClientHandler());
                   }
               });

              // Start the client.
              ChannelFuture f = b.connect("127.0.0.1", 8867).sync();

              // Wait until the connection is closed.
              f.channel().closeFuture().sync();

          } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
              // Shut down the event loop to terminate all threads.
              group.shutdownGracefully();
          }

    }

}


Client Handler

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ctx.writeAndFlush(new MyMessage(new MyHead("abcd".getBytes("UTF-8").length,1),"abcd"));

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf in = (ByteBuf) msg;
        try {
            // Do something with msg
            System.out.println("client get :" + in.toString(CharsetUtil.UTF_8));

            ctx.close();
        } finally {
            //ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放
            //or ((ByteBuf)msg).release();
            ReferenceCountUtil.release(msg);
        }
    }
}


運行結果

wKioL1hzlXeC1dvnAAAa_zDNVxs631.png


到這裏,你會發現Netty處理TCP拆包、粘包問題很簡單,經過編解碼技術支持,讓咱們編寫自定義協議也很方便,在後續的Netty博客中,我將繼續爲你們介紹Netty在實際中的一些應用(好比實現心跳檢測),See You~

相關文章
相關標籤/搜索