Dubbo源碼分析(五)Dubbo調用鏈-服務端

這篇來分析Dubbo服務端接收消費端請求調用的過程,先看一張調用鏈的總體流程圖 java

上面綠色的部分爲服務端接收請求部分,大致流程是從ThreadPool-->server-->Exporter-->Filter-->Invoker-->Impl,下面來看源碼

源碼入口

咱們知道Dubbo默認是經過Netty進行網絡傳輸,因此這裏的源碼入口咱們應該找到NettyHandler的接收消息的方法網絡

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
複製代碼

這裏就是服務端接收消費端發送請求的地方,進入handler.received方法,在AbstractPeer類中app

public void received(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.received(ch, msg);
    }
複製代碼

進入到handler.received,最終咱們進入AllChannelHandler.received方法中框架

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
複製代碼

這裏從線程池總來執行請求,咱們看到ChannelEventRunnable類,這個類中必定有一個run方法,咱們看一下異步

public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
複製代碼

這裏有不少種類型的請求,咱們此次是RECEIVED請求,進入handler.received(channel, message)方法,此時的handler=DecodeHandler,先進行解碼,這裏的內容暫時不說,放在後面的解編碼一塊兒說,繼續進入HeaderExchangeHandler.received方法,ide

public void received(Channel channel, Object message) throws RemotingException {
        ···
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } ···
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
複製代碼

執行handleRequest(exchangeChannel, request)方法,這裏是網絡通訊接收處理方法,繼續走,進入DubboProtocol.reply方法源碼分析

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                ···
                return invoker.invoke(inv);
            }
           ···
        }
複製代碼

進入getInvoker方法,post

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
       ···
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
       ···
        return exporter.getInvoker();
    }
複製代碼

從exporterMap中獲取所需的exporter,還記不記得這個exporterMap是何時放入值的,在服務端暴露接口的時候,這個是在第二篇中有提到過編碼

而後執行exporter.getInvoker(),如今咱們要將exporter轉化爲invoeker對象,咱們拿到這個invoker對象,並執行invoker.invoke(inv)方法,而後通過8個Filter,最終進入JavassistProxyFactory.AbstractProxyInvoker.doInvoke方法,

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
複製代碼

還記得這段代碼嗎?這個wrapper是什麼?下面這個截圖是在服務暴露的時候,url

這個時候執行 wrapper.invokeMethod方法,此時的wrapper就是以前咱們動態生成的一個wrapper包裝類,而後進入真正的實現類DemoServiceImpl.sayHello方法,執行完成以後,回到HeaderExchangeHandler.received方法

channel.send(response)
複製代碼

最終把結果經過ChannelFuture future = channel.write(message)發回consumer

消費端接收服務端發送的執行結果

咱們先進入NettyHandler.messageReceived方法,再執行handler.received(channel, e.getMessage()),這個和上面的代碼是同樣的,繼續進入MultiMessageHandler.received方法,繼續進入ChannelEventRunnable線程池,繼續進入DecodeHandler.received解碼,最終進入DefaultFuture.doReceived方法

private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
複製代碼

這裏用到了一個Condition,就是這個done,這裏的done.signal的做用是什麼呢?既然有signal方法,必定還有done.await方法,咱們看到這個get方法,

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
複製代碼

這個裏面有一個done.await方法,貌似這個get方法有點熟悉,在哪裏見過呢,在DubboInvoker.doInvoke()方法中的最後一行,貼代碼

return (Result) currentClient.request(inv, timeout).get();
複製代碼

這裏拿到消費端發起請求以後,調用了get方法,這個get方法一直阻塞,直到服務端返回結果調用done.signal(),這個也是Dubbo的異步轉同步機制的實現方式,至此,Dubbo的調用鏈就分析完了

相關文章
相關標籤/搜索