Netty5_ByteToMessageDecoder_源碼解析

歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java

ByteToMessageDecoder在Netty中起着很大的做用,用來解決半包字節累積問題。粘貼部分重要代碼(固然自己方法不是很
git

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    ByteBuf cumulation;
    private boolean singleDecode;
    private boolean first;

    protected ByteToMessageDecoder() {
        if (getClass().isAnnotationPresent(Sharable.class)) {//由於每個ByteToMessageDecoder都有針對某個socket的累積對象
//故是一個不能夠共享的對象類型
            throw new IllegalStateException("@Sharable annotation is not allowed");
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //緩衝區的大小沒有超過須要寫入的數據的大小
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);//將數據寫入到積累對象中
                    data.release();//釋放bytebuffer(heap或者direct)--經過引用的方式進行釋放緩衝區
                }
                //收集完畢以後解析收集到的字符串
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
//若是累積對象中沒有數據了(由於全部發送的數據剛恰好n個msg)
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release(); 
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;
                //針對解析後的out結果,逐個調用message
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private void expandCumulation(ChannelHandlerContext ctx, int readable) {
        ByteBuf oldCumulation = cumulation;//新的容量=舊的容量+可讀取的數量  ---在此處的擴展和初次的分配都是經過同一個allocator進行分配的
        cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);//複製的過程
        oldCumulation.release();//釋放老的對象
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (cumulation != null && !first) {//cumulation可讀的數據爲0那麼就不會調用緊跟着的代碼段
            cumulation.discardSomeReadBytes();//若是存在半包得話,那麼就釋放沒必要要的空間(有時間的話,咱們會將一個Netty中ByteBuf的構造)
        }
        if (decodeWasNull) {
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {//若是channel不活動了得話,對累積對象進行解碼。
            if (cumulation != null) {
                callDecode(ctx, cumulation, out);
                decodeLast(ctx, cumulation, out);
            } else {
                decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null) {
                cumulation.release();
                cumulation = null;
            }
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.get(i));
            }
            ctx.fireChannelInactive();
            out.recycle();
        }
    }

    /**
     * Called once data should be decoded from the given {@link ByteBuf}. This method will call
     * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);//特別注意: 調用完若是msg被解析出來的話,那麼累積對象的readableBytes必定會發生變化
                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {//若是此handler被移除
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
}

由此總結以下:github

  1. 累積對象主要是承載socket接收的字節數的。每一次decode,當有msg生成的時候,readableBytes都會減少socket

  2. 當一個socket處於inactive時,會對累積對象的數據進行解析。而後釋放累積對象ide

  3. 噹噹前的累積對象不能承載數據的時候,須要進行擴展(調用Allocator建立個新的bytebuf,而後copy一下數據)oop

  4. 該handler是一個有狀態的handler,須要記憶每個socket的發送的字節.而後再去decode成msg對象
    ui

相關文章
相關標籤/搜索