dubbo 序列化機制之 hessian2序列化實現原理分析

  對於遠程通訊,每每都會涉及到數據持久化傳輸問題。往大了說,就是,從A發出的信息,怎樣能被B接收到相同信息內容!小點說就是,編碼與解碼問題!java

  而在dubbo或者說是java的遠程通訊中,編解碼則每每伴隨着序列化與反序列化!bootstrap

普通java對象要想實現序列化,通常有幾個步驟:數組

  1. 實現 Serializable 接口;緩存

  2. 生成一個序列號: serialVersionUID, (非必須,但建議);網絡

  3. 重寫 writeObject()/readObject() 自定義序列化,若有必要的話;app

  4. 調用 java.io.ObjectOutputStream 的 writeObject()/readObject() 進行序列化與反序列化;框架

  簡單吧,可是你們知道,市面上有許許多多的序列化框架!爲啥呢?由於它們須要速度更快,體積更小!異步

今天咱們就來細看下dubbo的默認序列化器 Hession2 是怎麼作的吧!socket

從Server初始化處開始, 能夠看到, 咱們使用 默認的 dubbo 是基於 netty 來建立 server的.tcp

    // com.alibaba.dubbo.remoting.transport.netty.NettyServer
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org.browse.NETTY-365
        // https://issues.jboss.org.browse.NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                // encoder 是一個基於netty解碼器的子類: NettyCodecAdapter.InternalDecoder extends SimpleChannelUpstreamHandler
                pipeline.addLast("decoder", adapter.getDecoder());
                // decoder 是一個基於netty編碼器的子類: NettyCodecAdapter.InternalEncoder extends OneToOneEncoder
                pipeline.addLast("encoder", adapter.getEncoder());
                // handler 則處理全部的業務邏輯處理
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

