dubbo通訊消息解析過程分析(1)

因爲rpc底層涉及網絡編程接口,線程模型,網絡數據結構,服務協議,細到字節的處理。牽涉內容較多,今天就先從一個點提及。
說說,dubbo經過netty框架作傳輸層,從接到數據字節流到把字節轉換爲dubbo上層可讀的Request消息對象的過程。當前dubbo還支持mina,grizzly作底層傳輸層。
這裏包括兩部,反序列化和解碼。我打算分兩篇寫。這篇主要是說解碼的過程。
就是下面這個dubbo架構圖中,紅框中的部分。java

既然是netty作傳輸層,netty的基礎得提一點。
netty框架是經過管道(ChannelPipeline)模型處理網絡數據流的,每一個管道中有多個處理接點(ChannelHandler),
節點分爲,進站(client請求進服務端口)和出站(請求響應出服務端口)兩種。好比一個進站消息老是,順序的(順序是程序中編碼指定的)經過進站處理節點。
同理出站消息,老是順序的經過出站節點到達網絡接口。編程

dubbo2.5.6版本,傳輸層dubbo提供有netty3和netty4兩種實現,初始化netty通道都在NettyServer類裏,兩個類同名,包名不一樣。bootstrap

具體,netty3在NettyServer類裏doOpen()方法:數組

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
	       //編解碼器的初始化 
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());//設置解碼hander
                pipeline.addLast("encoder", adapter.getEncoder());//設置編碼hander
                pipeline.addLast("handler", nettyHandler);//自定義NettyHandler 擴展自netty雙向handler基類,能夠接受進站和出站數據流
                return pipeline;
		//進站的請求,先通過adapter.getDecoder()handler處理,再由nettyHandler處理
		//出站的請求,先通過nettyHandler處理 再由adapter.getEncoder()handler處理
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

netty4版本NettyServer類裏doOpen()方法:網絡

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();
        //acceptor 事件循環線程
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //client channel事件循環線程
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //編解碼器的初始化 
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())//設置解碼hander
                                .addLast("encoder", adapter.getEncoder())//設置編碼hander
                                .addLast("handler", nettyServerHandler);//自定義NettyServerHandler 擴展netty雙向handler基類,能夠接受進站和出站數據流
                        //進站的請求,先通過adapter.getDecoder()handler處理,再由nettyServerHandler處理
                        //出站的請求,先通過nettyServerHandler處理 再由adapter.getEncoder()handler處理
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

本次說的是進站流程,請求數據解析成Request對象過程,經過上面的代碼和netty特性可知,
進站數據先經過解碼hander,經解碼成消息Request對象後,再到自定義handler,而後由自定義hanlder經過裝飾模式,調用實際服務。數據結構

先看下解碼handler的實現:
由adapter.getDecoder()這句跟蹤到NettyCodecAdapter類的getDecoder()方法
public ChannelHandler getDecoder() {
return decoder;
}
能夠看到,這個方法獲取的解碼handler,decoder,是NettyCodecAdapter類的私有屬性架構

private final ChannelHandler decoder = new InternalDecoder();併發

看下InternalDecoder類定義,netty3版本:app

* 這裏須要些netty的知識,繼承SimpleChannelUpstreamHandler,代表它是進站handler
     * 因此進站的數據流,都會通過本handler對象,具體就是messageReceived方法。
     */
    private class InternalDecoder extends SimpleChannelUpstreamHandler {

        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;//這是dubbo根據nio本身實現的buffer

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            if (!(o instanceof ChannelBuffer)) {//這裏ChnnelBuffer是netty基於jdk nio ByteBuffer 實現
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();//到這就是從netty event對象取數據的過程。
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                            size > bufferSize ? size : bufferSize);//bufferSize 不指定是8k,這裏表示最小8K
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());//把netty 讀到的字節流寫入message
                }
            } else {//直接經過構造器,構造message
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                        input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;
            //從netty框架,讀取到的數據,放入message後,下面就是針對message的反序列化和解碼過程。
            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        //解碼,這裏麪包括的反序列化
                        msg = codec.decode(channel, message);//重要!!!經過具體編解碼實例codec完成解碼
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//若是解碼結果是,Codec2.DecodeResult.NEED_MORE_INPUT,表示,須要更多數據
                        message.readerIndex(saveReaderIndex);//很重要,設置readerIndex爲,解碼讀取前的位置,爲了下次再從頭讀取。
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {//解碼完成,這裏的msg已是Request對象。!!
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
        //處理異常
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            ctx.sendUpstream(e);//向下一個傳遞。通常在最後一個handler處理異常
        }
    }

