Netty解決粘包和拆包問題的四種方案

前言

在RPC框架中,粘包和拆包問題是必須解決一個問題,由於RPC框架中,各個微服務相互之間都是維繫了一個TCP長鏈接,好比dubbo就是一個全雙工的長鏈接。因爲微服務往對方發送信息的時候,全部的請求都是使用的同一個鏈接,這樣就會產生粘包和拆包的問題。本文首先會對粘包和拆包問題進行描述,而後介紹其經常使用的解決方案,最後會對Netty提供的幾種解決方案進行講解。json

正文

1. 粘包和拆包

產生粘包和拆包問題的主要緣由是,操做系統在發送TCP數據的時候,底層會有一個緩衝區,例如1024個字節大小,若是一次請求發送的數據量比較小,沒達到緩衝區大小,TCP則會將多個請求合併爲同一個請求進行發送,這就造成了粘包問題;若是一次請求發送的數據量比較大,超過了緩衝區大小,TCP就會將其拆分爲屢次發送,這就是拆包,也就是將一個大的包拆分爲多個小包進行發送。以下圖展現了粘包和拆包的一個示意圖:bootstrap

 

上圖中演示了粘包和拆包的三種狀況:數組

  • A和B兩個包都恰好知足TCP緩衝區的大小,或者說其等待時間已經達到TCP等待時長,從而仍是使用兩個獨立的包進行發送;服務器

  • A和B兩次請求間隔時間內較短,而且數據包較小,於是合併爲同一個包發送給服務端;app

  • B包比較大,於是將其拆分爲兩個包B1和B2進行發送,而這裏因爲拆分後的B_2比較小,其又與A包合併在一塊兒發送。框架

2. 常看法決方案

對於粘包和拆包問題,常見的解決方案有四種:ide

  • 客戶端在發送數據包的時候,每一個包都固定長度,好比1024個字節大小,若是客戶端發送的數據長度不足1024個字節,則經過補充空格的方式補全到指定長度;函數

  • 客戶端在每一個包的末尾使用固定的分隔符,例如\r\n,若是一個包被拆分了,則等待下一個包發送過來以後找到其中的\r\n,而後對其拆分後的頭部部分與前一個包的剩餘部分進行合併,這樣就獲得了一個完整的包;微服務

  • 將消息分爲頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息以後纔算是讀到了一個完整的消息;oop

  • 經過自定義協議進行粘包和拆包的處理。

3. Netty提供的粘包拆包解決方案

3.1 FixedLengthFrameDecoder

對於使用固定長度的粘包和拆包場景,可使用FixedLengthFrameDecoder,該解碼器會每次讀取固定長度的消息,若是當前讀取到的消息不足指定長度,那麼就會等待下一個消息到達後進行補足。其使用也比較簡單,只須要在構造函數中指定每一個消息的長度便可。這裏須要注意的是,FixedLengthFrameDecoder只是一個解碼器,Netty也只提供了一個解碼器,這是由於對於解碼是須要等待下一個包的進行補全的,代碼相對複雜,而對於編碼器,用戶能夠自行編寫,由於編碼時只須要將不足指定長度的部分進行補全便可。下面的示例中展現瞭如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:

  1. public class EchoServer {

  2.  

  3. public void bind(int port) throws InterruptedException {

  4. EventLoopGroup bossGroup = new NioEventLoopGroup();

  5. EventLoopGroup workerGroup = new NioEventLoopGroup();

  6. try {

  7. ServerBootstrap bootstrap = new ServerBootstrap();

  8. bootstrap.group(bossGroup, workerGroup)

  9. .channel(NioServerSocketChannel.class)

  10. .option(ChannelOption.SO_BACKLOG, 1024)

  11. .handler(new LoggingHandler(LogLevel.INFO))

  12. .childHandler(new ChannelInitializer<SocketChannel>() {

  13. @Override

  14. protected void initChannel(SocketChannel ch) throws Exception {

  15. // 這裏將FixedLengthFrameDecoder添加到pipeline中,指定長度爲20

  16. ch.pipeline().addLast(new FixedLengthFrameDecoder(20));

  17. // 將前一步解碼獲得的數據轉碼爲字符串

  18. ch.pipeline().addLast(new StringDecoder());

  19. // 這裏FixedLengthFrameEncoder是咱們自定義的,用於將長度不足20的消息進行補全空格

  20. ch.pipeline().addLast(new FixedLengthFrameEncoder(20));

  21. // 最終的數據處理

  22. ch.pipeline().addLast(new EchoServerHandler());

  23. }

  24. });

  25.  

  26. ChannelFuture future = bootstrap.bind(port).sync();

  27. future.channel().closeFuture().sync();

  28. } finally {

  29. bossGroup.shutdownGracefully();

  30. workerGroup.shutdownGracefully();

  31. }

  32. }

  33.  

  34. public static void main(String[] args) throws InterruptedException {

  35. new EchoServer().bind(8080);

  36. }

  37. }

