netty 對 protobuf 協議的解碼與包裝探究(2)

netty 默認支持protobuf 的封裝與解碼,若是通訊雙方都使用netty則沒有什麼障礙,但若是客戶端是其它語言(C#)則須要本身仿寫與netty一致的方式(解碼+封裝),提早是必須很瞭解netty是如何進行封裝與解碼的。這裏主要經過讀源碼主要類ProtobufVarint32FrameDecoder(解碼)+ProtobufVarint32LengthFieldPrepender(封裝) 來解析其原理與實現。算法

文章來源http://www.cnblogs.com/tankaixiongide

一,支持protobuf 協議的默認實現

//配置服務端NIO線程組  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try{  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup)  
                .channel(NioServerSocketChannel.class)  
                .option(ChannelOption.SO_BACKLOG, 1024)  
                .handler(new LoggingHandler(LogLevel.INFO))  
                .childHandler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ch.pipeline()  
                        .addLast(new ProtobufVarint32FrameDecoder())                          
                        .addLast(new ProtobufDecoder(  
                                SubscribeReqProto.SubscribeReq.getDefaultInstance()))                         
                        .addLast(new ProtobufVarint32LengthFieldPrepender())                          
                        .addLast(new ProtobufEncoder())                       
                        .addLast(new SubReqServerHandler());                          
                    }  
                      
                });  
            //綁定端口,同步等待成功  
            ChannelFuture f = b.bind(port).sync();  
            //等待服務端監聽端口關閉  
            f.channel().closeFuture().sync();  
              
        }finally{  
            //退出時釋放資源  
            bossGroup.shutdownGracefully();  
            workerGroup.shutdownGracefully();  
        }

 

以上是提供的默認實現。關鍵在於ProtobufVarint32FrameDecoder,ProtobufVarint32LengthFieldPrepender類。oop

二,ProtobufVarint32LengthFieldPrepender 編碼類

An encoder that prepends the the Google Protocol Buffers 128 Varints integer length field.this

* BEFORE DECODE (300 bytes) AFTER DECODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+

從類的說明來看, proto 消息格式如:Length + Protobuf Data (消息頭+消息數據) 方式,這裏特別須要注意的是頭長使用的是varints方式不是int ,消息頭描述消息數據體的長度。爲了更減小傳輸量,消息頭採用的是varint 格式。編碼

什麼是varint?spa

文章來源http://www.cnblogs.com/tankaixiongVarint 是一種緊湊的表示數字的方法。它用一個或多個字節來表示一個數字,值越小的數字使用越少的字節數。這能減小用來表示數字的字節數。 Varint 中的每一個 byte 的最高位 bit 有特殊的含義,若是該位爲 1,表示後續的 byte 也是該數字的一部分,若是該位爲 0,則結束。其餘的 7 個 bit 都用來表示數字。所以小於 128 的數字均可以用一個 byte 表示。大於 128 的數字,會用兩個字節。線程

更多可參見我上篇文章netty

最大的區別是消息頭它不是固定長度(常見是的使用INT 4個字節固定長度),Varint它用一個或多個字節來表示一個數字決定它不是固定長度!code

ProtobufVarint32LengthFieldPrepender 類的主要方法以下:orm

@Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);

        CodedOutputStream headerOut =
                CodedOutputStream.newInstance(new ByteBufOutputStream(out), headerLen);
        headerOut.writeRawVarint32(bodyLen);
        headerOut.flush();

        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

 

CodedOutputStream 主要是針對與varints相關操做類。 先看是如何寫消息頭的,獲得bodyLen 消息體長度而後調用computeRawVarint32Size()計算須要多少個字節,

public static int computeRawVarint32Size(final int value) {
    if ((value & (0xffffffff <<  7)) == 0) return 1;
    if ((value & (0xffffffff << 14)) == 0) return 2;
    if ((value & (0xffffffff << 21)) == 0) return 3;
    if ((value & (0xffffffff << 28)) == 0) return 4;
    return 5;
  }

 

0xffffffff << 7 二進制表示11111111111111111111111110000000 ,當與value &計算=0則表示value最大隻會是000000000000000000000001111111,一個字節足以。

經過&運算得出使用多少個字節就能夠表示當前數字。左移7位是與Varint定義相關,第一位須要保留給標識(1表示後續的 byte 也是該數字的一部分,0則結束)。要表示 int 32位 和多加的每一個字節第一個標識位,多出了4位,因此就最大會有5個字節。

