netty學習之四 編碼解碼和傳輸序列化

  • netty發送或接收消息後,Netty必須將消息數據從一種形式轉化爲另外一種。接收消息後,須要將消息從字節碼轉成Java對象(由某種解碼器解碼);發送消息前,須要將Java對象轉成字節(由某些類型的編碼器進行編碼)。這種轉換通常發生在網絡程序中,由於網絡上只能傳輸字節數據。git

  • 嚴格的說其餘handlers能夠作編碼器和適配器,使用不一樣的Adapter classes取決你想要作什麼。若是是解碼器則有一個ChannelInboundHandlerAdapter或ChannelInboundHandler,全部的解碼器都繼承或實現它們。「channelRead」方法/事件被覆蓋,這個方法從入站(inbound)通道讀取每一個消息。重寫的channelRead方法將調用每一個解碼器的「decode」方法並經過ChannelHandlerContext.fireChannelRead(Object msg)傳遞給ChannelPipeline中的下一個ChannelInboundHandler。
      相似入站消息,當你發送一個消息出去(出站)時,除編碼器將消息轉成字節碼外還會轉發到下一個ChannelOutboundHandler。
  • 下面看實際狀況開發中發送數據的encode方法github

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
        @Override
        public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
                throws Exception {
            if (remotingCommand == null) {
                LOGGER.error("Message is null");
                return;
            }
            try {
                ByteBuffer byteBuffer = codec.encode(remotingCommand);
                out.writeBytes(byteBuffer);
            } catch (Exception e) {
                Channel channel = new NettyChannel(ctx);
                LOGGER.error("encode exception, addr={}, remotingCommand={}", RemotingHelper.parseChannelRemoteAddr(channel), remotingCommand.toString(), e);
                RemotingHelper.closeChannel(channel);
            }
        }
    }
  • 使用code.encode的做用是根據本身定義的序列化方法將須要的數據進行序列化,衆所周知Java的序列化性能極低,在一些對性能要求較高的場景是不合適的,因此須要提供一些其它常見的序列化方式,好比hession,kryo,protobuf,jackson等序列化方式
public ByteBuffer encode(RemotingCommand remotingCommand) throws Exception {
       RemotingSerializable serializable =
               getRemotingSerializable(remotingCommand.getSid());
       // header length size
       int length = 4;
       // serializable id (int)
       length += 4;
       //  header data length
       byte[] headerData = serializable.serialize(remotingCommand);
       length += headerData.length;
       byte[] bodyData = null;
       byte[] bodyClass = null;
       RemotingCommandBody body = remotingCommand.getBody();
       if (body != null) {
           // body data
           bodyData = serializable.serialize(body);
           length += bodyData.length;
           bodyClass = body.getClass().getName().getBytes();
           length += bodyClass.length;
           length += 4;
       }
       ByteBuffer result = ByteBuffer.allocate(4 + length);
       // length
       result.putInt(length);
       // serializable Id
       result.putInt(serializable.getId());
       // header length
       result.putInt(headerData.length);
       // header data
       result.put(headerData);
       if (bodyData != null) {
           //  body length
           result.putInt(bodyData.length);
           //  body data
           result.put(bodyData);
           // body class
           result.put(bodyClass);
       }
       result.flip();
       return result;
   }
  • 序列化接口:
public interface RemotingSerializable {

    int getId();

    byte[] serialize(final Object obj) throws Exception;

    <T> T deserialize(final byte[] data, Class<T> clazz) throws Exception;
}

經過實現該接口能夠實現自定義的序列化方式,下面看下,當接收到網絡回傳的字節數據如何進行反序列化呢網絡

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
        public NettyDecoder() {
            super(appContext.getConfig().getParameter(ExtConfig.NETTY_FRAME_LENGTH_MAX, Constants.DEFAULT_BUFFER_SIZE), 0, 4, 0, 4);
        }
        @Override
        public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            try {
                ByteBuf frame = (ByteBuf) super.decode(ctx, in);
                if (frame == null) {
                    return null;
                }

                byte[] tmpBuf = new byte[frame.capacity()];
                frame.getBytes(0, tmpBuf);
                frame.release();

                ByteBuffer byteBuffer = ByteBuffer.wrap(tmpBuf);
                return codec.decode(byteBuffer);
            } catch (Exception e) {
                Channel channel = new NettyChannel(ctx);
                LOGGER.error("decode exception, {}", RemotingHelper.parseChannelRemoteAddr(channel), e);
                RemotingHelper.closeChannel(channel);
            }
            return null;
        }
    }

調用codec的反序列化方法,將byteBuffer轉換成自定義的對象。app

public RemotingCommand decode(ByteBuffer byteBuffer) throws Exception {
       int length = byteBuffer.limit();
       int serializableId = byteBuffer.getInt();
       RemotingSerializable serializable =
               getRemotingSerializable(serializableId);

       int headerLength = byteBuffer.getInt();
       byte[] headerData = new byte[headerLength];
       byteBuffer.get(headerData);

       RemotingCommand cmd = serializable.deserialize(headerData, RemotingCommand.class);

       int remaining = length - 4 - 4 - headerLength;

       if (remaining > 0) {

           int bodyLength = byteBuffer.getInt();
           int bodyClassLength = remaining - 4 - bodyLength;

           if (bodyLength > 0) {

               byte[] bodyData = new byte[bodyLength];
               byteBuffer.get(bodyData);

               byte[] bodyClassData = new byte[bodyClassLength];
               byteBuffer.get(bodyClassData);

               cmd.setBody((RemotingCommandBody) serializable.deserialize(bodyData, Class.forName(new String(bodyClassData))));
           }
       }
       return cmd;
   }

至此,整個數據傳輸中的序列化和反序列化過程結束,具體代碼已經上傳到github。ide

完整代碼連接:https://github.com/winstonelei/Smt性能

相關文章
相關標籤/搜索