上面的pipeline中,對於入棧數據,這裏主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個用於處理固定長度的消息的粘包和拆包問題,第二個則是將處理以後的消息轉換爲字符串。最後由EchoServerHandler處理最終獲得的數據,處理完成後,將處理獲得的數據交由FixedLengthFrameEncoder處理,該編碼器是咱們自定義的實現,主要做用是將長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實現代碼:

  1. public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {

  2. private int length;

  3.  

  4. public FixedLengthFrameEncoder(int length) {

  5. this.length = length;

  6. }

  7.  

  8. @Override

  9. protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)

  10. throws Exception {

  11. // 對於超過指定長度的消息,這裏直接拋出異常

  12. if (msg.length() > length) {

  13. throw new UnsupportedOperationException(

  14. "message length is too large, it's limited " + length);

  15. }

  16.  

  17. // 若是長度不足,則進行補全

  18. if (msg.length() < length) {

  19. msg = addSpace(msg);

  20. }

  21.  

  22. ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));

  23. }

  24.  

  25. // 進行空格補全

  26. private String addSpace(String msg) {

  27. StringBuilder builder = new StringBuilder(msg);

  28. for (int i = 0; i < length - msg.length(); i++) {

  29. builder.append(" ");

  30. }

  31.  

  32. return builder.toString();

  33. }

  34. }

這裏FixedLengthFrameEncoder實現了decode()方法,在該方法中,主要是將消息長度不足20的消息進行空格補全。EchoServerHandler的做用主要是打印接收到的消息,而後發送響應給客戶端:

  1. public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

  2.  

  3. @Override

  4. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

  5. System.out.println("server receives message: " + msg.trim());

  6. ctx.writeAndFlush("hello client!");

  7. }

  8. }

對於客戶端,其實現方式基本與服務端的使用方式相似,只是在最後進行消息發送的時候與服務端的處理方式不一樣。以下是客戶端EchoClient的代碼:

  1. public class EchoClient {

  2.  

  3. public void connect(String host, int port) throws InterruptedException {

  4. EventLoopGroup group = new NioEventLoopGroup();

  5. try {

  6. Bootstrap bootstrap = new Bootstrap();

  7. bootstrap.group(group)

  8. .channel(NioSocketChannel.class)

  9. .option(ChannelOption.TCP_NODELAY, true)

  10. .handler(new ChannelInitializer<SocketChannel>() {

  11. @Override

  12. protected void initChannel(SocketChannel ch) throws Exception {

  13. // 對服務端發送的消息進行粘包和拆包處理,因爲服務端發送的消息已經進行了空格補全,

  14. // 而且長度爲20,於是這裏指定的長度也爲20

  15. ch.pipeline().addLast(new FixedLengthFrameDecoder(20));

  16. // 將粘包和拆包處理獲得的消息轉換爲字符串

  17. ch.pipeline().addLast(new StringDecoder());

  18. // 對客戶端發送的消息進行空格補全,保證其長度爲20

  19. ch.pipeline().addLast(new FixedLengthFrameEncoder(20));

  20. // 客戶端發送消息給服務端,而且處理服務端響應的消息

  21. ch.pipeline().addLast(new EchoClientHandler());

  22. }

  23. });

  24.  

  25. ChannelFuture future = bootstrap.connect(host, port).sync();

  26. future.channel().closeFuture().sync();

  27. } finally {

  28. group.shutdownGracefully();

  29. }

  30. }

  31.  

  32. public static void main(String[] args) throws InterruptedException {

  33. new EchoClient().connect("127.0.0.1", 8080);

  34. }

  35. }

