netty學習之二 分包、組包、粘包處理

在數據傳輸中,咱們發送的數據包以下所示java

+-----+-----+-----+git

| ABC | DEF | GHI |github

+-----+-----+-----+json

而實際接收的包的格式爲:服務器

+----+-------+---+---+ | AB | CDEFG | H | I | +----+-------+---+---+app

產生的緣由爲:數據在傳輸過程當中,產生數據包碎片(TCP/IP數據傳輸時大數據包沒法一次傳輸,被拆分紅小數據包,小數據包即爲數據包碎片),這就形成了實際接收的數據包和發送的數據包不一致的狀況。框架

那麼通常狀況下咱們是如何解決這種問題的呢?我所知道的有這幾種方案:分佈式

  1. 消息定長
  2. 在包尾增長一個標識,經過這個標誌符進行分割
  3. 將消息分爲兩部分,也就是消息頭和消息尾,消息頭中寫入要發送數據的總長度,一般是在消息頭的第一個字段使用int值來標識發送數據的長度。
  • 首先看netty中消息定長如何處理的,首先在服務端設定當前服務端接受大小爲固定長度,sc.pipeline().addLast(new FixedLengthFrameDecoder(5));本例中指定長度爲5個字符串大小,此類是netty框架提供。
ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)
		 .channel(NioServerSocketChannel.class)
		 .option(ChannelOption.SO_BACKLOG, 1024)
		 .option(ChannelOption.SO_SNDBUF, 32*1024)
		 .option(ChannelOption.SO_RCVBUF, 32*1024)
		 .childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//設置定長字符串接收
				sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
				//設置字符串形式的解碼
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ServerHandler());
			}
		});

客戶端代碼在建立handler的時候也要指定長度大小,而且與服務器端指定大小一致便可ide

EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		 .channel(NioSocketChannel.class)
		 .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new FixedLengthFrameDecoder(5));//制定傳輸數據大小
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
		cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes()));
		cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccccc".getBytes()));

運行結果以下:oop

輸入圖片說明

  • 嘗試了第一種方案後,下面嘗試下第二種方案,這種定長的方式直接限制了傳輸信息的大小,並且要服務端和客戶端同時指定大小感受並非太好,下面看下指定分隔符是如何處理的呢,首先服務端指定分隔符ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes());而後建立new DelimiterBasedFrameDecoder(1024, buf)分割符指定Decoder
ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)
		 .channel(NioServerSocketChannel.class)
		 .option(ChannelOption.SO_BACKLOG, 1024)
		 .option(ChannelOption.SO_SNDBUF, 32*1024)
		 .option(ChannelOption.SO_RCVBUF, 32*1024)
		 .childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//設置特殊分隔符
				ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes());
				sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
				//設置字符串形式的解碼
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ServerHandler());
			}
		});
  • 相同的在客戶端須要一樣指定客戶端分割符decoder,並向服務端發送消息以下
Bootstrap b = new Bootstrap();
		b.group(group)
		 .channel(NioSocketChannel.class)
		 .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes());
				sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ClientHandler());
			}
		});
	ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
		cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbasfab*_".getBytes()));
		cf.channel().writeAndFlush(Unpooled.wrappedBuffer("ccsdfasfcc*_".getBytes()));
		cf.channel().writeAndFlush(Unpooled.wrappedBuffer("fffff*_".getBytes()));
		//等待客戶端端口關閉
		cf.channel().closeFuture().sync();

輸入圖片說明

下面介紹第三種處理方案,也是不少分佈式框架中使用的方式,

Netty4自己自帶了ObjectDecoder,ObjectEncoder來實現自定義對象的序列 化, 可是用的是java內置的序列化,因爲java序列化的性能並非很好, 因此不少時候咱們須要用其餘序列化方式,常見的有 Kryo,Jackson,fastjson,protobuf等。這裏要寫的其實用什麼序列化不是重點,而是咱們怎麼設計咱們的Decoder和 Encoder。

首先咱們寫一個Encoder,咱們繼承自MessageToByteEncoder<T> ,把對象轉換成byte,繼承這個對象,會要求咱們實現一個encode方法:

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
	byte[] body = convertToBytes(msg);  //將對象轉換爲byte,僞代碼,具體用什麼進行序列化,大家自行選擇。可使用我上面說的一些
	int dataLength = body.length;  //讀取消息的長度
	out.writeInt(dataLength);  //先將消息長度寫入,也就是消息頭
	out.writeBytes(body);  //消息體中包含咱們要發送的數據
}

那麼當咱們在Decode的時候,該怎麼處理髮送過來的數據呢?這裏咱們繼承ByteToMessageDecoder方法,繼承這個對象,會要求咱們實現一個decode方法

public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
	if (in.readableBytes() < HEAD_LENGTH) {  //這個HEAD_LENGTH是咱們用於表示頭長度的字節數。  因爲上面咱們傳的是一個int類型的值,因此這裏HEAD_LENGTH的值爲4.
		return;
	}
	in.markReaderIndex();                  //咱們標記一下當前的readIndex的位置
	int dataLength = in.readInt();       // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增長4
	if (dataLength < 0) { // 咱們讀到的消息體長度爲0,這是不該該出現的狀況,這裏出現這狀況,關閉鏈接。
		ctx.close();
	}

	if (in.readableBytes() < dataLength) { //讀到的消息體長度若是小於咱們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
		in.resetReaderIndex();
		return;
	}

	byte[] body = new byte[dataLength];  //  嗯,這時候,咱們讀到的長度,知足咱們的要求了,把傳送過來的數據,取出來吧~~
	in.readBytes(body);  //
	Object o = convertToObject(body);  //將byte數據轉化爲咱們須要的對象。僞代碼,用什麼序列化,自行選擇
	out.add(o);  
}

完整代碼連接:https://github.com/winstonelei/Smt

相關文章
相關標籤/搜索