Dubbo處理TCP拆包粘包問題

#Dubbo處理TCP拆包粘包問題java

在TCP網絡傳輸工程中,因爲TCP包的緩存大小限制,每次請求數據有可能不在一個TCP包裏面,或者也可能多個請求的數據在一個TCP包裏面。那麼若是合理的decode接受的TCP數據很重要,須要考慮TCP拆包和粘包的問題。咱們知道在Netty提供了各類Decoder來解決此類問題,好比LineBasedFrameDecoder,LengthFieldBasedFrameDecoder等等,可是這些都是處理一些通用簡單的協議棧,並不能處理高度自定義的協議棧。因爲dubbo協議是自定義協議棧,而且包含消息頭和消息體兩部分,而消息頭中包含消息類型、協議版本、協議魔數以及payload長度等信息。因此使用Netty自帶的處理方案可能沒法知足Dubbo解析自身協議的需求,因此須要Dubbo本身來處理,那本身處理,就須要本身處理TCP的拆包和粘包的問題。這裏就對Dubbo處理此類問題進行探討,從而加深本身對它的理解。web

##說明 此處所描述的協議是dubbo協議,其餘的協議好比http,webservice等協議不是這裏討論範圍。而且這裏使用的通訊框架以Netty來說解,Mina以及grizzly也不在種類討論範圍。緩存

##NettyCodecAdapter NettyCodecAdapter是對dubbo協議解析的入口,裏面包含decoder和encoder兩部分,而TCP的拆包和粘包主要是decoder部分,因此encoder這裏不進行討論。在NettyCodecAdapter中的decoder是由InternalDecoder來實現,它的父類是Netty的SimpleChannelUpstreamHandler能夠接受全部inbound消息,那麼就能夠對接受的消息進行decode。這裏須要說明一下對於某一個Channel都有一個私有的InternalDecoder對象,並非和其餘的Channel共享,這裏就避免了併發問題,因此在InternalDecoder裏面能夠用單線程的方式去看待,這樣就比較容易理解。網絡

###InternalDecoder 每一個channel的inbound消息都會發送到InternalDecodermessageReceived方法,而dubbo會先將接受的消息緩存到InternalDecoderbuffer屬性中,這個變量很重要,後面會討論。下面是messageReceived方法中將接受的消息負載到buffer實現。併發

private class InternalDecoder extends SimpleChannelUpstreamHandler {

        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            if (! (o instanceof ChannelBuffer)) {
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                        size > bufferSize ? size : bufferSize);
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());
                }
            } else {
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                    input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            ctx.sendUpstream(e);
        }
    }

首先是判斷當前decoder對象的buffer中是否有能夠讀取的消息,若是有則進行合併,而且把對象引用賦予message局部變量,因此message則獲取了當前channel的inbound消息。獲得inbound消息以後,那麼接下來就是對協議的解析了。app

do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());

這裏首先要作的是把當前message的讀索引保存到局部變量saveReaderIndex中,用於後面的消息回滾。後面緊接着是對消息的decode,這裏的codecDubboCountCodec對象實體,這裏須要注意一點,DubboCountCodecdecode每次只會解析出一個完整的dubbo協議棧,帶着這個看看decode的實現。框架

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }

這裏暫存了當前buffer的讀索引,一樣也是爲了後面的回滾。能夠看到當decode返回的是NEED_MORE_INPUT則表示當前的buffer中數據不足,不能完整解析出一個dubbo協議棧,同時將buffer的讀索引回滾到以前暫存的索引而且退出循環,將結果返回。那接下來看看何時會返回NEED_MORE_INPUT,最終會定位到在ExchangeCodecdecode方法會解析出協議棧。tcp

protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH 
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i ++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        if( readable < tt ) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