對於客戶端而言,其消息的處理流程其實與服務端是類似的,對於入站消息,須要對其進行粘包和拆包處理,而後將其轉碼爲字符串,對於出站消息,則須要將長度不足20的消息進行空格補全。客戶端與服務端處理的主要區別在於最後的消息處理handler不同,也即這裏的EchoClientHandler,以下是該handler的源碼:

  1. public class EchoClientHandler extends SimpleChannelInboundHandler<String> {

  2.  

  3. @Override

  4. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

  5. System.out.println("client receives message: " + msg.trim());

  6. }

  7.  

  8. @Override

  9. public void channelActive(ChannelHandlerContext ctx) throws Exception {

  10. ctx.writeAndFlush("hello server!");

  11. }

  12. }

這裏客戶端的處理主要是重寫了channelActive()和channelRead0()兩個方法,這兩個方法的主要做用在於,channelActive()會在客戶端鏈接上服務器時執行,也就是說,其連上服務器以後就會往服務器發送消息。而channelRead0()主要是在服務器發送響應給客戶端時執行,這裏主要是打印服務器的響應消息。對於服務端而言,前面咱們咱們能夠看到,EchoServerHandler只重寫了channelRead0()方法,這是由於服務器只須要等待客戶端發送消息過來,而後在該方法中進行處理,處理完成後直接將響應發送給客戶端。以下是分別啓動服務端和客戶端以後控制檯打印的數據:

server

server receives message: hello server!

client

client receives message: hello client!

3.2 LineBasedFrameDecoder與DelimiterBasedFrameDecoder

對於經過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這裏LineBasedFrameDecoder的做用主要是經過換行符,即\n或者\r\n對數據進行處理;而DelimiterBasedFrameDecoder的做用則是經過用戶指定的分隔符對數據進行粘包和拆包處理。一樣的,這兩個類都是解碼器類,而對於數據的編碼,也即在每一個數據包最後添加換行符或者指定分割符的部分須要用戶自行進行處理。這裏以DelimiterBasedFrameDecoder爲例進行講解,以下是EchoServer中使用該類的代碼片斷,其他部分與前面的例子中的徹底一致:

  1. @Override

  2. protected void initChannel(SocketChannel ch) throws Exception {

  3. String delimiter = "_$";

  4. // 將delimiter設置到DelimiterBasedFrameDecoder中,通過該解碼器進行處理以後,源數據將會

  5. // 被按照_$進行分隔,這裏1024指的是分隔的最大長度,即當讀取到1024個字節的數據以後,若仍是未

  6. // 讀取到分隔符,則捨棄當前數據段,由於其頗有多是因爲碼流紊亂形成的

  7. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,

  8. Unpooled.wrappedBuffer(delimiter.getBytes())));

  9. // 將分隔以後的字節數據轉換爲字符串數據

  10. ch.pipeline().addLast(new StringDecoder());

  11. // 這是咱們自定義的一個編碼器,主要做用是在返回的響應數據最後添加分隔符

  12. ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));

  13. // 最終處理數據而且返回響應的handler

  14. ch.pipeline().addLast(new EchoServerHandler());

  15. }

上面pipeline的設置中,添加的解碼器主要有DelimiterBasedFrameDecoder和StringDecoder,通過這兩個處理器處理以後,接收到的字節流就會被分隔,而且轉換爲字符串數據,最終交由EchoServerHandler處理。這裏DelimiterBasedFrameEncoder是咱們自定義的編碼器,其主要做用是在返回的響應數據以後添加分隔符。以下是該編碼器的源碼:

  1. public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {

  2.  

  3. private String delimiter;

  4.  

  5. public DelimiterBasedFrameEncoder(String delimiter) {

  6. this.delimiter = delimiter;

  7. }

  8.  

  9. @Override

  10. protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)

  11. throws Exception {

  12. // 在響應的數據後面添加分隔符

  13. ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));

  14. }

  15. }