netty4版本:框架

/***
     * netty4 擴展了ByteToMessageDecoder
     * 重寫decode 方法,解碼完成後,不用像netty3手動Channels.fireMessageReceived 發送事件,
     * netty4自動把對象,傳遞到下一個handler
     */
    private class InternalDecoder extends ByteToMessageDecoder {

        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;

            int saveReaderIndex;

            try {
                // decode object.
                do {
                    //保存初始,讀取位置
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);//重要!!!經過具體編解碼實例codec完成解碼
                    } catch (IOException e) {
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        //解碼失敗後,從新設置readerIndex爲讀取前位置
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            //解碼成功後,加入到out list中,傳遞到下一個處理handler
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }

能夠看到,不管netty3,仍是netty4都是經過,NettyCodecAdapter的codec屬性完成解碼的,
這裏有個概念,編解碼handler是經過編解碼實例完成編解碼的這裏的編解碼實例就是codec
而codec實例是由它構造函數從上層方法傳遞的。以下

public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
        int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
        this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
    }

再回到NettySever類中NettyCodecAdapter的構造語句
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
跟進getCodec()方法,這個方就是獲取實際編解碼方案的。這個方法的實如今NettyServer的祖先類AbstractEndpoint中:

protected Codec2 getCodec() {
        return codec;
    }
    //能夠看到codec在構造方法裏建立的
     public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);//根據url配置,構造編碼解碼器(經過spi獲得DubboCountCodec類實例)
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
   // 再跟到getChannelCodec方法
     protected static Codec2 getChannelCodec(URL url) {
        //經過spi機制,從url裏獲取編解碼方案,這裏是dubbo。取不到就是telnet
	//dubbo編解碼方案,實現類是DubboCountCodec
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }

這裏看下,dubbo中的全部編解碼類結構:

能夠看到,全部編解碼器實現,都擴展了Codec2接口。同時Codec2也是個spi擴展點。
接口Codec2,以下:

@SPI
public interface Codec2 {
    /**
     * spi 獲取編碼器
     * @param channel
     * @param buffer
     * @param message
     * @throws IOException
     */
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    /***
     * spi 獲取解碼器
     * @param channel
     * @param buffer
     * @return
     * @throws IOException
     */
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;


    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }

}

具體實現經過spi獲取,dubbo編解碼方案實例就是DubboCountCodec
那麼看下DubboCountCodec類,以及decode方法:

public final class DubboCountCodec implements Codec2 {

