dubbo消費者調用流程

    dubbo相關說明(官方):bootstrap

  • 在RPC中,Protocol是核心層,也就是隻要有Protocol + Invoker + Exporter就能夠完成非透明的RPC調用,而後在Invoker的主過程上Filter攔截點。api

  • 圖中的Consumer和Provider是抽象概念,只是想讓看圖者更直觀的瞭解哪些類分屬於客戶端與服務器端,不用Client和Server的緣由是Dubbo在不少場景下都使用Provider, Consumer, Registry, Monitor劃分邏輯拓普節點,保持統一律念。服務器

  • 而Cluster是外圍概念,因此Cluster的目的是將多個Invoker假裝成一個Invoker,這樣其它人只要關注Protocol層Invoker便可,加上Cluster或者去掉Cluster對其它層都不會形成影響,由於只有一個提供者時,是不須要Cluster的。app

  • Proxy層封裝了全部接口的透明化代理,而在其它層都以Invoker爲中心,只有到了暴露給用戶使用時,才用Proxy將Invoker轉成接口,或將接口實現轉成Invoker,也就是去掉Proxy層RPC是能夠Run的,只是不那麼透明,不那麼看起來像調本地服務同樣調遠程服務。socket

  • 而Remoting實現是Dubbo協議的實現,若是你選擇RMI協議,整個Remoting都不會用上,Remoting內部再劃爲Transport傳輸層和Exchange信息交換層,Transport層只負責單向消息傳輸,是對Mina,Netty,Grizzly的抽象,它也能夠擴展UDP傳輸,而Exchange層是在傳輸層之上封裝了Request-Response語義。tcp

    

    dubbo遠程調用詳細過程,官方中有個大概的流程,根據本身的理解跟蹤畫出下面的調用鏈(默認使用dubbo協議):ide

    調用鏈經過一系列的Invoker和filter,最終經過netty實現遠程通訊。ui

    其中:this

  • 服務消費端:

    經過LazyConnectExchangeClient.request()中調用initClient()對NettyClient進行初始化;編碼

private void initClient() throws RemotingException {
    if (client != null )
        return;
    if (logger.isInfoEnabled()) {
        logger.info("Lazy connect to " + url);
    }
    connectLock.lock();
    try {
        if (client != null)
            return;
        //建立鏈接
        this.client = Exchangers.connect(url, requestHandler);
    } finally {
        connectLock.unlock();
    }
}

    經過NettyClient建立鏈接:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            //添加編解碼器,對消息進行編碼、解碼
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            //添加消息接收、發送處理
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

    adapter.getEncoder():InternalEncoder:

private class InternalEncoder extends OneToOneEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
           //編碼處理
           codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}

     ExchangeCodec:

public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    //對請求參數編碼
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}

    encodeRequest():

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header. 協議頭
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number. 添加2字節的魔數
    Bytes.short2bytes(MAGIC, header);
    //添加序1個字節的列化標識
    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id. 添加8字節的請求惟一id
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data. 對請求數據進行encode
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        //經過子類DubboCodec編碼請求
        encodeRequestData(channel, out, req.getData());
    }
    out.flushBuffer();
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);

    // write 寫入buffer中(請求起始地址、結束地址以及header)
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

    DubboCodec.encodeRequestData()發送消息:

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;
    //請求消息中包含dubbo版本號、接口、參數等信息
    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
    for (int i = 0; i < args.length; i++){
        out.writeObject(encodeInvocationArgument(channel, inv, i));
    }
    out.writeObject(inv.getAttachments());
}

     消息數據包含dubbo版本號、接口名稱、、方法名稱、參數類等信息,將它們序列化後寫入到類型到buffer中。

  • 服務提供端接收請求:

    經過接收到的請求數據,進行decode,解碼完成後經過調用連進入NettyHandler.messageReceived()進行處理接收到的消息=>HeaderExchangeHandler.received()

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        //請求消息
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    //處理請求
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            //......
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

    handleRequest()方法:

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    //處理失敗的消息  
    if (req.isBroken()) {
        //.........
        return res;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data. 服務端處理接收到的消息,handler爲DubboProtocol的內部實現
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

    DubboProtocol.requestHandler:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        //獲取invoker
        Invoker<?> invoker = getInvoker(channel, inv);
        //若是是callback 須要處理高版本調用低版本的問題
        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsStr == null || methodsStr.indexOf(",") == -1){
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods){
                    if (inv.getMethodName().equals(method)){
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod){
                logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        //經過invoke調用目標方法
        return invoker.invoke(inv);
    }
    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

    經過invoke調用目標方法,並最終執行接口相關邏輯;並把結果封裝爲response返回給客戶端(詳見HeaderExchangeHandler.received()中channel.send(response)方法)。

    同理:返回給客戶端的reponse對象也會通過編碼encode:包括魔數、序列化協議、響應碼、requestId等。

  • 服務消費端解析返回的結果數據

    服務端給客戶端返回數據以後,客戶端會收到IO事件,NettyClient對響應數據進行解碼(即解析requestId、響應碼、序列化協議、響應數據等);解碼完成後經過client中綁定的NettyHandler調用其received()方法處理(相似服務端解析邏輯)。

    NettyHandler:   

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
           //........處理請求request——服務端
        } else if (message instanceof Response) {
            //響應response——服務消費端
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

    handleResponse(channel, (Response) message):

static void handleResponse(Channel channel, Response response) throws RemotingException {
    //響應內容不爲空而且不是心跳檢測的請求響應
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

    DefaultFuture:

public static void received(Channel channel, Response response) {
    try {
        //移除當前future
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //喚醒等待響應結果的線程
            future.doReceived(response);
        } else {
            //.....
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            //喚醒等待的線程
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

    調用netty發送數據後,該請求線程一直DefaultFuture.await()等待響應。

    注:經過綁定的NettyServer:NettyHandler.messageReceived()——>HeaderExchangeHandler喚醒DefaultFuture後續處理

相關文章
相關標籤/搜索