對於客戶端而言,這裏的處理方式與服務端相似,其pipeline的添加方式以下:

  1. @Override

  2. protected void initChannel(SocketChannel ch) throws Exception {

  3. String delimiter = "_$";

  4. // 對服務端返回的消息經過_$進行分隔,而且每次查找的最大大小爲1024字節

  5. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,

  6. Unpooled.wrappedBuffer(delimiter.getBytes())));

  7. // 將分隔以後的字節數據轉換爲字符串

  8. ch.pipeline().addLast(new StringDecoder());

  9. // 對客戶端發送的數據進行編碼,這裏主要是在客戶端發送的數據最後添加分隔符

  10. ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));

  11. // 客戶端發送數據給服務端,而且處理從服務端響應的數據

  12. ch.pipeline().addLast(new EchoClientHandler());

  13. }

這裏客戶端的處理方式與服務端基本一致,關於這裏沒展現的代碼,其與示例一中的代碼徹底一致,這裏則不予展現。

3.3 LengthFieldBasedFrameDecoder與LengthFieldPrepender

這裏LengthFieldBasedFrameDecoder與LengthFieldPrepender須要配合起來使用,其實本質上來說,這二者一個是解碼,一個是編碼的關係。它們處理粘拆包的主要思想是在生成的數據包中添加一個長度字段,用於記錄當前數據包的長度。LengthFieldBasedFrameDecoder會按照參數指定的包長度偏移量數據對接收到的數據進行解碼,從而獲得目標消息體數據;而LengthFieldPrepender則會在響應的數據前面添加指定的字節數據,這個字節數據中保存了當前消息體的總體字節數據長度。LengthFieldBasedFrameDecoder的解碼過程以下圖所示:

 

關於LengthFieldBasedFrameDecoder,這裏須要對其構造函數參數進行介紹:

  • maxFrameLength:指定了每一個包所能傳遞的最大數據包大小;

  • lengthFieldOffset:指定了長度字段在字節碼中的偏移量;

  • lengthFieldLength:指定了長度字段所佔用的字節長度;

  • lengthAdjustment:對一些不只包含有消息頭和消息體的數據進行消息頭的長度的調整,這樣就能夠只獲得消息體的數據,這裏的lengthAdjustment指定的就是消息頭的長度;

  • initialBytesToStrip:對於長度字段在消息頭中間的狀況,能夠經過initialBytesToStrip忽略掉消息頭以及長度字段佔用的字節。

這裏咱們以json序列化爲例對LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進行講解。以下是EchoServer的源碼:

  1. public class EchoServer {

  2.  

  3. public void bind(int port) throws InterruptedException {

  4. EventLoopGroup bossGroup = new NioEventLoopGroup();

  5. EventLoopGroup workerGroup = new NioEventLoopGroup();

  6. try {

  7. ServerBootstrap bootstrap = new ServerBootstrap();

  8. bootstrap.group(bossGroup, workerGroup)

  9. .channel(NioServerSocketChannel.class)

  10. .option(ChannelOption.SO_BACKLOG, 1024)

  11. .handler(new LoggingHandler(LogLevel.INFO))

  12. .childHandler(new ChannelInitializer<SocketChannel>() {

  13. @Override

  14. protected void initChannel(SocketChannel ch) throws Exception {

  15. // 這裏將LengthFieldBasedFrameDecoder添加到pipeline的首位,由於其須要對接收到的數據

  16. // 進行長度字段解碼,這裏也會對數據進行粘包和拆包處理

  17. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));

  18. // LengthFieldPrepender是一個編碼器,主要是在響應字節數據前面添加字節長度字段

  19. ch.pipeline().addLast(new LengthFieldPrepender(2));

  20. // 對通過粘包和拆包處理以後的數據進行json反序列化,從而獲得User對象

  21. ch.pipeline().addLast(new JsonDecoder());

  22. // 對響應數據進行編碼,主要是將User對象序列化爲json

  23. ch.pipeline().addLast(new JsonEncoder());

  24. // 處理客戶端的請求的數據,而且進行響應

  25. ch.pipeline().addLast(new EchoServerHandler());

  26. }

  27. });

  28.  

  29. ChannelFuture future = bootstrap.bind(port).sync();

  30. future.channel().closeFuture().sync();

  31. } finally {

  32. bossGroup.shutdownGracefully();

  33. workerGroup.shutdownGracefully();

  34. }

  35. }

  36.  

  37. public static void main(String[] args) throws InterruptedException {

  38. new EchoServer().bind(8080);

  39. }

  40. }

