netty源碼解析之netty解碼器

1 ByteToMessageDecoder.java類的基本結構如下:
 
 
其中ByteToMessageDecoder類是累加器的基礎類,其核心方法如下:
// ByteToMessageDecoder.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 首先會從RECYCLE的stack中取一個對象 return RECYCLER.get();
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            // 判斷是否第一次讀,第一次讀直接賦值
            if (first) {
                cumulation = data;
            } else {
                // A點: 非第一次進行字節流的累加
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // B點:對累加的字節流進行解析,並解析到out中(B點會提供一個decode(ctx, in, out);給子類進行實現 --模板方法模式)
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
           } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }
 
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // C點:將字節流byteBuf向下傳播
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
 
首先分析上面A點:非第一次進行字節流的累加 ,跟蹤源碼如下:
  • 分析當前對應的對象變量如下:private Cumulator cumulator = MERGE_CUMULATOR;
  • 繼續跟蹤源碼如下:
// ByteToMessageDecoder.java
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        // 判斷當前是否還有空間可寫
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) {
            // 如果有就進行拓展
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        // 將新來的字節流加入到buf中,然後釋放
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};
拓展的方法如下:
// 總的來說就是拿到一個新的加上舊的空間,然後將舊的寫到新申請的buf中,然後釋放舊的
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
    ByteBuf oldCumulation = cumulation;
    cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
    cumulation.writeBytes(oldCumulation);
    oldCumulation.release();
    return cumulation;
}