Netty源碼分析 ----- 拆包器之LengthFieldBasedFrameDecoder

 

 

正文java

本篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協議。一般,協議的格式以下:api

LengthFieldBasedFrameDecoder是netty解決拆包粘包問題的一個重要的類,主要結構就是header+body結構。咱們只須要傳入正確的參數就能夠發送和接收正確的數據,那麼重點就在於這幾個參數的意義。下面咱們就具體瞭解一下這幾個參數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:服務器

public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip)

那麼這幾個重要的參數以下:less

  • maxFrameLength:最大幀長度。也就是能夠接收的數據的最大長度。若是超過,這次數據會被丟棄。
  • lengthFieldOffset:長度域偏移。就是說數據開始的幾個字節可能不是表示數據長度,須要後移幾個字節纔是長度域。
  • lengthFieldLength:長度域字節數。用幾個字節來表示數據長度。
  • lengthAdjustment:數據長度修正。由於長度域指定的長度可使header+body的整個長度,也能夠只是body的長度。若是表示header+body的整個長度,那麼咱們須要修正數據長度。
  • initialBytesToStrip:跳過的字節數。若是你須要接收header+body的全部數據,此值就是0,若是你只想接收body數據,那麼須要跳過header所佔用的字節數。

下面咱們根據幾個例子的使用來具體說明這幾個參數的使用。ide

LengthFieldBasedFrameDecoder 的用法

需求1

長度域爲2個字節,咱們要求發送和接收的數據以下所示:函數

複製代碼
發送的數據 (14 bytes)          接收到數據 (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
|  12    | "HELLO, WORLD" |      |   12   | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+
複製代碼

留心的你確定發現了,長度域只是實際內容的長度,不包括長度域的長度。下面是參數的值:oop

  • lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=0:數據長度修正爲0,由於長度域只包含數據的長度,因此不須要修正。
  • initialBytesToStrip=0:發送和接收的數據徹底一致,因此不須要跳過任何字節。

需求2

長度域爲2個字節,咱們要求發送和接收的數據以下所示:post

複製代碼
發送的數據 (14 bytes)        接收到數據 (12 bytes)
+--------+----------------+      +----------------+
| Length | Actual Content |----->| Actual Content |
|  12    | "HELLO, WORLD" |      | "HELLO, WORLD" |
+--------+----------------+      +----------------+
複製代碼

參數值以下:this

  • lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=0:數據長度修正爲0,由於長度域只包含數據的長度,因此不須要修正。
  • initialBytesToStrip=2:咱們發現接收的數據沒有長度域的數據,因此要跳過長度域的2個字節。

需求3

長度域爲2個字節,咱們要求發送和接收的數據以下所示:

複製代碼
BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 14     | "HELLO, WORLD" |      |  14    | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+
複製代碼

留心的你確定又發現了,長度域表示的長度是總長度 也就是header+body的總長度。參數以下:

  • lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=-2:由於長度域爲總長度,因此咱們須要修正數據長度,也就是減去2。
  • initialBytesToStrip=0:咱們發現接收的數據沒有長度域的數據,因此要跳過長度域的2個字節。

需求4

長度域爲2個字節,咱們要求發送和接收的數據以下所示:

複製代碼
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
| meta     |  Length  | Actual Content |----->| meta | Length | Actual Content |
|  0xCAFE  | 12       | "HELLO, WORLD" |      |  0xCAFE  | 12       | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+
複製代碼

咱們發現,數據的結構有點變化,變成了 meta+header+body的結構。meta通常表示元數據,魔數等。咱們定義這裏meta有三個字節。參數以下:

  • lengthFieldOffset=3:開始的3個字節是meta,而後纔是長度域,因此長度域偏移爲3。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=0:長度域指定的長度位數據長度,因此數據長度不須要修正。
  • initialBytesToStrip=0:發送和接收數據相同,不須要跳過數據。

需求5

長度域爲2個字節,咱們要求發送和接收的數據以下所示:

複製代碼
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
|  Length  | meta     | Actual Content |----->| Length | meta | Actual Content |
|   12     |  0xCAFE  | "HELLO, WORLD" |      |    12    |  0xCAFE  | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+
複製代碼

咱們發現,數據的結構有點變化,變成了 header+meta+body的結構。meta通常表示元數據,魔數等。咱們定義這裏meta有三個字節。參數以下:

  • lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=3:咱們須要把meta+body當作body處理,因此數據長度須要加3。
  • initialBytesToStrip=0:發送和接收數據相同,不須要跳過數據。

需求6

長度域爲2個字節,咱們要求發送和接收的數據以下所示:

複製代碼
BEFORE DECODE (16 bytes)                    AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+
複製代碼

咱們發現,數據的結構有點變化,變成了 hdr1+header+hdr2+body的結構。咱們定義這裏hdr1和hdr2都只有1個字節。參數以下:

  • lengthFieldOffset=1:開始的1個字節是長度域,因此須要設置長度域偏移爲1。
  • lengthFieldLength=2:長度域2個字節。
  • lengthAdjustment=1:咱們須要把hdr2+body當作body處理,因此數據長度須要加1。
  • initialBytesToStrip=3:接收數據不包括hdr1和長度域相同,因此須要跳過3個字節。

LengthFieldBasedFrameDecoder 源碼剖析

實現拆包抽象

在前面的文章中咱們知道,具體的拆包協議只須要實現

void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

其中 in 表示目前爲止還未拆的數據,拆完以後的包添加到 out這個list中便可實現包向下傳遞,第一層實現比較簡單

複製代碼
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}
複製代碼