server 中使用了管道來進行通訊,主要有三個 ChannelHandler:

  1. decoder, 負責消息解碼, 依賴於netty基礎設施;

  2. encoder, 負責消息的編碼工做, 依賴於netty基礎設施;(本文的主要目標)

  3. 業務處理的 handler, NettyHandler; 

  這幾個管道的流向如netty中闡述的同樣,會隨出站和入站的步驟進行流動; 本文講解出站請求,因此步驟會是 handler -> encoder -> decoder

    
    // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel 此處開始發送請求數據
    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        // Request 中有個有意思的變量: private static final AtomicLong INVOKE_ID = new AtomicLong(0); 負責維護本地的請求序號
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 首先會調用 com.alibaba.dubbo.remoting.transport.netty.NettyClient 的 send() 方法
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

  通過 HeaderExchangeChannel 的封裝後,就有了 Request 請求, 接下來就是發送往遠程的過程了!幾個要點:

    1. 每一個請求都會有序列號,依次遞增;

    2. 設置爲雙向通訊,即 twoWay=true, 既發送也接收請求;

    3. 使用 DefaultFuture 封裝返回值,接收異步結果;

    
    // NettyClient, com.alibaba.dubbo.remoting.transport.AbstractPeer
    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
    
    // NettyClient, com.alibaba.dubbo.remoting.transport.AbstractClient
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 調用另外一個 channel 寫入
        channel.send(message, sent);
    }
    // com.alibaba.dubbo.remoting.transport.netty.NettyChannel
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 寫業務由此觸發,返回一個 異步 Future
            ChannelFuture future = channel.write(message);
            if (sent) {
                // 若是是發送請求, 則而後再阻塞等待結果
                // 還有另外一個阻塞等待結果的地方,是在 DubboInvoker 中
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            // 若是發現異常,則將異步異常拋出
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    

  談到對future結果的處理,咱們仍是倒回到 DubboInvoker 中,進行看下是怎樣處理的!

    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                // 若是單向發送的包,則直接忽略結果便可
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                // 針對設置爲異步的請求,直接將future設置到上下文後,返回空結果便可
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                // 針對同步請求
                // 發起遠程請求, 獲取到 future 異步結果, 調用 future.get() 同步阻塞,等待結果後返回
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

  接下就正式進入到 Socket 的髮網絡發送流程中了,咱們來看下都是怎麼作的!(注意: 如今的數據仍是原始數據,並無序列化)

    // org.jboss.netty.channel.socket.nio.NioClientSocketChannel, org.jboss.netty.channel.AbstractChannel
    public ChannelFuture write(Object message) {
        return Channels.write(this, message);
    }
    
    // org.jboss.netty.channel.Channels
    /**
     * Sends a {@code "write"} request to the last
     * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
     * the specified {@link Channel}.
     *
     * @param channel  the channel to write a message
     * @param message  the message to write to the channel
     *
     * @return the {@link ChannelFuture} which will be notified when the
     *         write operation is done
     */
    public static ChannelFuture write(Channel channel, Object message) {
        return write(channel, message, null);
    }
    
    // 寫入數據到管道尾部, 一切看起來都很美好,返回 future 了事! 進入 pipeline 以後,就會調用一系列的 鏈處理,如加解碼
    /**
     * Sends a {@code "write"} request to the last
     * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
     * the specified {@link Channel}.
     *
     * @param channel  the channel to write a message
     * @param message  the message to write to the channel
     * @param remoteAddress  the destination of the message.
     *                       {@code null} to use the default remote address
     *
     * @return the {@link ChannelFuture} which will be notified when the
     *         write operation is done
     */
    public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(
                new DownstreamMessageEvent(channel, future, message, remoteAddress));
        return future;
    }
    // org.jboss.netty.channel.DefaultChannelPipeline
    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }
        // 添加到 tail 中
        sendDownstream(tail, e);
    }

    void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }

        try {
            // 調用下一個 pipeline 的處理方法 handler 的處理 handleDownstream(), 即調用 NettyHandler 了
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }
    
    // NettyHandler, org.jboss.netty.channel.SimpleChannelHandler
    /**
     * {@inheritDoc}  Down-casts the received downstream event into more
     * meaningful sub-type event and calls an appropriate handler method with
     * the down-casted event.
     */
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {

        if (e instanceof MessageEvent) {
            // 消息發送走數據寫入邏輯, ctx 便是 上下文的末端 tail
            writeRequested(ctx, (MessageEvent) e);
        } else if (e instanceof ChannelStateEvent) {
            // 事件類處理邏輯
            ChannelStateEvent evt = (ChannelStateEvent) e;
            switch (evt.getState()) {
            case OPEN:
                if (!Boolean.TRUE.equals(evt.getValue())) {
                    closeRequested(ctx, evt);
                }
                break;
            case BOUND:
                if (evt.getValue() != null) {
                    bindRequested(ctx, evt);
                } else {
                    unbindRequested(ctx, evt);
                }
                break;
            case CONNECTED:
                if (evt.getValue() != null) {
                    connectRequested(ctx, evt);
                } else {
                    disconnectRequested(ctx, evt);
                }
                break;
            case INTEREST_OPS:
                setInterestOpsRequested(ctx, evt);
                break;
            default:
                ctx.sendDownstream(e);
            }
        } else {
            ctx.sendDownstream(e);
        }
    }

  接下來調用 handler 的 writeRequest(), 進行 pipeline 管道式調用:

    // NettyHandler, com.alibaba.dubbo.remoting.transport.netty.NettyHandler
    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // 先調用父類, SimpleChannelHandler, 處理鏈上的邏輯
        super.writeRequested(ctx, e);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.sent(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
    
    // org.jboss.netty.channel.SimpleChannelHandler
    /**
     * Invoked when {@link Channel#write(Object)} is called.
     */
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ctx.sendDownstream(e);
    }
    // org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext
    public void sendDownstream(ChannelEvent e) {
        // 查看是否有上一節點,若是有,遞歸調用。 即: pipeline 管道效果,依次流過事件處理
        // 所謂的 pipeline 鏈的實現原理哦
        DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
        if (prev == null) {
            try {
                getSink().eventSunk(DefaultChannelPipeline.this, e);
            } catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        } else {
            DefaultChannelPipeline.this.sendDownstream(prev, e);
        }
    }
    // DefaultChannelPipeline.this.sendDownstream() , 調用業務的 handler 進行處理, 即編碼解碼過程
    void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }

        try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }

  而後是編碼操做!

    // 調用encoder, InternalEncoder 的父類: org.jboss.netty.handler.codec.oneone.OneToOneEncoder
    public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
        if (!(evt instanceof MessageEvent)) {
            ctx.sendDownstream(evt);
            return;
        }

        MessageEvent e = (MessageEvent) evt;
        // 編碼數據
        if (!doEncode(ctx, e)) {
            ctx.sendDownstream(e);
        }
    }
    protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object originalMessage = e.getMessage();
        // 此處 encode() 由子類實現, 從而實現自定義的編碼方式
        Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
        if (originalMessage == encodedMessage) {
            return false;
        }
        if (encodedMessage != null) {
            // 編碼好後,就進行遠端的寫入了, tcp 協議, TruncatedChannelBuffer 
            // 其實在 encode() 的時候已經將數據發送了
            write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
        }
        return true;
    }

    // com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder
    @Sharable
    private class InternalEncoder extends OneToOneEncoder {
        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                // 調用 codec 的 encode(), 即 DubboCountCodec, 固然,該 codec 是能夠經過請求 url 參數裏指定的
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            // 使用 ChannelBuffers 包裝 _buffer 後,返回數據給到調用方,以便進行寫稿
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }
    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        codec.encode(channel, buffer, msg);
    }
    // DubboCountCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 編碼數據
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

  具體的寫入格式以下: 請求頭魔數 -> 請求序列化方式標識 -> 請求類型標識 -> 請求序列號 -> body

    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec,  com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // 寫入格式以下: 請求頭魔數 -> 請求序列化方式標識 -> 請求類型標識 -> 請求序列號 -> body
        // 全部數據寫入 buffer
        // header.
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        // set request id.
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            // 編碼數據, 此處可能遇到複雜的對象, 解決辦法是 遞歸調用 JavaSerializer
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        Bytes.int2bytes(len, header, 12);

        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

    // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec , 進行數據編碼
    @Override
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 寫入單個字段數據
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        out.writeObject(inv.getAttachments());
    }

 

  接下來看看 hessian 是如何進行數據序列化的? 其實就是調用  hessian 的 writeObject() 方法原理!

    // com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectOutput
    @Override
    public void writeObject(Object obj) throws IOException {
        mH2o.writeObject(obj);
    }

    // com.alibaba.com.caucho.hessian.io.Hessian2Output
    /**
     * Writes any object to the output stream.
     */
    @Override
    public void writeObject(Object object)
            throws IOException {
        if (object == null) {
            writeNull();
            return;
        }

        Serializer serializer;
        // 獲取序列化器: 這個過程比較繁雜會不停搜索符合條件的 Serializer, 默認 JavaSerializer, 而後調用 writeObject() 方法, 
        serializer = findSerializerFactory().getSerializer(object.getClass());

        serializer.writeObject(object, this);
    }
    
    // 序列化器工廠類 com.alibaba.com.caucho.hessian.io.SerializerFactory
    /**
     * Returns the serializer for a class.
     *
     * @param cl the class of the object that needs to be serialized.
     * @return a serializer object for the serialization.
     */
    @Override
    public Serializer getSerializer(Class cl)
            throws HessianProtocolException {
        Serializer serializer;

        // 1. 先嚐試從基本的序列化器中查找
        serializer = (Serializer) _staticSerializerMap.get(cl);
        if (serializer != null) {
            return serializer;
        }

        // 2. 從緩存中查找, 若是以前找到過的話
        if (_cachedSerializerMap != null) {
            serializer = (Serializer) _cachedSerializerMap.get(cl);
            if (serializer != null) {
                return serializer;
            }
        }
        // 3. 初始化多個工廠類
        for (int i = 0;
             serializer == null && _factories != null && i < _factories.size();
             i++) {
            AbstractSerializerFactory factory;

            factory = (AbstractSerializerFactory) _factories.get(i);

            serializer = factory.getSerializer(cl);
        }

        // 4. 簡單明確的複合對象斷定
        if (serializer != null) {

        } else if (isZoneId(cl)) //must before "else if (JavaSerializer.getWriteReplace(cl) != null)"
            serializer = ZoneIdSerializer.getInstance();
        else if (isEnumSet(cl))
            serializer = EnumSetSerializer.getInstance();
        else if (JavaSerializer.getWriteReplace(cl) != null)
            serializer = new JavaSerializer(cl, _loader);

        else if (HessianRemoteObject.class.isAssignableFrom(cl))
            serializer = new RemoteSerializer();

//    else if (BurlapRemoteObject.class.isAssignableFrom(cl))
//      serializer = new RemoteSerializer();

        else if (Map.class.isAssignableFrom(cl)) {
            if (_mapSerializer == null)
                _mapSerializer = new MapSerializer();

            serializer = _mapSerializer;
        } else if (Collection.class.isAssignableFrom(cl)) {
            if (_collectionSerializer == null) {
                _collectionSerializer = new CollectionSerializer();
            }

            serializer = _collectionSerializer;
        } else if (cl.isArray()) {
            // 數組序列化器
            serializer = new ArraySerializer();
        } else if (Throwable.class.isAssignableFrom(cl)) {
            serializer = new ThrowableSerializer(cl, getClassLoader());
        } else if (InputStream.class.isAssignableFrom(cl)) {
            serializer = new InputStreamSerializer();
        } else if (Iterator.class.isAssignableFrom(cl)) {
            serializer = IteratorSerializer.create();
        } else if (Enumeration.class.isAssignableFrom(cl)) {
            serializer = EnumerationSerializer.create();
        } else if (Calendar.class.isAssignableFrom(cl)) {
            serializer = CalendarSerializer.create();
        } else if (Locale.class.isAssignableFrom(cl)) {
            serializer = LocaleSerializer.create();
        } else if (Enum.class.isAssignableFrom(cl)) {
            serializer = new EnumSerializer(cl);
        }

        // 5. 都找不到則使用默認序列化器, 即普通繼承了 Serializable 接口的對象都是此類
        if (serializer == null) {
            serializer = getDefaultSerializer(cl);
        }

        if (_cachedSerializerMap == null) {
            _cachedSerializerMap = new ConcurrentHashMap(8);
        }
        // 6. 將查找結果存入緩存
        _cachedSerializerMap.put(cl, serializer);

        return serializer;
    }
    
    /**
     * Returns the default serializer for a class that isn't matched
     * directly.  Application can override this method to produce
     * bean-style serialization instead of field serialization.
     *
     * @param cl the class of the object that needs to be serialized.
     * @return a serializer object for the serialization.
     */
    protected Serializer getDefaultSerializer(Class cl) {
        if (_defaultSerializer != null)
            return _defaultSerializer;

        if (!Serializable.class.isAssignableFrom(cl)
                && !_isAllowNonSerializable) {
            throw new IllegalStateException("Serialized class " + cl.getName() + " must implement java.io.Serializable");
        }

        return new JavaSerializer(cl, _loader);
    }
    // com.alibaba.com.caucho.hessian.ioSerializer
    @Override
    public void writeObject(Object obj, AbstractHessianOutput out)
            throws IOException {
        if (out.addRef(obj)) {
            return;
        }

        Class cl = obj.getClass();

        try {
            // 若是有寫 writeReplace() 方法,則直接調用其便可; 不然遞歸簡單序列化結構
            if (_writeReplace != null) {
                Object repl;

                if (_writeReplaceFactory != null)
                    repl = _writeReplace.invoke(_writeReplaceFactory, obj);
                else
                    repl = _writeReplace.invoke(obj);

                //Some class would return itself for wrapReplace, which would cause infinite recursion
                //In this case, we could write the object just like normal cases
                if (repl != obj) {
                    out.removeRef(obj);

                    out.writeObject(repl);

                    out.replaceRef(repl, obj);

                    return;
                }
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // log.log(Level.FINE, e.toString(), e);
            throw new RuntimeException(e);
        }

        // 寫參數類名
        int ref = out.writeObjectBegin(cl.getName());

        if (ref < -1) {
            writeObject10(obj, out);
        } else {
            if (ref == -1) {
                // 寫開始標識
                writeDefinition20(out);
                out.writeObjectBegin(cl.getName());
            }
            // 寫字段數據
            writeInstance(obj, out);
        }
    }

    // 寫字段信息, 依次列舉字段,而後依次寫入, 若是是複合類型, 
    public void writeInstance(Object obj, AbstractHessianOutput out)
            throws IOException {
        for (int i = 0; i < _fields.length; i++) {
            Field field = _fields[i];

            _fieldSerializers[i].serialize(out, obj, field);
        }
    }
    // 寫字段信息, FieldSerializer 是個 JavaSerializer 的內部類
    static class FieldSerializer {
        static final FieldSerializer SER = new FieldSerializer();

        void serialize(AbstractHessianOutput out, Object obj, Field field)
                throws IOException {
            Object value = null;

            try {
                value = field.get(obj);
            } catch (IllegalAccessException e) {
                log.log(Level.FINE, e.toString(), e);
            }

            try {
                // 基於前面已經寫入的參數類型信息,如今寫入 value 便可, 若是value() 是複合類型,則再次遞歸處理
                out.writeObject(value);
            } catch (RuntimeException e) {
                throw new RuntimeException(e.getMessage() + "\n Java field: " + field,
                        e);
            } catch (IOException e) {
                throw new IOExceptionWrapper(e.getMessage() + "\n Java field: " + field,
                        e);
            }
        }
    }

 

  來看一下一個 string 的寫入方式吧!

    // String 的編碼, 直接調用 out.writeString()
    static class StringFieldSerializer extends FieldSerializer {
        static final FieldSerializer SER = new StringFieldSerializer();

        @Override
        void serialize(AbstractHessianOutput out, Object obj, Field field)
                throws IOException {
            String value = null;

            try {
                value = (String) field.get(obj);
            } catch (IllegalAccessException e) {
                log.log(Level.FINE, e.toString(), e);
            }
            // 將string 寫入 buffer 中
            out.writeString(value);
        }
    }

    // com.alibaba.com.caucho.hessian.io.Hessian2Output 將 string 轉換爲 byte 存儲起來, 經過移位處理的方式    
    /**
     * Writes a string value to the stream using UTF-8 encoding.
     * The string will be written with the following syntax:
     * <p>
     * <code><pre>
     * S b16 b8 string-value
     * </pre></code>
     * <p>
     * If the value is null, it will be written as
     * <p>
     * <code><pre>
     * N
     * </pre></code>
     *
     * @param value the string value to write.
     */
    @Override
    public void writeString(String value)
            throws IOException {
        int offset = _offset;
        byte[] buffer = _buffer;

        if (SIZE <= offset + 16) {
            flush();
            offset = _offset;
        }

        if (value == null) {
            buffer[offset++] = (byte) 'N';

            _offset = offset;
        } else {
            int length = value.length();
            int strOffset = 0;

            // 針對長字符, 分段循環寫入
            while (length > 0x8000) {
                int sublen = 0x8000;

                offset = _offset;

                if (SIZE <= offset + 16) {
                    flush();
                    offset = _offset;
                }

                // chunk can't end in high surrogate
                char tail = value.charAt(strOffset + sublen - 1);

                if (0xd800 <= tail && tail <= 0xdbff)
                    sublen--;

                buffer[offset + 0] = (byte) BC_STRING_CHUNK;
                buffer[offset + 1] = (byte) (sublen >> 8);
                buffer[offset + 2] = (byte) (sublen);

                _offset = offset + 3;

                printString(value, strOffset, sublen);

                length -= sublen;
                strOffset += sublen;
            }

            offset = _offset;

            if (SIZE <= offset + 16) {
                flush();
                offset = _offset;
            }

            // 先寫入長度信息, 再寫入string內容
            if (length <= STRING_DIRECT_MAX) {
                // STRING_DIRECT_MAX = 0x1f
                // b0 格式寫入
                buffer[offset++] = (byte) (BC_STRING_DIRECT + length);
            } else if (length <= STRING_SHORT_MAX) {
                // STRING_SHORT_MAX = 0x3ff
                // b8, 0x30 + x >> 8
                buffer[offset++] = (byte) (BC_STRING_SHORT + (length >> 8));
                buffer[offset++] = (byte) (length);
            } else {
                // <= 0x8000, 存在兩位字節
                // S len
                buffer[offset++] = (byte) ('S');
                buffer[offset++] = (byte) (length >> 8);
                buffer[offset++] = (byte) (length);
            }

            _offset = offset;

            printString(value, strOffset, length);
        }
    }
    
    // 寫入string內容, 以utf-8編碼
    /**
     * Prints a string to the stream, encoded as UTF-8
     *
     * @param v the string to print.
     */
    public void printString(String v, int strOffset, int length)
            throws IOException {
        int offset = _offset;
        byte[] buffer = _buffer;

        for (int i = 0; i < length; i++) {
            if (SIZE <= offset + 16) {
                _offset = offset;
                flush();
                offset = _offset;
            }

            char ch = v.charAt(i + strOffset);

            if (ch < 0x80)
                buffer[offset++] = (byte) (ch);
            else if (ch < 0x800) {
                buffer[offset++] = (byte) (0xc0 + ((ch >> 6) & 0x1f));
                buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
            } else {
                buffer[offset++] = (byte) (0xe0 + ((ch >> 12) & 0xf));
                buffer[offset++] = (byte) (0x80 + ((ch >> 6) & 0x3f));
                buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
            }
        }

        _offset = offset;
    }

  解碼則是在 JavaDeserializer 中, 大概思路就是, 按照序列化的方向,反向拆解下就好了!例如:

    
    public JavaDeserializer(Class cl) {
        _type = cl;
        _fieldMap = getFieldMap(cl);

        _readResolve = getReadResolve(cl);

        if (_readResolve != null) {
            _readResolve.setAccessible(true);
        }

        Constructor[] constructors = cl.getDeclaredConstructors();
        long bestCost = Long.MAX_VALUE;

        for (int i = 0; i < constructors.length; i++) {
            Class[] param = constructors[i].getParameterTypes();
            long cost = 0;

            for (int j = 0; j < param.length; j++) {
                cost = 4 * cost;

                if (Object.class.equals(param[j]))
                    cost += 1;
                else if (String.class.equals(param[j]))
                    cost += 2;
                else if (int.class.equals(param[j]))
                    cost += 3;
                else if (long.class.equals(param[j]))
                    cost += 4;
                else if (param[j].isPrimitive())
                    cost += 5;
                else
                    cost += 6;
            }

            if (cost < 0 || cost > (1 << 48))
                cost = 1 << 48;

            cost += (long) param.length << 48;

            if (cost < bestCost) {
                _constructor = constructors[i];
                bestCost = cost;
            }
        }

        if (_constructor != null) {
            _constructor.setAccessible(true);
            Class[] params = _constructor.getParameterTypes();
            _constructorArgs = new Object[params.length];
            for (int i = 0; i < params.length; i++) {
                _constructorArgs[i] = getParamArg(params[i]);
            }
        }
    }
    
    static class ObjectFieldDeserializer extends FieldDeserializer {
        private final Field _field;

        ObjectFieldDeserializer(Field field) {
            _field = field;
        }

        @Override
        void deserialize(AbstractHessianInput in, Object obj)
                throws IOException {
            Object value = null;

            try {
                value = in.readObject(_field.getType());

                _field.set(obj, value);
            } catch (Exception e) {
                logDeserializeError(_field, obj, value, e);
            }
        }
    }

  寫入最後數據後,觸發異步 worker 進行發送處理!

    // org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink, 寫入最後數據後,觸發異步 worker 進行發送處理
    private static void handleAcceptedSocket(ChannelEvent e) {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
            case CONNECTED:
                if (value == null) {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBufferQueue.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);
        }
    }
    // org.jboss.netty.channel.socket.nio.AbstractNioWorker
    void writeFromUserCode(final AbstractNioChannel<?> channel) {
        if (!channel.isConnected()) {
            // 若是已斷開,則清理 buffer, 防止內存泄漏
            cleanUpWriteBuffer(channel);
            return;
        }

        // 加入異步隊列處理, 成功則返回
        if (scheduleWriteIfNecessary(channel)) {
            return;
        }

        // From here, we are sure Thread.currentThread() == workerThread.

        if (channel.writeSuspended) {
            return;
        }

        if (channel.inWriteNowLoop) {
            return;
        }

        // 當前線程,則直接寫
        write0(channel);
    }
        
    protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
        Exception cause = null;
        boolean fireExceptionCaught = false;

        // Clean up the stale messages in the write buffer.
        synchronized (channel.writeLock) {
            MessageEvent evt = channel.currentWriteEvent;
            if (evt != null) {
                // Create the exception only once to avoid the excessive overhead
                // caused by fillStackTrace.
                if (channel.isOpen()) {
                    cause = new NotYetConnectedException();
                } else {
                    cause = new ClosedChannelException();
                }

                ChannelFuture future = evt.getFuture();
                if (channel.currentWriteBuffer != null) {
                    channel.currentWriteBuffer.release();
                    channel.currentWriteBuffer = null;
                }
                channel.currentWriteEvent = null;
                // Mark the event object for garbage collection.
                //noinspection UnusedAssignment
                evt = null;
                future.setFailure(cause);
                fireExceptionCaught = true;
            }

            Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
            for (;;) {
                evt = writeBuffer.poll();
                if (evt == null) {
                    break;
                }
                // Create the exception only once to avoid the excessive overhead
                // caused by fillStackTrace.
                if (cause == null) {
                    if (channel.isOpen()) {
                        cause = new NotYetConnectedException();
                    } else {
                        cause = new ClosedChannelException();
                    }
                    fireExceptionCaught = true;
                }
                evt.getFuture().setFailure(cause);
            }
        }

        if (fireExceptionCaught) {
            if (isIoThread(channel)) {
                fireExceptionCaught(channel, cause);
            } else {
                fireExceptionCaughtLater(channel, cause);
            }
        }
    }

    protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;
        boolean iothread = isIoThread(channel);

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        List<Throwable> causes = null;

        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;
                    }

                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (writtenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);
                    }
                    if (iothread) {
                        // An exception was thrown from within a write in the iothread. We store a reference to it
                        // in a list for now and notify the handlers in the chain after the writeLock was released
                        // to prevent possible deadlock.
                        // See #1310
                        if (causes == null) {
                            causes = new ArrayList<Throwable>(1);
                        }
                        causes.add(t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        // close must be handled from outside the write lock to fix a possible deadlock
                        // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
                        // and a close is triggered while the lock is hold. This is because the close(..)
                        // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
                        // See #1310
                        open = false;
                    }
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (causes != null) {
            for (Throwable cause: causes) {
                // notify about cause now as it was triggered in the write loop
                fireExceptionCaught(channel, cause);
            }
        }
        if (!open) {
            // close the channel now
            close(channel, succeededFuture(channel));
        }
        // 鉤子事件
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }
相關文章
相關標籤/搜索