目錄html
正文java
本篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協議。一般,協議的格式以下:api
LengthFieldBasedFrameDecoder是netty解決拆包粘包問題的一個重要的類,主要結構就是header+body結構。咱們只須要傳入正確的參數就能夠發送和接收正確的數據,那麼重點就在於這幾個參數的意義。下面咱們就具體瞭解一下這幾個參數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:服務器
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
那麼這幾個重要的參數以下:less
- maxFrameLength:最大幀長度。也就是能夠接收的數據的最大長度。若是超過,這次數據會被丟棄。
- lengthFieldOffset:長度域偏移。就是說數據開始的幾個字節可能不是表示數據長度,須要後移幾個字節纔是長度域。
- lengthFieldLength:長度域字節數。用幾個字節來表示數據長度。
- lengthAdjustment:數據長度修正。由於長度域指定的長度可使header+body的整個長度,也能夠只是body的長度。若是表示header+body的整個長度,那麼咱們須要修正數據長度。
- initialBytesToStrip:跳過的字節數。若是你須要接收header+body的全部數據,此值就是0,若是你只想接收body數據,那麼須要跳過header所佔用的字節數。
下面咱們根據幾個例子的使用來具體說明這幾個參數的使用。ide
LengthFieldBasedFrameDecoder 的用法
需求1
長度域爲2個字節,咱們要求發送和接收的數據以下所示:函數
發送的數據 (14 bytes) 接收到數據 (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 12 | "HELLO, WORLD" | | 12 | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
留心的你確定發現了,長度域只是實際內容的長度,不包括長度域的長度。下面是參數的值:oop
- lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=0:數據長度修正爲0,由於長度域只包含數據的長度,因此不須要修正。
- initialBytesToStrip=0:發送和接收的數據徹底一致,因此不須要跳過任何字節。
需求2
長度域爲2個字節,咱們要求發送和接收的數據以下所示:post
發送的數據 (14 bytes) 接收到數據 (12 bytes) +--------+----------------+ +----------------+ | Length | Actual Content |----->| Actual Content | | 12 | "HELLO, WORLD" | | "HELLO, WORLD" | +--------+----------------+ +----------------+
參數值以下:this
- lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=0:數據長度修正爲0,由於長度域只包含數據的長度,因此不須要修正。
- initialBytesToStrip=2:咱們發現接收的數據沒有長度域的數據,因此要跳過長度域的2個字節。
需求3
長度域爲2個字節,咱們要求發送和接收的數據以下所示:
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 14 | "HELLO, WORLD" | | 14 | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
留心的你確定又發現了,長度域表示的長度是總長度 也就是header+body的總長度。參數以下:
- lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=-2:由於長度域爲總長度,因此咱們須要修正數據長度,也就是減去2。
- initialBytesToStrip=0:咱們發現接收的數據沒有長度域的數據,因此要跳過長度域的2個字節。
需求4
長度域爲2個字節,咱們要求發送和接收的數據以下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | meta | Length | Actual Content |----->| meta | Length | Actual Content | | 0xCAFE | 12 | "HELLO, WORLD" | | 0xCAFE | 12 | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
咱們發現,數據的結構有點變化,變成了 meta+header+body的結構。meta通常表示元數據,魔數等。咱們定義這裏meta有三個字節。參數以下:
- lengthFieldOffset=3:開始的3個字節是meta,而後纔是長度域,因此長度域偏移爲3。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=0:長度域指定的長度位數據長度,因此數據長度不須要修正。
- initialBytesToStrip=0:發送和接收數據相同,不須要跳過數據。
需求5
長度域爲2個字節,咱們要求發送和接收的數據以下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | Length | meta | Actual Content |----->| Length | meta | Actual Content | | 12 | 0xCAFE | "HELLO, WORLD" | | 12 | 0xCAFE | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
咱們發現,數據的結構有點變化,變成了 header+meta+body的結構。meta通常表示元數據,魔數等。咱們定義這裏meta有三個字節。參數以下:
- lengthFieldOffset=0:開始的2個字節就是長度域,因此不須要長度域偏移。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=3:咱們須要把meta+body當作body處理,因此數據長度須要加3。
- initialBytesToStrip=0:發送和接收數據相同,不須要跳過數據。
需求6
長度域爲2個字節,咱們要求發送和接收的數據以下所示:
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+
咱們發現,數據的結構有點變化,變成了 hdr1+header+hdr2+body的結構。咱們定義這裏hdr1和hdr2都只有1個字節。參數以下:
- lengthFieldOffset=1:開始的1個字節是長度域,因此須要設置長度域偏移爲1。
- lengthFieldLength=2:長度域2個字節。
- lengthAdjustment=1:咱們須要把hdr2+body當作body處理,因此數據長度須要加1。
- initialBytesToStrip=3:接收數據不包括hdr1和長度域相同,因此須要跳過3個字節。
LengthFieldBasedFrameDecoder 源碼剖析
實現拆包抽象
在前面的文章中咱們知道,具體的拆包協議只須要實現
void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
其中 in 表示目前爲止還未拆的數據,拆完以後的包添加到 out這個list中便可實現包向下傳遞,第一層實現比較簡單
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
重載的protected函數decode作真正的拆包動做
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (this.discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= (long)localBytesToDiscard; this.bytesToDiscard = bytesToDiscard; this.failIfNecessary(false); } // 若是當前可讀字節還未達到長度長度域的偏移,那說明確定是讀不到長度域的,直接不讀 if (in.readableBytes() < this.lengthFieldEndOffset) { return null; } else { // 拿到長度域的實際字節偏移,就是長度域的開始下標 // 這裏就是需求4,開始的幾個字節並非長度域 int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset; // 拿到實際的未調整過的包長度 // 就是讀取長度域的十進制值,最原始傳過來的包的長度 long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder); // 若是拿到的長度爲負數,直接跳過長度域並拋出異常 if (frameLength < 0L) { in.skipBytes(this.lengthFieldEndOffset); throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength); } else { // 調整包的長度 frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset); // 整個數據包的長度尚未長度域長,直接拋出異常 if (frameLength < (long)this.lengthFieldEndOffset) { in.skipBytes(this.lengthFieldEndOffset); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset); // 數據包長度超出最大包長度,進入丟棄模式 } else if (frameLength > (long)this.maxFrameLength) { long discard = frameLength - (long)in.readableBytes(); this.tooLongFrameLength = frameLength; if (discard < 0L) { in.skipBytes((int)frameLength); } else { this.discardingTooLongFrame = true; this.bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } this.failIfNecessary(true); return null; } else { int frameLengthInt = (int)frameLength; //當前可讀的字節數小於包中的length,什麼都不作,等待下一次解碼 if (in.readableBytes() < frameLengthInt) { return null; //跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常 } else if (this.initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip); } else { //根據initialBytesToStrip的設置來跳過某些字節 in.skipBytes(this.initialBytesToStrip); //拿到當前累積數據的讀指針 int readerIndex = in.readerIndex(); //拿到待抽取數據包的實際長度 int actualFrameLength = frameLengthInt - this.initialBytesToStrip; //進行抽取 ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength); //移動讀指針 in.readerIndex(readerIndex + actualFrameLength); return frame; } } } } }
下面分幾個部分來分析一下這個重量級函數
獲取frame長度
獲取須要待拆包的包大小
// 拿到長度域的實際字節偏移,就是長度域的開始下標 // 這裏就是需求4,開始的幾個字節並非長度域 int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset; // 拿到實際的未調整過的包長度 // 就是讀取長度域的十進制值,最原始傳過來的包的長度 long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder); // 調整包的長度 frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
上面這一段內容有個擴展點 getUnadjustedFrameLength,若是你的長度域表明的值表達的含義不是正常的int,short等基本類型,你能夠重寫這個函數
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) { buf = buf.order(order); long frameLength; switch (length) { case 1: frameLength = buf.getUnsignedByte(offset); break; case 2: frameLength = buf.getUnsignedShort(offset); break; case 3: frameLength = buf.getUnsignedMedium(offset); break; case 4: frameLength = buf.getUnsignedInt(offset); break; case 8: frameLength = buf.getLong(offset); break; default: throw new DecoderException( "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)"); } return frameLength; }
跳過指定字節長度
int frameLengthInt = (int)frameLength; //當前可讀的字節數小於包中的length,什麼都不作,等待下一次解碼 if (in.readableBytes() < frameLengthInt) { return null; //跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常 } else if (this.initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip); } //根據initialBytesToStrip的設置來跳過某些字節 in.skipBytes(this.initialBytesToStrip);
先驗證當前是否已經讀到足夠的字節,若是讀到了,在下一步抽取一個完整的數據包以前,須要根據initialBytesToStrip的設置來跳過某些字節(見文章開篇),固然,跳過的字節不能大於數據包的長度,不然就拋出 CorruptedFrameException 的異常
抽取frame
//根據initialBytesToStrip的設置來跳過某些字節 in.skipBytes(this.initialBytesToStrip); //拿到當前累積數據的讀指針 int readerIndex = in.readerIndex(); //拿到待抽取數據包的實際長度 int actualFrameLength = frameLengthInt - this.initialBytesToStrip; //進行抽取 ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength); //移動讀指針 in.readerIndex(readerIndex + actualFrameLength); return frame;
到了最後抽取數據包其實就很簡單了,拿到當前累積數據的讀指針,而後拿到待抽取數據包的實際長度進行抽取,抽取以後,移動讀指針
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { return buffer.retainedSlice(index, length); }
抽取的過程是簡單的調用了一下 ByteBuf 的retainedSliceapi,該api無內存copy開銷
自定義解碼器
協議實體的定義
public class MyProtocolBean { //類型 系統編號 0xA 表示A系統,0xB 表示B系統 private byte type; //信息標誌 0xA 表示心跳包 0xB 表示超時包 0xC 業務信息包 private byte flag; //內容長度 private int length; //內容 private String content; //省略get/set }
服務器端
服務端的實現
public class Server { private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大長度 private static final int LENGTH_FIELD_LENGTH = 4; //長度字段所佔的字節數 private static final int LENGTH_FIELD_OFFSET = 2; //長度偏移 private static final int LENGTH_ADJUSTMENT = 0; private static final int INITIAL_BYTES_TO_STRIP = 0; private int port; public Server(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false)); ch.pipeline().addLast(new ServerHandler()); }; }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 綁定端口,開始接收進來的鏈接 ChannelFuture future = sbs.bind(port).sync(); System.out.println("Server start listen at " + port ); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new Server(port).start(); } }
自定義解碼器MyProtocolDecoder
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { private static final int HEADER_SIZE = 6; /** * @param maxFrameLength 幀的最大長度 * @param lengthFieldOffset length字段偏移的地址 * @param lengthFieldLength length字段所佔的字節長 * @param lengthAdjustment 修改幀數據長度字段中定義的值,能夠爲負數 由於有時候咱們習慣把頭部記入長度,若爲負數,則說明要推後多少個字段 * @param initialBytesToStrip 解析時候跳過多少個長度 * @param failFast 爲true,當frame長度超過maxFrameLength時當即報TooLongFrameException異常,爲false,讀取完整個幀再報異 */ public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { //在這裏調用父類的方法,實現指獲得想要的部分,我在這裏所有都要,也能夠只要body部分 in = (ByteBuf) super.decode(ctx,in); if(in == null){ return null; } if(in.readableBytes()<HEADER_SIZE){ throw new Exception("字節數不足"); } //讀取type字段 byte type = in.readByte(); //讀取flag字段 byte flag = in.readByte(); //讀取length字段 int length = in.readInt(); if(in.readableBytes()!=length){ throw new Exception("標記的長度不符合實際長度"); } //讀取body byte []bytes = new byte[in.readableBytes()]; in.readBytes(bytes); return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8")); } }
服務端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接轉化成協議消息實體 System.out.println(myProtocolBean.getContent()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } }
客戶端和客戶端Handler
public class Client { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyProtocolEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = b.connect(HOST, PORT).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
客戶端編碼器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> { @Override protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception { if(msg == null){ throw new Exception("msg is null"); } out.writeByte(msg.getType()); out.writeByte(msg.getFlag()); out.writeInt(msg.getLength()); out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8"))); } }
- 編碼的時候,只須要按照定義的順序依次寫入到ByteBuf中.
客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty"); ctx.writeAndFlush(myProtocolBean); } }