重載的protected函數decode作真正的拆包動做

複製代碼
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (this.discardingTooLongFrame) {
        long bytesToDiscard = this.bytesToDiscard;
        int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());
        in.skipBytes(localBytesToDiscard);
        bytesToDiscard -= (long)localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        this.failIfNecessary(false);
    }

    // 若是當前可讀字節還未達到長度長度域的偏移,那說明確定是讀不到長度域的,直接不讀
    if (in.readableBytes() < this.lengthFieldEndOffset) {
        return null;
    } else {
        // 拿到長度域的實際字節偏移,就是長度域的開始下標
        // 這裏就是需求4,開始的幾個字節並非長度域
        int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
        // 拿到實際的未調整過的包長度
        // 就是讀取長度域的十進制值,最原始傳過來的包的長度
        long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
        // 若是拿到的長度爲負數,直接跳過長度域並拋出異常
        if (frameLength < 0L) {
            in.skipBytes(this.lengthFieldEndOffset);
            throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
        } else {
            // 調整包的長度
            frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
            // 整個數據包的長度尚未長度域長,直接拋出異常
            if (frameLength < (long)this.lengthFieldEndOffset) {
                in.skipBytes(this.lengthFieldEndOffset);
                throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset);
            // 數據包長度超出最大包長度,進入丟棄模式
            } else if (frameLength > (long)this.maxFrameLength) {
                long discard = frameLength - (long)in.readableBytes();
                this.tooLongFrameLength = frameLength;
                if (discard < 0L) {
                    in.skipBytes((int)frameLength);
                } else {
                    this.discardingTooLongFrame = true;
                    this.bytesToDiscard = discard;
                    in.skipBytes(in.readableBytes());
                }

                this.failIfNecessary(true);
                return null;
            } else {
                int frameLengthInt = (int)frameLength;
                //當前可讀的字節數小於包中的length,什麼都不作,等待下一次解碼
                if (in.readableBytes() < frameLengthInt) {
                    return null;
                //跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常
                } else if (this.initialBytesToStrip > frameLengthInt) {
                    in.skipBytes(frameLengthInt);
                    throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
                } else {
                    //根據initialBytesToStrip的設置來跳過某些字節
                    in.skipBytes(this.initialBytesToStrip);
                    //拿到當前累積數據的讀指針
                    int readerIndex = in.readerIndex();
                    //拿到待抽取數據包的實際長度
                    int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
                    //進行抽取
                    ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
                    //移動讀指針
                    in.readerIndex(readerIndex + actualFrameLength);
                    return frame;
                }
            }
        }
    }
}
複製代碼

下面分幾個部分來分析一下這個重量級函數

獲取frame長度

獲取須要待拆包的包大小

複製代碼
// 拿到長度域的實際字節偏移,就是長度域的開始下標
// 這裏就是需求4,開始的幾個字節並非長度域
int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
// 拿到實際的未調整過的包長度
// 就是讀取長度域的十進制值,最原始傳過來的包的長度
long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
// 調整包的長度
frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
複製代碼

上面這一段內容有個擴展點 getUnadjustedFrameLength,若是你的長度域表明的值表達的含義不是正常的int,short等基本類型,你能夠重寫這個函數

複製代碼
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
    buf = buf.order(order);
    long frameLength;
    switch (length) {
    case 1:
        frameLength = buf.getUnsignedByte(offset);
        break;
    case 2:
        frameLength = buf.getUnsignedShort(offset);
        break;
    case 3:
        frameLength = buf.getUnsignedMedium(offset);
        break;
    case 4:
        frameLength = buf.getUnsignedInt(offset);
        break;
    case 8:
        frameLength = buf.getLong(offset);
        break;
    default:
        throw new DecoderException(
                "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
    }
    return frameLength;
}
複製代碼

