Protobuf3 + Netty4: 在socket上傳輸多種類型的protobuf數據

Protobuf序列化的字節流數據是不能自描述的,當咱們經過socket把數據發送到Client時,Client必須知道發送的是什麼類型的數據,才能正確的反序列化它。這嚴重影響限制了C/S功能的實現,不解決的話信道事實上只能傳輸一種類型的數據。本文講解一下我用的解決辦法,雖然我以爲應該有官方的實現更合理,即原生支持Protobuf的自描述。html

(在金融領域,有一個叫FAST的協議,基本原理和Protobuf相同,而且有更高的壓縮率,而且序列化後的字節流是自描述的,能夠自動反序列化爲對應的模板的數據(模板至關於.proto文件),可是時間效率比protobuf差,你們也能夠關注一下。)java

 

解決方案一

首先,介紹另一種實現,在protobuf官方wiki中描述的一種workaround,經過定義一種用於自描述的類型:數組

message SelfDescribingMessage {
  // Set of .proto files which define the type.
  required FileDescriptorSet proto_files = 1;

  // Name of the message type.  Must be defined by one of the files in
  // proto_files.
  required string type_name = 2;

  // The message data.
  required bytes message_data = 3;
}

(參考:https://developers.google.com/protocol-buffers/docs/techniques#self-description服務器

 

把實際要傳輸的類型的字節數組放在message_data字段中,用proto_files和type_name字段來描述它的proto文件和類型。這樣,信道上傳輸的都是SelfDescribingMessage類型,可是其上的負載能夠是任何類型的數據。socket

我沒有試過這種方式。我不太願意使用這種方式的緣由是,很顯然,這樣作須要進行2次序列化和2次反序列化,byte數組也要被建立2次。若是對應時延和性能敏感的系統,這樣作不夠好。ide

 

解決方案二

今天主要要介紹的方案。在protobuf序列化的前面,加上一個自定義的頭,這個頭包含序列化的長度和它的類型。在解壓的時候根據包頭來反序列化。性能

 

假設socket上要傳輸2個類型的數據,股票行情信息和期權行情信息:ui

股票的.proto定義:this

syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message StockTick {
    string stockId = 1;
    int price = 2;
}

 

期權的.proto定義:google

syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message OptionTick {
    string optionId = 1;
    string securityId = 2;
    int price = 3;
}

 

netty4官方事實上已經實現了protobuf的編解碼的插件,可是隻能用於傳輸單一類型的protobuf序列化。我這裏截取一段netty代碼,熟悉netty的同窗立刻就能理解它的做用:

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance()));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(new CustomProtoServerHandler());
        }

 

看以上代碼高亮部分,netty4官方的編解碼器必須指定單一的protobuf類型才行。具體每一個類的做用:

ProtobufEncoder:用於對Probuf類型序列化。
ProtobufVarint32LengthFieldPrepender:用於在序列化的字節數組前加上一個簡單的包頭,只包含序列化的字節長度。
ProtobufVarint32FrameDecoder:用於decode前解決半包和粘包問題(利用包頭中的包含數組長度來識別半包粘包)
ProtobufDecoder:反序列化指定的Probuf字節數組爲protobuf類型。

 

咱們能夠參考以上官方的編解碼代碼,將實現咱們客戶化的protobuf編解碼插件,可是要支持多種不一樣類型protobuf數據在一個socket上傳輸:

 

編碼器CustomProtobufEncoder:

import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 參考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder
 */
@Sharable
public class CustomProtobufEncoder extends MessageToByteEncoder<MessageLite> {
    
HangqingEncoder hangqingEncoder;
    
    public CustomProtobufEncoder(HangqingEncoder hangqingEncoder)
    {
        this.hangqingEncoder = hangqingEncoder;
    }
    
    @Override
    protected void encode(
            ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {
        

        byte[] body = msg.toByteArray();
        byte[] header = encodeHeader(msg, (short)body.length);
        
        out.writeBytes(header);
        out.writeBytes(body);
        
        return;
    }
    
    private byte[] encodeHeader(MessageLite msg, short bodyLength) {
        byte messageType = 0x0f;
        
        if (msg instanceof StockTickOuterClass.StockTick) {
            messageType = 0x00;
        } else if (msg instanceof OptionTickOuterClass.OptionTick) {
            messageType = 0x01;
        }
        
        byte[] header = new byte[4];
        header[0] = (byte) (bodyLength & 0xff);
        header[1] = (byte) ((bodyLength >> 8) & 0xff);
        header[2] = 0; // 保留字段
        header[3] = messageType;

        return header;

    }
}

 

CustomProtobufEncoder序列化傳入的protobuf類型,而且爲它建立了一個4個字節的包頭,格式以下

 

body長度(low) body長度
(high)
保留字節 類型

 

其中的encodeHeader方法具體的實現要根據你要傳輸哪些protobuf類型來修改代碼,也能夠稍加設計避免使用太多的if…else。

 

解碼器CustomProtobufDecoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import com.google.protobuf.MessageLite;

/**
 * 參考ProtobufVarint32FrameDecoder 和 ProtobufDecoder
 */

public class CustomProtobufDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() > 4) { // 若是可讀長度小於包頭長度,退出。
            in.markReaderIndex();

            // 獲取包頭中的body長度
            byte low = in.readByte();
            byte high = in.readByte();
            short s0 = (short) (low & 0xff);
            short s1 = (short) (high & 0xff);
            s1 <<= 8;
            short length = (short) (s0 | s1);

            // 獲取包頭中的protobuf類型
            in.readByte();
            byte dataType = in.readByte();

            // 若是可讀長度小於body長度,恢復讀指針,退出。
            if (in.readableBytes() < length) {
                in.resetReaderIndex();
                return;
            }

            // 讀取body
            ByteBuf bodyByteBuf = in.readBytes(length);

            byte[] array;
            int offset;

            int readableLen= bodyByteBuf.readableBytes();
            if (bodyByteBuf.hasArray()) {
                array = bodyByteBuf.array();
                offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex();
            } else {
                array = new byte[readableLen];
                bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen);
                offset = 0;
            }
            
            //反序列化
            MessageLite result = decodeBody(dataType, array, offset, readableLen);
            out.add(result);
        }
    }

    public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception {
        if (dataType == 0x00) {
            return StockTickOuterClass.StockTick.getDefaultInstance().
                    getParserForType().parseFrom(array, offset, length);

        } else if (dataType == 0x01) {
            return OptionTickOuterClass.OptionTick.getDefaultInstance().
                    getParserForType().parseFrom(array, offset, length);
        }

        return null; // or throw exception
    }
}

 

CustomProtobufDecoder實現了2個功能,1)經過包頭中的長度信息來解決半包和粘包。 2)把消息body反序列化爲對應的protobuf類型(根據包頭中的類型信息)。

其中的decodeBody方法具體的實現要根據你要傳輸哪些protobuf類型來修改代碼,也能夠稍加設計避免使用太多的if…else。

 

在Netty服務器上應用編解碼器

如何把咱們自定義的編解碼用於netty Server:

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("decoder",new CustomProtobufDecoder());
            pipeline.addLast("encoder",new CustomProtobufEncoder());
            pipeline.addLast(new CustomProtoServerHandler());
        }

 

Binhua Liu原創文章,轉載請註明原地址http://www.cnblogs.com/Binhua-Liu/p/5577622.html

相關文章
相關標籤/搜索