這個方法開始是對telnet協議進行解析(因爲dubbo支持telnet鏈接,因此這裏提供了支持,能夠忽略這一部分)。看到會有兩個地方返回NEED_MORE_INPUT,一個是當前buffer的可讀長度尚未消息頭長,說明當前buffer連協議棧的頭都不完整,因此須要繼續讀取inbound數據,另外一個是當前buffer包含了完整的消息頭,即可以獲得payload的長度,發現它的可讀的長度,並無包含整個協議棧的數據,因此也須要繼續讀取inbound數據。若是上面兩個狀況都不復核,那麼說明當前的buffer至少包含一個dubbo協議棧的數據,那麼從當前buffer中讀取一個dubbo協議棧的數據,解析出一個dubbo數據,固然這裏可能讀取完一個dubbo數據以後還會有剩餘的數據。ide

上面對dubbo解析出一個完整的dubbo協議棧過程進行了討論,可是尚未對TCP的拆包和粘包問題作過多的討論。下面結合上面內容作一個綜合討論。url

我這裏對TCP拆包和粘包分別列舉一個場景來討論。

###當反生TCP拆包問題時候 這裏假設以前尚未發生過任何數據交互,系統剛剛初始化好,那麼這個時候在InternalDecoder裏面的buffer屬性會是EMPTY_BUFFER。當發生第一次inbound數據的時候,第一次在InternalDecoder裏面接收的確定是dubbo消息頭的部分(這個由TCP協議保證),因爲發生了拆包狀況,那麼此時接收的inbound消息可能存在一下幾種狀況

一、當前inbound消息只包含dubbo協議頭的一部分

二、當前inbound消息只包含dubbo的協議頭

三、當前inbound消息只包含dubbo消息頭和部分payload消息

經過上面的討論,咱們知道發生上面三種狀況,都會觸發ExchangeCodec返回NEED_MORE_INPUT,因爲在DubboCountCodec對餘返回NEED_MORE_INPUT會回滾讀索引,因此此時的buffer裏面的數據能夠看成並無發生過讀取操做,而且DubboCountCodec的decode也會返回NEED_MORE_INPUT,在InternalDecoder對於當判斷返回NEED_MORE_INPUT,也會進行讀索引回滾,而且退出循環,最後會執行finally內容,這裏會判斷inbound消息是否還有可讀的,因爲在DubboCountCodec裏面進行了讀索引回滾,因此次數的buffer裏面是完整的inbound消息,等待第二次的inbound消息的到來,當第二次inbound消息過來的時候,再次通過上面的判斷。

###當發生TCP粘包的時候 當發生粘包的時候是tcp將一個以上的dubbo協議棧放在一個tcp包中,那麼有可能發生下面幾種狀況

一、當前inbound消息只包含一個dubbo協議棧

二、當前inbound消息包含一個dubbo協議棧,同時包含部分另外一個或者多個dubbo協議棧內容

若是發生只包含一個協議棧,那麼當前buffer經過ExchangeCodec解析協議以後,當前的buffer的readeIndex位置應該是 buffer尾部,那麼在返回到InternalDecodermessage的方法readable返回的是false,那麼就會對buffer從新賦予EMPTY_BUFFER實體,而針對包含一個以上的dubbo協議棧,固然也會解析出其中一個dubbo協議棧,可是通過ExchangeCodec解析以後,message的readIndex不在message尾部,因此messagereadable方法返回的是true。那麼則會繼續遍歷message,讀取下面的信息。最終要麼message恰好整數倍包含完整的dubbo協議棧,要不ExchangeCodec返回NEED_MORE_INPUT,最後將未讀完的數據緩存到buffer中,等待下次inbound事件,將buffer中的消息合併到下次的inbound消息中,種類又回到了拆包的問題上。

##總結

dubbo在處理tcp的粘包和拆包時是藉助InternalDecoderbuffer緩存對象來緩存不完整的dubbo協議棧數據,等待下次inbound事件,合併進去。因此說在dubbo中解決TCP拆包和粘包的時候是經過buffer變量來解決的。

相關文章
相關標籤/搜索