跳過指定字節長度

複製代碼
int frameLengthInt = (int)frameLength;
//當前可讀的字節數小於包中的length,什麼都不作,等待下一次解碼
if (in.readableBytes() < frameLengthInt) {
    return null;
//跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常
} else if (this.initialBytesToStrip > frameLengthInt) {
    in.skipBytes(frameLengthInt);
    throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
}
//根據initialBytesToStrip的設置來跳過某些字節
in.skipBytes(this.initialBytesToStrip);
複製代碼

先驗證當前是否已經讀到足夠的字節,若是讀到了,在下一步抽取一個完整的數據包以前,須要根據initialBytesToStrip的設置來跳過某些字節(見文章開篇),固然,跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常

抽取frame

複製代碼
//根據initialBytesToStrip的設置來跳過某些字節
in.skipBytes(this.initialBytesToStrip);
//拿到當前累積數據的讀指針
int readerIndex = in.readerIndex();
//拿到待抽取數據包的實際長度
int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
//進行抽取
ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
//移動讀指針
in.readerIndex(readerIndex + actualFrameLength);
return frame;
複製代碼

到了最後抽取數據包其實就很簡單了,拿到當前累積數據的讀指針,而後拿到待抽取數據包的實際長度進行抽取,抽取以後,移動讀指針

protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
    return buffer.retainedSlice(index, length);
}

抽取的過程是簡單的調用了一下 ByteBuf 的retainedSliceapi,該api無內存copy開銷

自定義解碼器

協議實體的定義

複製代碼
public class MyProtocolBean {
    //類型  系統編號 0xA 表示A系統,0xB 表示B系統
    private byte type;

    //信息標誌  0xA 表示心跳包    0xB 表示超時包  0xC 業務信息包
    private byte flag;

    //內容長度
    private int length;

    //內容
    private String content;

    //省略get/set
}
複製代碼

服務器端

服務端的實現

複製代碼
public class Server {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大長度
    private static final int LENGTH_FIELD_LENGTH = 4;  //長度字段所佔的字節數
    private static final int LENGTH_FIELD_OFFSET = 2;  //長度偏移
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                            ch.pipeline().addLast(new ServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 綁定端口,開始接收進來的鏈接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port );
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new Server(port).start();
    }
}
複製代碼

自定義解碼器MyProtocolDecoder

複製代碼
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {

    private static final int HEADER_SIZE = 6;

    /**
     * @param maxFrameLength  幀的最大長度
     * @param lengthFieldOffset length字段偏移的地址
     * @param lengthFieldLength length字段所佔的字節長
     * @param lengthAdjustment 修改幀數據長度字段中定義的值,能夠爲負數 由於有時候咱們習慣把頭部記入長度,若爲負數,則說明要推後多少個字段
     * @param initialBytesToStrip 解析時候跳過多少個長度
     * @param failFast 爲true,當frame長度超過maxFrameLength時當即報TooLongFrameException異常,爲false,讀取完整個幀再報異
     */

    public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {

        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);

    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在這裏調用父類的方法,實現指獲得想要的部分,我在這裏所有都要,也能夠只要body部分
        in = (ByteBuf) super.decode(ctx,in);  

        if(in == null){
            return null;
        }
        if(in.readableBytes()<HEADER_SIZE){
            throw new Exception("字節數不足");
        }
        //讀取type字段
        byte type = in.readByte();
        //讀取flag字段
        byte flag = in.readByte();
        //讀取length字段
        int length = in.readInt();
        
        if(in.readableBytes()!=length){
            throw new Exception("標記的長度不符合實際長度");
        }
        //讀取body
        byte []bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);

        return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));

    }
}
複製代碼

服務端Hanlder

複製代碼
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyProtocolBean myProtocolBean = (MyProtocolBean)msg;  //直接轉化成協議消息實體
        System.out.println(myProtocolBean.getContent());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}
複製代碼

客戶端和客戶端Handler

複製代碼
public class Client {
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolEncoder());
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}
複製代碼

客戶端編碼器

複製代碼
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
        if(msg == null){
            throw new Exception("msg is null");
        }
        out.writeByte(msg.getType());
        out.writeByte(msg.getFlag());
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
    }
}
複製代碼
  • 編碼的時候,只須要按照定義的順序依次寫入到ByteBuf中.

客戶端Handler

複製代碼
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
        ctx.writeAndFlush(myProtocolBean);

    }
}
複製代碼

 

 

 

 

 

 
分類:  netty源碼解析
相關文章
相關標籤/搜索