Dubbo源碼分析(六)Dubbo通訊的編碼解碼機制

TCP的粘包拆包問題

咱們知道Dubbo的網絡通訊框架Netty是基於TCP協議的,TCP協議的網絡通訊會存在粘包和拆包的問題,先看下爲何會出現粘包和拆包java

  • 當要發送的數據大於TCP發送緩衝區剩餘空間大小,將會發生拆包
  • 待發送數據大於MSS(最大報文長度),TCP在傳輸前將進行拆包
  • 要發送的數據小於TCP發送緩衝區的大小,TCP將屢次寫入緩衝區的數據一次發送出去,將會發生粘包
  • 接收數據端的應用層沒有及時讀取接收緩衝區中的數據,將發生粘包

以上四點基本上是出現粘包和拆包的緣由,業界的解決方法通常有如下幾種:數組

  • 將每一個數據包分爲消息頭和消息體,消息頭中應該至少包含數據包的長度,這樣接收端在接收到數據後,就知道每個數據包的實際長度了(Dubbo就是這種方案)
  • 消息定長,每一個數據包的封裝爲固定長度,不夠補0
  • 在數據包的尾部設置特殊字符,好比FTP協議

Dubbo消息協議頭規範

在dubbo.io官網上找到一張圖,協議頭約定 網絡

dubbo的消息頭是一個定長的 16個字節的數據包:

  • magic High & Magic Low:2byte:0-7位 8-15位:相似java字節碼文件裏的魔數,用來判斷是否是dubbo協議的數據包,就是一個固定的數字
  • Serialization id:1byte:16-20位:序列id,21 event,22 two way 一個標誌位,是單向的仍是雙向的,23 請求或響應標識,
  • status:1byte: 24-31位:狀態位,設置請求響應狀態,request爲空,response纔有值
  • Id(long):8byte:32-95位:每個請求的惟一識別id(因爲採用異步通信的方式,用來把請求request和返回的response對應上)
  • data length:4byte:96-127位:消息體長度,int 類型

看完這張圖,大體能夠理解Dubbo通訊協議解決的問題,Dubbo採用消息頭和消息體的方式來解決粘包拆包,並在消息頭中放入了一個惟一Id來解決異步通訊關聯request和response的問題,下面以一次調用爲入口分爲四個部分來看下源碼具體實現app

Comsumer端請求編碼

private class InternalEncoder extends OneToOneEncoder {
        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }
複製代碼

這個InternalEncoder是一個NettyCodecAdapter的內部類,咱們看到codec.encode(channel, buffer, msg)這裏,這個時候codec=DubboCountCodec,這個是在構造方法中傳入的,DubboCountCodec.encode-->ExchangeCodec.encode-->ExchangeCodec.encodeRequest框架

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        //獲取序列化方式,默認是Hessian序列化
        Serialization serialization = getSerialization(channel);
        // new了一個16位的byte數組,就是request的消息頭
        byte[] header = new byte[HEADER_LENGTH];
        // 往消息頭中set magic數字,這個時候header中前2個byte已經填充
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.第三個byte已經填充
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        // set request id.這個時候是0
        Bytes.long2bytes(req.getId(), header, 4);

        // 編碼 request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        //序列化
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            //編碼消息體數據
            encodeRequestData(channel, out, req.getData());
        }
        out.flushBuffer();
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        //在消息頭中設置消息體長度
        Bytes.int2bytes(len, header, 12);

        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
複製代碼

就是這方法,對request請求進行了編碼操做,具體操做我寫在代碼的註釋中,就是剛剛咱們分析的消息頭的代碼實現異步

Provider端請求解碼

看到NettyCodecAdapter中的InternalDecoder這個類的messageReceived方法,這裏就是Provider端對於Consumer端的request請求的解碼ide

public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            ···
            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
            ···
複製代碼

進入DubboCountCodec.decode--ExchangeCodec.decode編碼

// 檢查 magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
           ···
        }
        // check 長度若是小於16位繼續等待
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        // get 消息體長度
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);
        //消息體長度+消息頭的長度
        int tt = len + HEADER_LENGTH;
        //若是總長度小於tt,那麼返回繼續等待
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            //解析消息體內容
            return decodeBody(channel, is, header);
        } finally {
           ···
    }
複製代碼

這裏對於剛剛的request進行解碼操做,具體操做步驟寫在註釋中了url

Provider端響應編碼

當服務端執行完接口調用,看下服務端的響應編碼,和消費端不同的地方是,服務端進入的是ExchangeCodec.encodeResponse方法spa

try {
            //獲取序列化方式 默認Hession協議
            Serialization serialization = getSerialization(channel);
            // 初始化一個16位的header
            byte[] header = new byte[HEADER_LENGTH];
            // set magic 數字
            Bytes.short2bytes(MAGIC, header);
            // set request and serialization flag.
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
            // set response status.這裏返回的是OK
            byte status = res.getStatus();
            header[3] = status;
            // set request id.
            Bytes.long2bytes(res.getId(), header, 4);

            int savedWriteIndex = buffer.writerIndex();
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // 編碼返回消息體數據或者錯誤數據
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    encodeHeartbeatData(channel, out, res.getResult());
                } else {
                    encodeResponseData(channel, out, res.getResult());
                }
            } else out.writeUTF(res.getErrorMessage());
            out.flushBuffer();
            bos.flush();
            bos.close();

            int len = bos.writtenBytes();
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 發送失敗信息給Consumer,不然Consumer只能等超時了
            if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
                try {
                    // FIXME 在Codec中打印出錯日誌?在IoHanndler的caught中統一處理?
                    logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);

                    Response r = new Response(res.getId(), res.getVersion());
                    r.setStatus(Response.BAD_RESPONSE);
                    r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                    channel.send(r);

                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                }
            }
            // 從新拋出收到的異常
            ···
    }
複製代碼

基本上和消費方請求編碼同樣,多了一個步驟,一個是在消息頭中加入了一個狀態位,第二個是若是發送有異常,則繼續發送失敗信息給Consumer,不然Consumer只能等超時了

Conmsuer端響應解碼

和上面的解碼同樣,具體操做是在ExchangeCodec.decode--DubboCodec.decodeBody中

相關文章
相關標籤/搜索