netty 粘包的解決策略

粘包問題的解決策略

     因爲底層的 TCP 沒法理解上層業務數據,因此在底層是沒法保證數據包不被拆分和重組的 , 這個問題只能經過上層的應用協議棧設計來解決,根據業界主流的協議的解決方案, 能夠概括以下:
  1. 消息定長, 例如每一個報文的大小固定長度200字節,若是不夠,空位補齊空格;
  2. 在包尾部添加回車換行符進行分割, 例如 FTP 協議;
  3. 將消息分爲消息頭和消息體,消息頭中包含表示消息總長度(或者消息具體長度)的字段,一般設計思路爲消息頭的第一個字段使用 int32 來表示消息的總長度;
  4. 更復雜的應用協議層;
 

 一. LineBasedFrameDecoder 與 StringDecoder

 LineBasedFrameDecoder 與 StringDecoder 的工做原理  

  LineBasedFrameDecoder 的工做原理是它依次遍歷 ByteBuf 中得可讀字節,判斷看是否有 '\n' 或者  '\r\n' ,  若是有,就以此位置爲結束位置,從可讀索引到結束位置區間的字節組成一行.他是以換行符爲結束標誌的解碼器.支持攜帶結束符或者不攜帶結束符兩種編碼方式,同時支持配置單行最大長度 . 若是連續讀取到最大長度後仍然沒有發現換行符,則拋出異常,同時忽略掉以前讀到的異常碼流.
 
StringDecoder 的功能很是簡單,就是將收到的對象轉換成字符串,而後繼續調用後面的handler 
  LineBasedFrameDecoder + StringDecoder  組合就是按換行切換的文本解碼器, 它被設計來用於支持 TCP 的粘包和拆包.
 
使用以下:
package time.server.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * TODO
 * 
 * @description
 * @author mjorcen
 * @time 2015年5月25日 下午2:50:57
 */
public class NTimeServerImpl {

    public void bind(int port) {
        // 建立兩個NioEventLoopGroup 實例,NioEventLoopGroup
        // 是一個線程組,它包含一組NIO線程,專門用於處理網絡事件的處理,實際上他們就是Reactor 線程組
        // 這裏建立兩個的緣由是一個用於服務端接收用戶的連接,另外一個用於進行SocketChannel的網絡讀寫
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 建立一個 ServerBootstrap ,它是netty用於NIO服務端的輔助啓動類,目的是下降服務端的開發複雜度.
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 設定 服務端接收用戶請求的線程組和用於進行SocketChannel網絡讀寫的線程組
            bootstrap.group(bossGroup, workerGroup);
            // 設置建立的 channel 類型
            bootstrap.channel(NioServerSocketChannel.class);
            // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // 綁定io處理類(childChannelHandler).他的做用相似於 reactor 模式中的 handler
            // 類,主要用於處理網絡 I/O 事件,例如對記錄日誌,對消息進行解碼等.
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 加入行處理器
                    ch.pipeline().addLast(new StringDecoder()); // 加入字符串解碼器
                    ch.pipeline().addLast(new TimeServerHandler());
                }
            });
            // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture
            // 它的功能相似於的 Future,主要用於異步操做的通知回調.
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅的退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        NTimeServerImpl server = new NTimeServerImpl();
        server.bind(9091);
    }

}

ServerHandlerjava

package time.server.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

import time.TimeConfig;

/**
 * TODO
 * 
 * @description
 * @author ez
 * @time 2015年5月25日 下午3:06:09
 */
public class TimeServerHandler extends ChannelHandlerAdapter implements
        TimeConfig {

    /*
     * (non-Javadoc)
     * 
     * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.
     * ChannelHandlerContext, java.lang.Object)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body);
        String currentTime = QUERY.equalsIgnoreCase(body) ? new Date()
                .toString() : "BAD ORDER";
        currentTime += System.getProperty("line.separator");
        System.out.println("currentTime : " + currentTime);
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8"));
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        // 當出現異常時,釋放資源.
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

 

Clientreact

package time.client.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * TODO
 * 
 * @description
 * @author ez
 * @time 2015年5月25日 下午3:17:29
 */
public class NTimeClient {

    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // 發起異步連接操做
            ChannelFuture future = bootstrap.connect(host, port).sync();

            // 等待客戶端鏈路關閉
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        NTimeClient client = new NTimeClient();
        client.connect(9091, "localhost");
    }
}

ClientHandlerbootstrap

package time.server.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

import time.TimeConfig;

/**
 * TODO
 * 
 * @description
 * @author ez
 * @time 2015年5月25日 下午3:06:09
 */
public class TimeServerHandler extends ChannelHandlerAdapter implements
        TimeConfig {

    /*
     * (non-Javadoc)
     * 
     * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.
     * ChannelHandlerContext, java.lang.Object)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body);
        String currentTime = QUERY.equalsIgnoreCase(body) ? new Date()
                .toString() : "BAD ORDER";
        currentTime += System.getProperty("line.separator");
        System.out.println("currentTime : " + currentTime);
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8"));
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        // 當出現異常時,釋放資源.
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

 

2: DelimiterBasedFrameDecoder
   

  

DelimiterBasedFrameDecoder 跟 LineBasedFrameDecoder 很類似 , 只是增長以自定義的分割符.網絡

  

                    ByteBuf buf = Unpooled.copiedBuffer("$".getBytes("utf-8"));
                    ch.pipeline().addLast(
                            new DelimiterBasedFrameDecoder(1024, buf));

 

3: FixedLengthFrameDecoder 定長的分割器. 
 
                    ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));

 

 
以上內容出自 : <Netty  權威指南> 
相關文章
相關標籤/搜索