    private DubboCodec codec = new DubboCodec();//具體dubbo協議編解碼方案實現

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        codec.encode(channel, buffer, msg);
    }

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();//記錄讀取初始位置
        MultiMessage result = MultiMessage.create();//解碼後對象容器。list,可放多個消息
        do {
            Object obj = codec.decode(channel, buffer);//解碼過程在DubboCodec類中的decode方法裏
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//須要接受更多信息
                buffer.readerIndex(save);//恢復讀取位置
                break;
            } else {//解碼完成,加到對象容器
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);//記錄日誌,可忽略
                save = buffer.readerIndex();//更新讀取位置
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);//返回解碼後對象
        }
        return result;
    }

    private void logMessageLength(Object result, int bytes) {
        if (bytes <= 0) {
            return;
        }
        if (result instanceof Request) {
            try {
                ((RpcInvocation) ((Request) result).getData()).setAttachment(
                        Constants.INPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        } else if (result instanceof Response) {
            try {
                ((RpcResult) ((Response) result).getResult()).setAttachment(
                        Constants.OUTPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        }
    }

}

分析DubboCodec類以前,先說下dubbo協議消息格式,它包括消息頭和消息體:

前2個字節:
爲協議魔數,固定值oxdabb

第三字節:
第1比特(0/1)表示是請求消息,仍是響應消息
第2比特(0/1)表示是是否必須雙向通訊,即有請求,必有響應
第3比特(0/1)表示是是不是,心跳消息
第低5位比特,表示一個表示消息序列化的方式(1,是dubbo ,2,是hessian...)

第四字節:
只在響應消息中用到,表示響應消息的狀態,是成功,失敗等

第5-12字節:
8個字節,表示一個long型數字,是reqeustId

第13—16字節:
4個字節,表示消息體的長度(字節數)

消息體,不固定長度
是請求消息時,表示請求數據
是響應消息時,表示方法調用返回結果。

編碼和解碼主要是對消息頭的設置和解析。序列化和反序列化主要是對消息體的操做。

先看DubboCodec的關係圖:

DubboCodec類decode方法的實如今其父類ExchangeCodec中:

//先看下類中定義的常量:
    // header length.消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // magic header.
    protected static final short MAGIC = (short) 0xdabb;//1101 1010 1011 1011魔數
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];//高字節
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];//低字節
    // message flag.
    protected static final byte FLAG_REQUEST = (byte) 0x80;//1000,0000//表示消息類型
    protected static final byte FLAG_TWOWAY = (byte) 0x40;//0100,0000//表示是否雙向通訊
    protected static final byte FLAG_EVENT = (byte) 0x20;//0010,0000//表示是不是心跳事件
    protected static final int SERIALIZATION_MASK = 0x1f;//0001,1111/表示是序列化實現類型
  /***
     * 解碼入口方法
     * @param channel
     * @param buffer
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        //取得可讀的字節數
        int readable = buffer.readableBytes();
        //header 最大16字節
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        //從buffer中讀16字節到header數組中
        buffer.readBytes(header);
        //調用自己decode方法
        return decode(channel, buffer, readable, header);
    }

     /***
     * 具體協議解析方法,本方法主要是讀取驗證消息頭的過程
     * @param channel
     * @param buffer
     * @param readable
     * @param header
     * @return
     * @throws IOException
     */
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            //前兩字節,不是dubbo魔數
            int length = header.length;
            //若是可讀的字節數目,大於16字節
            if (header.length < readable) {
                //給header擴容到readable大小
                header = Bytes.copyOf(header, readable);
                //把buffer剩下的字節讀到header中,這裏多於16字節
                buffer.readBytes(header, length, readable - length);
            }
            //上層方法,第一字節已經驗證過,這個從第二字節開始驗證。
            for (int i = 1; i < header.length - 1; i++) {
                //若是發現,後續字節有dubbo協議的開頭
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    //重置readerIndex 位置,到魔數開始的地方。
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    //把魔數開始位置,之前的數據,覆蓋賦給header(下面調用super.decode解析)
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            //上層方法是dubbo telnet 編解碼實現
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {//小於16字節,返回 須要更多對象
            return DecodeResult.NEED_MORE_INPUT;
        }

        //get data length.讀取header[]最後四字節,構造一個int的數據,
        //根據dubbo協議,這個是消息體的長度
        int len = Bytes.bytes2int(header, 12);
        //檢查數據大小
        checkPayload(channel, len);//默認爲8M

        int tt = len + HEADER_LENGTH;//總的消息大小,消息頭加消息實體
        if (readable < tt) {//若是可讀取的,不夠消息總大小,就返回 須要更多數據
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.這個時候,buffer的readerIndex位置已經是,讀完header後的位置,接下來的len長度的數據,全是消息體的數據。
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            //解碼反序列化成Reqeust或者Response對象,decodeBody方法被子類DubboCodec重寫了
            //這裏要看DubboCodec的decodeBody的方法
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

DubboCodec類的decodeBody方法:

/***
     * 解碼,是從輸出流 is 取字節數據,經反序列化,構造Request 和Response對象的過程。
     * @param channel
     * @param is
     * @param header
     * @return
     * @throws IOException
     */
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        //消息頭第3字節和SERIALIZATION_MASK&操做後,就能夠獲得,序列化/反序列化方案
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        //根據序列化方案id,或者url指定,經過spi機制去獲取序列化實現。dubbo協議默認用hession2序列化方案
        //是放在消息頭flag 裏的。這裏proto 值是2
        //獲取具體用序列化/反序列化實現
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.header字節從第5到12 8個字節,是請求id
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {//根據flag&FLAG_REQUEST後,判斷須要解碼的消息類型
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status. 獲取響應 狀態 成功,失敗等
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {//返回結果狀態 ok成功
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {//事件消息,反序列化
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {//業務調用結果消息 解碼構造 DecodeableRpcResult 對象的過程
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否在io 線程內解碼
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            //!!!Response消息反序列化 就是把調用結果返回值 從is裏反序列化出來,放在 DecodeableRpcResult類的result 字段的過程。
                            result.decode();
                        } else {
                            //不在io線程解碼,要先經過readMessageData方法把調用結果數組取出後,
                            //放在UnsafeByteArrayInputStream對象,存在DecodeableRpcResult對象裏,後續經過上層方法解碼。
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }

                    //同時把DecodeableRpcResult對象放入Response result字段。
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    //異常處理,設置status和異常信息
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } else {
                //異常處理,設置異常信息
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {//反解碼Requset 消息類型
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {//業務調用請求消息 解碼構造 DecodeableRpcInvocation 對象的過程
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否 io 線程內解碼
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        //!!!Requset 類型反序列化方法
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                //同時把DecodeableRpcInvocation對象放入Request data字段。
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request 異常請求對象設置
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

    /***
     * 獲取反序列化方案
     * @param serialization
     * @param url
     * @param is
     * @return
     * @throws IOException
     */
    private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
            throws IOException {
        return serialization.deserialize(url, is);
    }

    /***
     * 讀取is 裏的可用數據
     * @param is
     * @return
     * @throws IOException
     */
    private byte[] readMessageData(InputStream is) throws IOException {
        if (is.available() > 0) {
            byte[] result = new byte[is.available()];
            is.read(result);
            return result;
        }
        return new byte[]{};
    }

RPC調用請求:DecodeableRpcInvocation 類反序列化方法:

public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {//具體在decode重載方法裏
                decode(channel, inputStream);
            } catch (Throwable e) {//異常請求設置
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }

    /**
     * 反序列化,解碼 經過反序列化還原
     * RpcInvocation 類的
     * private String methodName;
     private Class<?>[] parameterTypes;
     private Object[] arguments;
     private Map<String, String> attachments; 是個屬性值,就像在客戶端請求時設置的同樣。
     * @param channel channel.
     * @param input   input stream.
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, InputStream input) throws IOException {
        //獲取反序列化方案
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        //反序列化出,dubbo版本,路徑,服務版本信息,設置到attachments裏,這讀取順序和序列化時的寫入順序也一致
        //固然,序列化方案也一致。這裏默認都是hissen2
        //調用ObjectInput的readUTF()反序列化方法,依次獲取調用信息
        setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());
        //讀取方法名
        setMethodName(in.readUTF());
        //反序列化,方法請求參數類型,
        try {
            Object[] args;
            Class<?>[] pts;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        //更具類型讀取請求參數值
                        //調用ObjectInput的readObject()反序列化方法,反序列化出參數值
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            //設置保存請求參數類型
            setParameterTypes(pts);
            //反序列化,attachment map
            //調用ObjectInput的readObject()反序列化方法,反序列化出attachemnet值
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                attachment.putAll(map);
                setAttachments(attachment);
            }
            //decode argument ,may be callback 回調參數設置,這個再說。
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }
            //保存請求參數值
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        }
        return this;
    }

RPC調用結果:DecodeableRpcResult 反序列化 方法decode()

public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {//具體實如今decode(channel, inputStream)重載方法裏
                decode(channel, inputStream);
            } catch (Throwable e) {//設置異常返回response
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc result failed: " + e.getMessage(), e);
                }
                response.setStatus(Response.CLIENT_ERROR);
                response.setErrorMessage(StringUtils.toString(e));
            } finally {
                hasDecoded = true;
            }
        }
    }

     /***
     * 反序列化,解碼過程,讀取input的調用結果字節數據,經反序列化成方法返回類型對象
     * 併發放回結果設置到RpcResult的result字段裏
     * 以及異常返回字段的設置。
     * @param channel channel.
     * @param input   input stream.
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, InputStream input) throws IOException {
        //獲取反序列化方案
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        //反序列化出 結果標識 調用ObjectInput的readByte方法,反序列化出一個byte值
        byte flag = in.readByte();//
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE://null 值
                break;
            case DubboCodec.RESPONSE_VALUE: //非空值
                try {
                    //根據invocation獲取調用方法的放回類型
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    //根據返回類型,反序列出結果並這是到RpcResult 的result字段裏。
                    //void 類型 結果值 是null ;int 等基本類型,自動裝箱 Integer;
                    //具體調用ObjectInput的readObject重載的兩個方法,反序列出結果對象
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                            (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                    : in.readObject((Class<?>) returnType[0], returnType[1])));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION://異常信息反序列化,設置到exception字段
                try {
                   //具體調用ObjectInput的readObject重載的兩個方法,反序列出異常對象
                    Object obj = in.readObject();
                    if (obj instanceof Throwable == false)
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    setException((Throwable) obj);
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        return this;
    }

下一篇再說說反序列化。

相關文章
相關標籤/搜索