獲得了varints值,而後如何寫入out? 再看關鍵方法writeRawVarint32()。

public void writeRawVarint32(int value) throws IOException {
    while (true) {
      //0x7F爲127
      if ((value & ~0x7F) == 0) {//是否小於127,小於則一個字節就能夠表示了
        writeRawByte(value);
        return;
      } else {
        writeRawByte((value & 0x7F) | 0x80);//因不於小127,加一高位標識
        value >>>= 7;//右移7位,再遞歸
      }
    }
  }
    /** Write a single byte. */
  public void writeRawByte(final byte value) throws IOException {
    if (position == limit) {
      refreshBuffer();
    }

    buffer[position++] = value;
  }
  
  private void refreshBuffer() throws IOException {
    if (output == null) {
      // We're writing to a single buffer.
      throw new OutOfSpaceException();
    }

    // Since we have an output stream, this is our buffer
    // and buffer offset == 0
    output.write(buffer, 0, position);
    position = 0;
  }

 

byte 的取值(-128~127) , 0x7F爲127 , 0x80爲128

循環取後7位,若是小於127則結束,不小於第一位加標識位1。 由於循環右移因此,實際位置顛倒了,解碼時須要倒過來再拼接。

消息頭由於是varint32可變字節,因此比較複雜些,消息體簡單直接writeBytes便可。

二,ProtobufVarint32FrameDecoder 解碼類

一樣對應CodedOutputStream有CodedInputStream類,操做解碼時的varints。

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        final byte[] buf = new byte[5];
        for (int i = 0; i < buf.length; i ++) {
            if (!in.isReadable()) {
                in.resetReaderIndex();
                return;
            }

            buf[i] = in.readByte();
            if (buf[i] >= 0) {
                int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
                if (length < 0) {
                    throw new CorruptedFrameException("negative length: " + length);
                }

                if (in.readableBytes() < length) {
                    in.resetReaderIndex();
                    return;
                } else {
                    out.add(in.readBytes(length));
                    return;
                }
            }
        }

        // Couldn't find the byte whose MSB is off.
        throw new CorruptedFrameException("length wider than 32-bit");
    }

 

前面說明了最大長度爲5個字節因此這裏聲明瞭5個長度的字節來讀取消息頭。

buf[i] >= 0 這裏爲何是>0而後就能夠解碼了呢?

仍是這句話:varints第一位表示後續的byte是不是該數字的一部分!

若是字節第一位爲1則表示後續還有字節是表示消息頭,當這個字節的第一位爲1則這個字節確定是負數(字節最高位表示正負),大於等於0表示描述消息體長度的數字已經讀完了。

而後調用readRawVarint32() 還原成int ,與以前 writeRawVarint32()反其道而行。

public int readRawVarint32() throws IOException {
   byte tmp = readRawByte();
   if (tmp >= 0) {
     return tmp;
   }
   int result = tmp & 0x7f;
   if ((tmp = readRawByte()) >= 0) {
     result |= tmp << 7;
   } else {
     result |= (tmp & 0x7f) << 7;
     if ((tmp = readRawByte()) >= 0) {
       result |= tmp << 14;
     } else {
       result |= (tmp & 0x7f) << 14;
       if ((tmp = readRawByte()) >= 0) {
         result |= tmp << 21;
       } else {
         result |= (tmp & 0x7f) << 21;
         result |= (tmp = readRawByte()) << 28;
         if (tmp < 0) {
           // Discard upper 32 bits.
           for (int i = 0; i < 5; i++) {
             if (readRawByte() >= 0) {
               return result;
             }
           }
           throw InvalidProtocolBufferException.malformedVarint();
         }
       }
     }
   }
   return result;
 }

 

取第N字節左移7*N位或|第一個字節拼接,實現了倒序拼接,最後獲得了消息體長度。而後根據獲得的消息體長度讀取數據,若是消息體長度不夠則回滾到markReaderIndex,等待數據。

四,總結

文章來源http://www.cnblogs.com/tankaixiong本文主要詳細介紹了netty 對 protobuf 協議的解碼與包裝。重點在消息頭 varint32的 算法表示上進行了說明。瞭解了varint32在協議中的實現,方便應用在其語言對接。

相關文章
相關標籤/搜索