《精通併發與Netty》學習筆記(14 - 解決TCP粘包拆包(二)Netty自定義協議解決粘包拆包)

1、Netty粘包和拆包解決方案

Netty提供了多個解碼器,能夠進行分包的操做,分別是: 
* LineBasedFrameDecoder (換行)
   LineBasedFrameDecoder是回車換行解碼器,若是用戶發送的消息以回車換行符做爲消息結束的標識,則能夠直接使用Netty的LineBasedFrameDecoder對消息進行解碼,只須要在初始化Netty服務端或者客戶端時將LineBasedFrameDecoder正確的添加到ChannelPipeline中便可,不須要本身從新實現一套換行解碼器。
   LineBasedFrameDecoder的工做原理是它依次遍歷ByteBuf中的可讀字節,判斷看是否有「\n」或者「\r\n」,若是有,就以此位置爲結束位置,從可讀索引到結束位置區間的字節就組成了一行。它是以換行符爲結束標誌的解碼器,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。若是連續讀取到最大長度後仍然沒有發現換行符,就會拋出異常,同時忽略掉以前讀到的異常碼流。防止因爲數據報沒有攜帶換行符致使接收到ByteBuf無限制積壓,引發系統內存溢出。java

* DelimiterBasedFrameDecoder(添加特殊分隔符報文來分包) 
   DelimiterBasedFrameDecoder是分隔符解碼器,用戶能夠指定消息結束的分隔符,它能夠自動完成以分隔符做爲碼流結束標識的消息的解碼。
   回車換行解碼器其實是一種特殊的DelimiterBasedFrameDecoder解碼器。spring

* FixedLengthFrameDecoder(使用定長的報文來分包) 
    FixedLengthFrameDecoder是固定長度解碼器,它可以按照指定的長度對消息進行自動解碼,開發者不須要考慮TCP的粘包/拆包等問題,很是實用。
    對於定長消息,若是消息實際長度小於定長,則每每會進行補位操做,它在必定程度上致使了空間和資源的浪費。可是它的優勢也是很是明顯的,編解碼比較簡單,所以在實際項目中仍然有必定的應用場景。bootstrap

* LengthFieldBasedFrameDecoder (自定義解碼器跟編碼器)服務器

   本文介紹的重點LengthFieldBasedFrameDecoder,通常包含了消息頭(head)、消息體(body):消息頭是固定的長度,通常有有如下信息 -> 是否壓縮(zip)、消息類型(type or cmdid)、消息體長度(body length);消息體長度不是固定的,其大小由消息頭記載,通常記載業務交互信息。dom

  netty對應來講就是編碼器(Encoder)跟解碼器(Decoder),通常其中會有一個基本消息類對外輸出socket

2、實例演示ide

首先編寫自定義協議類:oop

 

package com.spring.netty.handler2;

/**
 * 自定義Person協議
 */
public class PersonProtocol {
    private int length;
    private byte[] content;

    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public byte[] getContent() {
        return content;
    }
    public void setContent(byte[] content) {
        this.content = content;
    }
}

 

新建服務端代碼:ui

 

package com.spring.netty.handler2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new MyServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

 

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyPersonDecoder());
        pipeline.addLast(new MyPersonEncoder());

        pipeline.addLast(new MyServerHandler());
    }
}

 

package com.spring.netty.handler2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 * Person解碼器
 */
public class MyPersonDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyPersonDecoder decode invoked!");

        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);

        PersonProtocol personProtocol = new PersonProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);
        out.add(personProtocol);
    }
}

 

package com.spring.netty.handler2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 編碼器
 */
public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyPersonEncoder encode invoked!");

        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.UUID;

public class MyServerHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();
        System.out.println("服務端接收到的數據:");
        System.out.println("長度:"+length);
        System.out.println("內容:"+new String(content, Charset.forName("utf-8")));
        System.out.println("服務端接收到的消息數量:"+(++this.count));
        //服務端向客戶端返回uuid
        String responseMessage = UUID.randomUUID().toString();
        int responseLength = responseMessage.getBytes("utf-8").length;
        byte[] responseContent = responseMessage.getBytes("utf-8");

        PersonProtocol personProtocol = new PersonProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);
        ctx.writeAndFlush(responseContent);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

編寫客戶端程序:this

package com.spring.netty.handler2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new MyPersonDecoder());
                            pipeline.addLast(new MyPersonEncoder());

                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

 

package com.spring.netty.handler2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

public class MyClientHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<10;++i){
            String messageToBeSent = "sent from client";
            byte[] content = messageToBeSent.getBytes(Charset.forName("utf-8"));
            int length = messageToBeSent.getBytes(Charset.forName("utf-8")).length;

            PersonProtocol personProtocol = new PersonProtocol();
            personProtocol.setLength(length);
            personProtocol.setContent(content);

            ctx.writeAndFlush(personProtocol);
        }
    }

    //從服務器端接收數據
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();
        System.out.println("客戶端接收到的數據:");
        System.out.println("長度:"+length);
        System.out.println("內容:"+new String(content, Charset.forName("utf-8")));
        System.out.println("客戶端接收到的消息數量:"+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分別啓動服務端和客戶端查看效果:

服務端效果如圖:

客戶端效果如圖:

相關文章
相關標籤/搜索