這裏EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼器,編碼器主要是負責將響應的User對象序列化爲json對象,而後在其字節數組前面添加一個長度字段的字節數組;解碼器主要是對接收到的數據進行長度字段的解碼,而後將其反序列化爲一個User對象。下面是JsonDecoder的源碼:

  1. public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {

  2.  

  3. @Override

  4. protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)

  5. throws Exception {

  6. byte[] bytes = new byte[buf.readableBytes()];

  7. buf.readBytes(bytes);

  8. User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);

  9. out.add(user);

  10. }

  11. }

JsonDecoder首先從接收到的數據流中讀取字節數組,而後將其反序列化爲一個User對象。下面咱們看看JsonEncoder的源碼:

  1. public class JsonEncoder extends MessageToByteEncoder<User> {

  2.  

  3. @Override

  4. protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)

  5. throws Exception {

  6. String json = JSON.toJSONString(user);

  7. ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));

  8. }

  9. }

JsonEncoder將響應獲得的User對象轉換爲一個json對象,而後寫入響應中。對於EchoServerHandler,其主要做用就是接收客戶端數據,而且進行響應,以下是其源碼:

  1. public class EchoServerHandler extends SimpleChannelInboundHandler<User> {

  2.  

  3. @Override

  4. protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {

  5. System.out.println("receive from client: " + user);

  6. ctx.write(user);

  7. }

  8. }

對於客戶端,其主要邏輯與服務端的基本相似,這裏主要展現其pipeline的添加方式,以及最後發送請求,而且對服務器響應進行處理的過程:

  1. @Override

  2. protected void initChannel(SocketChannel ch) throws Exception {

  3. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));

  4. ch.pipeline().addLast(new LengthFieldPrepender(2));

  5. ch.pipeline().addLast(new JsonDecoder());

  6. ch.pipeline().addLast(new JsonEncoder());

  7. ch.pipeline().addLast(new EchoClientHandler());

  8. }

EchoClientHandler

  1. public class EchoClientHandler extends SimpleChannelInboundHandler<User> {

  2.  

  3. @Override

  4. public void channelActive(ChannelHandlerContext ctx) throws Exception {

  5. ctx.write(getUser());

  6. }

  7.  

  8. private User getUser() {

  9. User user = new User();

  10. user.setAge(27);

  11. user.setName("zhangxufeng");

  12. return user;

  13. }

  14.  

  15. @Override

  16. protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {

  17. System.out.println("receive message from server: " + user);

  18. }

  19. }

這裏客戶端首先會在鏈接上服務器時,往服務器發送一個User對象數據,而後在接收到服務器響應以後,會打印服務器響應的數據。

3.4 自定義粘包與拆包器

對於粘包與拆包問題,其實前面三種基本上已經可以知足大多數情形了,可是對於一些更加複雜的協議,可能有一些定製化的需求。對於這些場景,其實本質上,咱們也不須要手動從頭開始寫一份粘包與拆包處理器,而是經過繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來實現粘包和拆包的處理。

若是用戶確實須要不經過繼承的方式實現本身的粘包和拆包處理器,這裏能夠經過實現MessageToByteEncoder和ByteToMessageDecoder來實現。這裏MessageToByteEncoder的做用是將響應數據編碼爲一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數據轉換爲某個對象數據。經過實現這兩個抽象類,用戶就能夠達到實現自定義粘包和拆包處理的目的。以下是這兩個類及其抽象方法的聲明:

ByteToMessageDecoder

  1. public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

  2. protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

  3. throws Exception;

  4. }

MessageToByteEncoder

  1. public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

  2. protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)

  3. throws Exception;

  4. }

小結

本文首先對粘包和拆包的問題原理進行描述,幫助讀者理解粘包和拆包問題所在。而後對處理粘包和拆包的幾種經常使用解決方案進行講解。接着經過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進行了詳細講解。

相關文章
相關標籤/搜索