Dubbo源碼解析之服務調用過程

  1. 簡介 在前面的文章中,咱們分析了 Dubbo SPI、服務導出與引入、以及集羣容錯方面的代碼。通過前文的鋪墊,本篇文章咱們終於能夠分析服務調用過程了。Dubbo 服務調用過程比較複雜,包含衆多步驟,好比發送請求、編解碼、服務降級、過濾器鏈處理、序列化、線程派發以及響應請求等步驟。限於篇幅緣由,本篇文章沒法對全部的步驟一一進行分析。本篇文章將會重點分析請求的發送與接收、編解碼、線程派發以及響應的發送與接收等過程,至於服務降級、過濾器鏈和序列化你們自行進行分析,也能夠將其當成一個黑盒,暫時忽略也不要緊。介紹完本篇文章要分析的內容,接下來咱們進入正題吧。

2. 源碼分析

在進行源碼分析以前,咱們先來經過一張圖瞭解 Dubbo 服務調用過程。html

首先服務消費者經過代理對象 Proxy 發起遠程調用,接着經過網絡客戶端 Client 將編碼後的請求發送給服務提供方的網絡層上,也就是 Server。Server 在收到請求後,首先要作的事情是對數據包進行解碼。而後將解碼後的請求發送至分發器 Dispatcher,再由分發器將請求派發到指定的線程池上,最後由線程池調用具體的服務。這就是一個遠程調用請求的發送與接收過程。至於響應的發送與接收過程,這張圖中沒有表現出來。對於這兩個過程,咱們也會進行詳細分析。java

2.1 服務調用方式

Dubbo 支持同步和異步兩種調用方式,其中異步調用還可細分爲「有返回值」的異步調用和「無返回值」的異步調用。所謂「無返回值」異步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。若要使用異步特性,須要服務消費方手動進行配置。默認狀況下,Dubbo 使用同步調用方式。git

本節以及其餘章節將會使用 Dubbo 官方提供的 Demo 分析整個調用過程,下面咱們從 DemoService 接口的代理類開始進行分析。Dubbo 默認使用 Javassist 框架爲服務接口生成動態代理類,所以咱們須要先將代理類進行反編譯才能看到源碼。這裏使用阿里開源 Java 應用診斷工具 Arthas 反編譯代理類,結果以下:github

/** * Arthas 反編譯步驟: * 1. 啓動 Arthas * java -jar arthas-boot.jar * * 2. 輸入編號選擇進程 * Arthas 啓動後,會打印 Java 應用進程列表,以下: * [1]: 11232 org.jetbrains.jps.cmdline.Launcher * [2]: 22370 org.jetbrains.jps.cmdline.Launcher * [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer * [4]: 22362 com.alibaba.dubbo.demo.provider.Provider * [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain * 這裏輸入編號 3,讓 Arthas 關聯到啓動類爲 com.....Consumer 的 Java 進程上 * * 3. 因爲 Demo 項目中只有一個服務接口,所以此接口的代理類類名爲 proxy0,此時使用 sc 命令搜索這個類名。 * $ sc *.proxy0 * com.alibaba.dubbo.common.bytecode.proxy0 * * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0 * $ jad com.alibaba.dubbo.common.bytecode.proxy0 * * 更多使用方法請參考 Arthas 官方文檔: * https://alibaba.github.io/arthas/quick-start.html */
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // 將參數存儲到 Object 數組中
        Object[] arrobject = new Object[]{string};
        // 調用 InvocationHandler 實現類的 invoke 方法獲得調用結果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調用結果
        return (String)object;
    }

    /** 回聲測試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}
複製代碼

如上,代理類的邏輯比較簡單。首先將運行時參數存儲到數組中,而後調用 InvocationHandler 接口實現類的 invoke 方法,獲得調用結果,最後將結果轉型並返回給調用方。關於代理類的邏輯就說這麼多,繼續向下分析。數據庫

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        
        // 攔截定義在 Object 類中的方法(未被子類重寫),好比 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        
        // 若是 toString、hashCode 和 equals 等方法被子類重寫了,這裏也直接調用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        
        // 將 method 和 args 封裝到 RpcInvocation 中,並執行後續的調用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
複製代碼

InvokerInvocationHandler 中的 invoker 成員變量類型爲 MockClusterInvoker,MockClusterInvoker 內部封裝了服務降級邏輯。下面簡單看一下:apache

public class MockClusterInvoker<T> implements Invoker<T> {
    
    private final Invoker<T> invoker;
    
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 獲取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 無 mock 邏輯,直接調用其餘 Invoker 對象的 invoke 方法,
            // 好比 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx 直接執行 mock 邏輯,不發起遠程調用
            result = doMockInvoke(invocation, null);
        } else {
            // fail:xxx 表示消費方對調用服務失敗後,再執行 mock 邏輯,不拋出異常
            try {
                // 調用其餘 Invoker 對象的 invoke 方法
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    // 調用失敗,執行 mock 邏輯
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    
    // 省略其餘方法
}
複製代碼

服務降級不是本文重點,所以這裏就不分析 doMockInvoke 方法了。考慮到前文已經詳細分析過 FailoverClusterInvoker,所以本節略過 FailoverClusterInvoker,直接分析 DubboInvoker。編程

public abstract class AbstractInvoker<T> implements Invoker<T> {
    
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實現
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    
    // 省略其餘方法
}
上面的代碼來自 AbstractInvoker 類,其中大部分代碼用於添加信息到 RpcInvocation#attachment 變量中,添加完畢後,調用 doInvoke 執行後續的調用。doInvoke 是一個抽象方法,須要由子類實現,下面到 DubboInvoker 中看一下。

public class DubboInvoker<T> extends AbstractInvoker<T> {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 爲 true,表示「單向」通訊
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發送請求
                currentClient.send(inv, isSent);
                // 設置上下文中的 future 字段爲 null
                RpcContext.getContext().setFuture(null);
                // 返回一個空的 RpcResult
                return new RpcResult();
            } 

            // 異步有返回值
            else if (isAsync) {
                // 發送請求,並獲得一個 ResponseFuture 實例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暫時返回一個空結果
                return new RpcResult();
            } 

            // 同步調用
            else {
                RpcContext.getContext().setFuture(null);
                // 發送請求,獲得一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其餘方法
}
複製代碼

上面的代碼包含了 Dubbo 對同步和異步調用的處理邏輯,搞懂了上面的代碼,會對 Dubbo 的同步和異步調用方式有更深刻的瞭解。Dubbo 實現同步和異步調用比較關鍵的一點就在於由誰調用 ResponseFuture 的 get 方法。同步調用模式下,由框架自身調用 ResponseFuture 的 get 方法。異步調用模式下,則由用戶調用該方法。ResponseFuture 是一個接口,下面咱們來看一下它的默認實現類 DefaultFuture 的源碼。數組

public class DefaultFuture implements ResponseFuture {
    
    private static final Map<Long, Channel> CHANNELS = 
        new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES = 
        new ConcurrentHashMap<Long, DefaultFuture>();
    
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        
        // 獲取請求 id,這個 id 很重要,後面還會見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲 <requestId, DefaultFuture> 映射關係到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        
        // 檢測服務提供方是否成功返回了調用結果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循環檢測服務提供方是否成功返回了調用結果
                while (!isDone()) {
                    // 若是調用結果還沒有返回,這裏等待一段時間
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 若是調用結果成功返回,或等待超時,此時跳出 while 循環,執行後續的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            
            // 若是調用結果仍未返回,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        
        // 返回調用結果
        return returnFromResponse();
    }
    
    @Override
    public boolean isDone() {
        // 經過檢測 response 字段爲空與否,判斷是否收到了調用結果
        return response != null;
    }
    
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        
        // 若是調用結果的狀態爲 Response.OK,則表示調用過程正常,服務提供方成功返回了調用結果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        
        // 拋出異常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    
    // 省略其餘方法
}
複製代碼

如上,當服務消費者還未接收到調用結果時,用戶線程調用 get 方法會被阻塞住。同步調用模式下,框架得到 DefaultFuture 對象後,會當即調用 get 方法進行等待。而異步模式下則是將該對象封裝到 FutureAdapter 實例中,並將 FutureAdapter 實例設置到 RpcContext 中,供用戶使用。FutureAdapter 是一個適配器,用於將 Dubbo 中的 ResponseFuture 與 JDK 中的 Future 進行適配。這樣當用戶線程調用 Future 的 get 方法時,通過 FutureAdapter 適配,最終會調用 ResponseFuture 實現類對象的 get 方法,也就是 DefaultFuture 的 get 方法。bash

到這裏關於 Dubbo 幾種調用方式的代碼邏輯就分析完了,下面來分析請求數據的發送與接收,以及響應數據的發送與接收過程。網絡

2.2 服務消費方發送請求

2.2.1 發送請求

本節咱們來看一下同步調用模式下,服務消費方是如何發送調用請求的。在深刻分析源碼前,咱們先來看一張圖。

這張圖展現了服務消費方發送請求過程的部分調用棧,略爲複雜。從上圖能夠看出,通過屢次調用後,纔將請求數據送至 Netty NioClientSocketChannel。這樣作的緣由是經過 Exchange 層爲框架引入 Request 和 Response 語義,這一點會在接下來的源碼分析過程當中會看到。其餘的就很少說了,下面開始進行分析。首先分析 ReferenceCountExchangeClient 的源碼。

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        // 引用計數自增
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
        
        // ...
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request, timeout);
    }

    /** 引用計數自增,該方法由外部調用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
        referenceCount.incrementAndGet();
    }
    
        @Override
    public void close(int timeout) {
        // referenceCount 自減
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }
    
    // 省略部分方法
}
複製代碼

ReferenceCountExchangeClient 內部定義了一個引用計數變量 referenceCount,每當該對象被引用一次 referenceCount 都會進行自增。每當 close 方法被調用時,referenceCount 進行自減。ReferenceCountExchangeClient 內部僅實現了一個引用計數的功能,其餘方法並沒有複雜邏輯,均是直接調用被裝飾對象的相關方法。因此這裏就很少說了,繼續向下分析,此次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture<?> heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        
        // 建立 HeaderExchangeChannel 對象
        this.channel = new HeaderExchangeChannel(client);
        
        // 如下代碼均與心跳檢測邏輯有關
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 開啓心跳檢測定時器
            startHeartbeatTimer();
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request, timeout);
    }

    @Override
    public void close() {
        doClose();
        channel.close();
    }
    
    private void doClose() {
        // 中止心跳檢測定時器
        stopHeartbeatTimer();
    }

    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
    
    // 省略部分方法
}
複製代碼

HeaderExchangeClient 中不少方法只有一行代碼,即調用 HeaderExchangeChannel 對象的同簽名方法。那 HeaderExchangeClient 有什麼用處呢?答案是封裝了一些關於心跳檢測的邏輯。心跳檢測並不是本文所關注的點,所以就很少說了,繼續向下看。

final class HeaderExchangeChannel implements ExchangeChannel {
    
    private final Channel channel;
    
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        
        // 這裏的 channel 指向的是 NettyClient
        this.channel = channel;
    }
    
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...);
        }
        // 建立 Request 對象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        // 設置雙向通訊標誌爲 true
        req.setTwoWay(true);
        // 這裏的 request 變量類型爲 RpcInvocation
        req.setData(request);
                                        
        // 建立 DefaultFuture 對象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 調用 NettyClient 的 send 方法發送請求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // 返回 DefaultFuture 對象
        return future;
    }
}
複製代碼

到這裏你們終於看到了 Request 語義了,上面的方法首先定義了一個 Request 對象,而後再將該對象傳給 NettyClient 的 send 方法,進行後續的調用。須要說明的是,NettyClient 中並未實現 send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    
    @Override
    public void send(Object message) throws RemotingException {
        // 該方法由 AbstractClient 類實現
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
    
    // 省略其餘方法
}
複製代碼
public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        
        // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        }
        
        // 繼續向下調用
        channel.send(message, sent);
    }
    
    protected abstract Channel getChannel();
    
    // 省略其餘方法
}
複製代碼

默認狀況下,Dubbo 使用 Netty 做爲底層的通訊框架,所以下面咱們到 NettyClient 類中看一下 getChannel 方法的實現邏輯。

public class NettyClient extends AbstractClient {
    
    // 這裏的 Channel 全限定名稱爲 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 獲取一個 NettyChannel 類型對象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}
複製代碼
final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;
    
    /** 私有構造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        
        // 嘗試從集合中獲取 NettyChannel 實例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 若是 ret = null,則建立一個新的 NettyChannel 實例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 將 <Channel, NettyChannel> 鍵值對存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
}
複製代碼

獲取到 NettyChannel 實例後,便可進行後續的調用。下面看一下 NettyChannel 的 send 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發送消息(包含請求和響應消息)
        ChannelFuture future = channel.write(message);
        
        // sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的配置值,有兩種配置值:
        // 1. true: 等待消息發出,消息發送失敗將拋出異常
        // 2. false: 不等待消息發出,將消息放入 IO 隊列,即刻返回
        // 默認狀況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發出,若在規定時間沒能發出,success 會被置爲 false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");
    }

    // 若 success 爲 false,這裏拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
    }
}
複製代碼

經歷屢次調用,到這裏請求數據的發送過程就結束了,過程漫長。爲了便於你們閱讀代碼,這裏以 DemoService 爲例,將 sayHello 方法的整個調用路徑貼出來。

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 調用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)
複製代碼

在 Netty 中,出站數據在發出以前還須要進行編碼操做,接下來咱們來分析一下請求數據的編碼邏輯。

2.2.2 請求編碼

在分析請求編碼邏輯以前,咱們先來看一下 Dubbo 數據包結構。

Dubbo 數據包分爲消息頭和消息體,消息頭用於存儲一些元信息,好比魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用於存儲具體的調用消息,好比方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。

偏移量(Bit) 字段 取值 0 ~ 7 魔數高位 0xda00 8 ~ 15 魔數低位 0xbb 16 數據包類型 0 - Response, 1 - Request 17 調用方式 僅在第16位被設爲1的狀況下有效,0 - 單向調用,1 - 雙向調用 18 事件標識 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包 19 ~ 23 序列化器編號 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization 24 ~ 31 狀態 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... 32 ~ 95 請求編號 共8字節,運行時生成 96 ~ 127 消息體長度 運行時計算 瞭解了 Dubbo 數據包格式,接下來咱們就能夠探索編碼過程了。此次咱們開門見山,直接分析編碼邏輯所在類。以下:

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // 魔數內容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對 Request 對象進行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對 Response 對象進行編碼,後面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 建立消息頭字節數組,長度爲 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設置魔數
        Bytes.short2bytes(MAGIC, header);

        // 設置數據包類型(Request/Response)和序列化器編號
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設置通訊方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        
        // 設置事件標識
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 設置請求編號,8個字節,從第4個字節開始設置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當前的寫位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,爲消息頭預留 16 個字節的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 建立序列化器,好比 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對事件數據進行序列化操做
            encodeEventData(channel, out, req.getData());
        } else {
            // 對請求數據進行序列化操做
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 獲取寫入的字節數,也就是消息體長度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長度寫入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動到 savedWriteIndex,爲寫消息頭作準備
        buffer.writerIndex(savedWriteIndex);
        // 從 savedWriteIndex 下標處寫入消息頭
        buffer.writeBytes(header);
        // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其餘方法
}
複製代碼

以上就是請求對象的編碼過程,該過程首先會經過位運算將消息頭寫入到 header 數組中。而後對 Request 對象的 data 字段執行序列化操做,序列化後的數據最終會存儲到 ChannelBuffer 中。序列化操做執行完後,可獲得數據序列化後的長度 len,緊接着將 len 寫入到 header 指定位置處。最後再將消息頭字節數組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節的最後,咱們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,以下:

public class DubboCodec extends ExchangeCodec implements Codec2 {
    
	protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化調用方法名
        out.writeUTF(inv.getMethodName());
        // 將參數類型轉換爲字符串,並進行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 對運行時參數進行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
}
複製代碼

至此,關於服務消費方發送請求的過程就分析完了,接下來咱們來看一下服務提供方是如何接收請求的。

2.3 服務提供方接收請求

前面說過,默認狀況下 Dubbo 使用 Netty 做爲底層的通訊框架。Netty 檢測到有數據入站後,首先會經過解碼器對數據進行解碼,並將解碼後的數據傳遞給下一個入站處理器的指定方法。因此在進行後續的分析以前,咱們先來看一下數據解碼過程。

2.3.1 請求解碼

這裏直接分析請求數據的解碼邏輯,忽略中間過程,以下:

public class ExchangeCodec extends TelnetCodec {

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // 建立消息頭字節數組
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    // 讀取消息頭數據
    buffer.readBytes(header);
    // 調用重載方法進行後續解碼工做
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // 檢查魔數是否相等
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        // 經過 telnet 命令行發送的數據包不包含消息頭,因此這裏
        // 調用 TelnetCodec 的 decode 方法對數據包進行解碼
        return super.decode(channel, buffer, readable, header);
    }
    
    // 檢測可讀數據量是否少於消息頭長度,若小於則當即返回 DecodeResult.NEED_MORE_INPUT
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 從消息頭中獲取消息體長度
    int len = Bytes.bytes2int(header, 12);
    // 檢測消息體長度是否超出限制,超出則拋出異常
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    // 檢測可讀的字節數是否小於實際的字節數
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        // 繼續進行解碼工做
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}
複製代碼

} 上面方法經過檢測消息頭中的魔數是否與規定的魔數相等,提早攔截掉很是規數據包,好比經過 telnet 命令行發出的數據包。接着再對消息體長度,以及可讀字節數進行檢測。最後調用 decodeBody 方法進行後續的解碼工做,ExchangeCodec 中實現了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,因此在運行時 DubboCodec 中的 decodeBody 方法會被調用。下面咱們來看一下該方法的代碼。

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 獲取消息頭中的第三個字節,並經過邏輯與運算獲得序列化器編號
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取調用編號
        long id = Bytes.bytes2long(header, 4);
        // 經過邏輯與運算獲得調用類型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // 對響應結果進行解碼,獲得 Response 對象。這個非本節內容,後面再分析
            // ...
        } else {
            // 建立 Request 對象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // 經過邏輯與運算獲得通訊方式,並設置到 Request 對象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            
            // 經過位運算檢測數據包是否爲事件類型
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件到 Request 對象中
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // 對心跳包進行解碼,該方法已被標註爲廢棄
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // 對事件數據進行解碼
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // 在當前線程,也就是 IO 線程上進行後續的解碼工做。此工做完成後,可將
                        // 調用方法名、attachment、以及調用參數解析出來
                        inv.decode();
                    } else {
                        // 僅建立 DecodeableRpcInvocation 對象,但不在當前線程上執行解碼邏輯
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                
                // 設置 data 到 Request 對象中
                req.setData(data);
            } catch (Throwable t) {
                // 若解碼過程當中出現異常,則將 broken 字段設爲 true,
                // 並將異常對象設置到 Reqeust 對象中
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}
複製代碼

如上,decodeBody 對部分字段進行了解碼,並將解碼獲得的字段封裝到 Request 中。隨後會調用 DecodeableRpcInvocation 的 decode 方法進行後續的解碼工做。此工做完成後,可將調用方法名、attachment、以及調用參數解析出來。下面咱們來看一下 DecodeableRpcInvocation 的 decode 方法邏輯。

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    
	@Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 經過反序列化獲得 dubbo version,並保存到 attachments 變量中
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 經過反序列化獲得 path,version,並保存到 attachments 變量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 經過反序列化獲得調用方法名
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class<?>[] pts;
            // 經過反序列化獲得參數類型字符串,好比 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 將 desc 解析爲參數類型數組
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析運行時參數
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 設置參數類型數組
            setParameterTypes(pts);

            // 經過反序列化獲得原 attachment 的內容
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                // 將 map 與當前對象中的 attachment 集合進行融合
                attachment.putAll(map);
                setAttachments(attachment);
            }
            
            // 對 callback 類型的參數進行處理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            // 設置參數列表
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;
    }
}
複製代碼

上面的方法經過反序列化將諸如 path、version、調用方法名、參數列表等信息依次解析出來,並設置到相應的字段中,最終獲得一個具備完整調用信息的 DecodeableRpcInvocation 對象。

到這裏,請求數據解碼的過程就分析完了。此時咱們獲得了一個 Request 對象,這個對象會被傳送到下一個入站處理器中,咱們繼續往下看。

2.3.2 調用服務

解碼器將數據包解析成 Request 對象後,NettyHandler 的 messageReceived 方法緊接着會收到這個對象,並將這個對象繼續向下傳遞。這期間該對象會被依次傳遞給 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最後由 AllChannelHandler 將該對象封裝到 Runnable 實現類對象中,並將 Runnable 放入線程池中執行後續的調用邏輯。整個調用棧以下:

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent) —> AbstractPeer#received(Channel, Object) —> MultiMessageHandler#received(Channel, Object) —> HeartbeatHandler#received(Channel, Object) —> AllChannelHandler#received(Channel, Object) —> ExecutorService#execute(Runnable) // 由線程池執行後續的調用邏輯 考慮到篇幅,以及不少中間調用的邏輯並不是十分重要,因此這裏就不對調用棧中的每一個方法都進行分析了。這裏咱們直接分析調用棧中的分析第一個和最後一個調用方法邏輯。以下:

@Sharable
public class NettyHandler extends SimpleChannelHandler {
    
    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();

    private final URL url;

    private final ChannelHandler handler;
    
    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        
        // 這裏的 handler 類型爲 NettyServer
        this.handler = handler;
    }
    
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // 獲取 NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            // 繼續向下調用
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}
複製代碼

如上,NettyHandler 中的 messageReceived 邏輯比較簡單。首先根據一些信息獲取 NettyChannel 實例,而後將 NettyChannel 實例以及 Request 對象向下傳遞。下面再來看看 AllChannelHandler 的邏輯,在詳細分析代碼以前,咱們先來了解一下 Dubbo 中的線程派發模型。

2.3.2.1 線程派發模型

Dubbo 將底層通訊框架中接收請求的線程稱爲 IO 線程。若是一些事件處理邏輯能夠很快執行完,好比只在內存打一個標記,此時直接在 IO 線程上執行該段邏輯便可。但若是事件的處理邏輯比較耗時,好比該段邏輯會發起數據庫查詢或者 HTTP 請求。此時咱們就不該該讓事件處理邏輯在 IO 線程上執行,而是應該派發到線程池中去執行。緣由也很簡單,IO 線程主要用於接收請求,若是 IO 線程被佔滿,將致使它不能接收新的請求。

以上就是線程派發的背景,下面咱們再來經過 Dubbo 調用圖,看一下線程派發器所處的位置。

如上圖,紅框中的 Dispatcher 就是線程派發器。須要說明的是,Dispatcher 真實的職責建立具備線程派發能力的 ChannelHandler,好比 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其自己並不具有線程派發能力。Dubbo 支持 5 種不一樣的線程派發策略,下面經過一個表格列舉一下。

策略 用途 all 全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件等 direct 全部消息都不派發到線程池,所有在 IO 線程上直接執行 message 只有請求和響應消息派發到線程池,其它消息均在 IO 線程上執行 execution 只有請求消息派發到線程池,不含響應。其它消息均在 IO 線程上執行 connection 在 IO 線程上,將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池 默認配置下,Dubbo 使用 all 派發策略,即將全部的消息都派發到線程池中。下面咱們來分析一下 AllChannelHandler 的代碼。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /** 處理鏈接事件 */
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 獲取線程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將鏈接事件派發到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., " error when process connected event .", t);
        }
    }

    /** 處理斷開事件 */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process disconnected event .", t);
        }
    }

    /** 處理請求和響應消息,這裏的 message 變量類型多是 Request,也多是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請求和響應消息派發到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 若是通訊方式爲雙向通訊,此時將 Server side ... threadpool is exhausted 
                // 錯誤信息封裝到 Response 中,並返回給服務消費方。
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    // 返回包含錯誤信息的 Response 對象
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(..., " error when process received event .", t);
        }
    }

    /** 處理異常信息 */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process caught event ...");
        }
    }
}
複製代碼

如上,請求對象會被封裝 ChannelEventRunnable 中,ChannelEventRunnable 將會是服務調用過程的新起點。因此接下來咱們以 ChannelEventRunnable 爲起點向下探索。

2.3.2.2 調用服務

本小節,咱們從 ChannelEventRunnable 開始分析,該類的主要代碼以下:

public class ChannelEventRunnable implements Runnable {
    
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    
    @Override
    public void run() {
        // 檢測通道狀態,對於請求或響應消息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對象,進行後續的調用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        } 
        
        // 其餘消息類型經過 switch 進行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}
複製代碼

如上,請求和響應消息出現頻率明顯比其餘類型消息高,因此這裏對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一箇中轉站,它的 run 方法中並不包含具體的調用邏輯,僅用於將參數傳給其餘 ChannelHandler 對象進行處理,該對象類型爲 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對 Decodeable 接口實現類對象進行解碼
            decode(message);
        }

        if (message instanceof Request) {
            // 對 Request 的 data 字段進行解碼
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 對 Request 的 result 字段進行解碼
            decode(((Response) message).getResult());
        }

        // 執行後續邏輯
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有兩個實現類,
        // 分別爲 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}
複製代碼

DecodeHandler 主要是包含了一些解碼邏輯。2.2.1 節分析請求解碼時說過,請求解碼可在 IO 線程上執行,也可在線程池中執行,這個取決於運行時配置。DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼。解碼完畢後,徹底解碼後的 Request 對象會繼續向後傳遞,下一站是 HeaderExchangeHandler。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求對象
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                } 
                // 處理普通的請求
                else {
                    // 雙向通訊
                    if (request.isTwoWay()) {
                        // 向後調用服務,並獲得調用結果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調用結果返回給服務消費端
                        channel.send(response);
                    } 
                    // 若是是單向通訊,僅向後調用指定服務便可,無需返回調用結果
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }      
            // 處理響應對象,服務消費方會執行此處邏輯,後面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關,忽略
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測請求是否合法,不合法則返回狀態碼爲 BAD_REQUEST 的響應
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null)
                msg = null;
            else if
                (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // 設置 BAD_REQUEST 狀態
            res.setStatus(Response.BAD_REQUEST);

            return res;
        }
        
        // 獲取 data 字段值,也就是 RpcInvocation 對象
        Object msg = req.getData();
        try {
            // 繼續向下調用
            Object result = handler.reply(channel, msg);
            // 設置 OK 狀態碼
            res.setStatus(Response.OK);
            // 設置調用結果
            res.setResult(result);
        } catch (Throwable e) {
            // 若調用過程出現異常,則設置 SERVICE_ERROR,表示服務端異常
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
}
複製代碼

到這裏,咱們看到了比較清晰的請求和響應邏輯。對於雙向通訊,HeaderExchangeHandler 首先向後進行調用,獲得調用結果。而後將調用結果封裝到 Response 對象中,最後再將該對象返回給服務消費方。若是請求不合法,或者調用失敗,則將錯誤信息封裝到 Response 對象中,並返回給服務消費方。接下來咱們繼續向後分析,把剩餘的調用過程分析完。下面分析定義在 DubboProtocol 類中的匿名類對象邏輯,以下:

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";
    
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲取 Invoker 實例
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // 回調相關,忽略
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 經過 Invoker 調用具體的服務
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: ...");
        }
        
        // 忽略其餘方法
    }
    
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // 忽略回調和本地存根相關邏輯
        // ...
        
        int port = channel.getLocalAddress().getPort();
        
        // 計算 service key,格式爲 groupName/serviceName:serviceVersion:port。好比:
        // dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
        // 服務導出過程當中會將 <serviceKey, DubboExporter> 映射關係存儲到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");

        // 獲取 Invoker 對象,並返回
        return exporter.getInvoker();
    }
    
    // 忽略其餘方法
}
複製代碼

以上邏輯用於獲取與指定服務對應的 Invoker 實例,並經過 Invoker 的 invoke 方法調用服務邏輯。invoke 方法定義在 AbstractProxyInvoker 中,代碼以下。

public abstract class AbstractProxyInvoker implements Invoker {

@Override
public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 調用 doInvoke 執行後續的調用,並將調用結果封裝到 RpcResult 中,並
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method ...");
    }
}

protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
複製代碼

} 如上,doInvoke 是一個抽象方法,這個須要由具體的 Invoker 實例實現。Invoker 實例是在運行時經過 JavassistProxyFactory 建立的,建立邏輯以下:

public class JavassistProxyFactory extends AbstractProxyFactory {
    
    // 省略其餘方法

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 建立匿名類對象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                // 調用 invokeMethod 方法進行後續的調用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
複製代碼

Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時經過 Javassist 框架爲 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 爲例,Javassist 爲其生成的代理類以下。

/** Wrapper0 是在運行時生成的,你們可以使用 Arthas 進行反編譯 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0;

// 省略其餘方法

public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
    DemoService demoService;
    try {
        // 類型轉換
        demoService = (DemoService)object;
    }
    catch (Throwable throwable) {
        throw new IllegalArgumentException(throwable);
    }
    try {
        // 根據方法名調用指定的方法
        if ("sayHello".equals(string) && arrclass.length == 1) {
            return demoService.sayHello((String)arrobject[0]);
        }
    }
    catch (Throwable throwable) {
        throw new InvocationTargetException(throwable);
    }
    throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
複製代碼

} 到這裏,整個服務調用過程就分析完了。最後把調用過程貼出來,以下:

ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper0#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String) 2.4 服務提供方返回調用結果 服務提供方調用指定服務後,會將調用結果封裝到 Response 對象中,並將該對象返回給服務消費方。服務提供方也是經過 NettyChannel 的 send 方法將 Response 對象返回,這個方法在 2.2.1 節分析過,這裏就不在重複分析了。本節咱們僅需關注 Response 對象的編碼過程便可,這裏仍然省略一些中間調用,直接分析具體的編碼邏輯。

public class ExchangeCodec extends TelnetCodec {
	public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對響應對象進行編碼
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }
    
    protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);
            // 建立消息頭字節數組
            byte[] header = new byte[HEADER_LENGTH];
            // 設置魔數
            Bytes.short2bytes(MAGIC, header);
            // 設置序列化器編號
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
            // 獲取響應狀態
            byte status = res.getStatus();
            // 設置響應狀態
            header[3] = status;
            // 設置請求編號
            Bytes.long2bytes(res.getId(), header, 4);

            // 更新 writerIndex,爲消息頭預留 16 個字節的空間
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
           
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    // 對心跳響應結果進行序列化,已廢棄
                    encodeHeartbeatData(channel, out, res.getResult());
                } else {
                    // 對調用結果進行序列化
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else { 
                // 對錯誤信息進行序列化
                out.writeUTF(res.getErrorMessage())
            };
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            // 獲取寫入的字節數,也就是消息體長度
            int len = bos.writtenBytes();
            checkPayload(channel, len);
            
            // 將消息體長度寫入到消息頭中
            Bytes.int2bytes(len, header, 12);
            // 將 buffer 指針移動到 savedWriteIndex,爲寫消息頭作準備
            buffer.writerIndex(savedWriteIndex);
            // 從 savedWriteIndex 下標處寫入消息頭
            buffer.writeBytes(header); 
            // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 異常處理邏輯不是很難理解,可是代碼略多,這裏忽略了
        }
    }
}
複製代碼
public class DubboCodec extends ExchangeCodec implements Codec2 {
    
	protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        Result result = (Result) data;
        // 檢測當前協議版本是否支持帶有 attachment 集合的 Response 對象
        boolean attach = Version.isSupportResponseAttachment(version);
        Throwable th = result.getException();
        
        // 異常信息爲空
        if (th == null) {
            Object ret = result.getValue();
            // 調用結果爲空
            if (ret == null) {
                // 序列化響應類型
                out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
            } 
            // 調用結果非空
            else {
                // 序列化響應類型
                out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
                // 序列化調用結果
                out.writeObject(ret);
            }
        } 
        // 異常信息非空
        else {
            // 序列化響應類型
            out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
            // 序列化異常對象
            out.writeObject(th);
        }

        if (attach) {
            // 記錄 Dubbo 協議版本
            result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
            // 序列化 attachments 集合
            out.writeObject(result.getAttachments());
        }
    }
}
複製代碼

以上就是 Response 對象編碼的過程,和前面分析的 Request 對象編碼過程很類似。若是你們能看 Request 對象的編碼邏輯,那麼這裏的 Response 對象的編碼邏輯也不難理解,就很少說了。接下來咱們再來分析雙向通訊的最後一環 —— 服務消費方接收調用結果。

2.5 服務消費方接收調用結果

服務消費方在收到響應數據後,首先要作的事情是對響應數據進行解碼,獲得 Response 對象。而後再將該對象傳遞給下一個入站處理器,這個入站處理器就是 NettyHandler。接下來 NettyHandler 會將這個對象繼續向下傳遞,最後 AllChannelHandler 的 received 方法會收到這個對象,並將這個對象派發到線程池中。這個過程和服務提供方接收請求的過程是同樣的,所以這裏就不重複分析了。本節咱們重點分析兩個方面的內容,一是響應數據的解碼過程,二是 Dubbo 如何將調用結果傳遞給用戶線程的。下面先來分析響應數據的解碼過程。

2.5.1 響應數據解碼

響應數據解碼邏輯主要的邏輯封裝在 DubboCodec 中,咱們直接分析這個類的代碼。以下:

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取請求編號
        long id = Bytes.bytes2long(header, 4);
        // 檢測消息類型,若下面的條件成立,代表消息類型爲 Response
        if ((flag & FLAG_REQUEST) == 0) {
            // 建立 Response 對象
            Response res = new Response(id);
            // 檢測事件標誌位
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 獲取響應狀態
            byte status = header[3];
            // 設置響應狀態
            res.setStatus(status);
            
            // 若是響應狀態爲 OK,代表調用過程正常
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        // 反序列化心跳數據,已廢棄
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        // 反序列化事件數據
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        // 根據 url 參數決定是否在 IO 線程上執行解碼邏輯
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            // 建立 DecodeableRpcResult 對象
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            // 進行後續的解碼工做
                            result.decode();
                        } else {
                            // 建立 DecodeableRpcResult 對象
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    
                    // 設置 DecodeableRpcResult 對象到 Response 對象中
                    res.setResult(data);
                } catch (Throwable t) {
                    // 解碼過程當中出現了錯誤,此時設置 CLIENT_ERROR 狀態碼到 Response 對象中
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } 
            // 響應狀態非 OK,代表調用過程出現了異常
            else {
                // 反序列化異常信息,並設置到 Response 對象中
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {
            // 對請求數據進行解碼,前面已分析過,此處忽略
        }
    }
}
複製代碼

以上就是響應數據的解碼過程,上面邏輯看起來是否是似曾相識。對的,咱們在前面章節分析過 DubboCodec 的 decodeBody 方法中關於請求數據的解碼過程,該過程和響應數據的解碼過程很類似。下面,咱們繼續分析調用結果的反序列化過程,以下:

public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
    
    private Invocation invocation;
	
    @Override
    public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {
                // 執行反序列化操做
                decode(channel, inputStream);
            } catch (Throwable e) {
                // 反序列化失敗,設置 CLIENT_ERROR 狀態到 Response 對象中
                response.setStatus(Response.CLIENT_ERROR);
                // 設置異常信息
                response.setErrorMessage(StringUtils.toString(e));
            } finally {
                hasDecoded = true;
            }
        }
    }
    
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        
        // 反序列化響應類型
        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                // ...
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                // ...
                break;
                
            // 返回值爲空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                try {
                    // 反序列化 attachments 集合,並存儲起來 
                    setAttachments((Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
                
            // 返回值不爲空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                try {
                    // 獲取返回值類型
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    // 反序列化調用結果,並保存起來
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                            (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                    : in.readObject((Class<?>) returnType[0], returnType[1])));
                    // 反序列化 attachments 集合,並存儲起來
                    setAttachments((Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
                
            // 異常對象不爲空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                try {
                    // 反序列化異常對象
                    Object obj = in.readObject();
                    if (obj instanceof Throwable == false)
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    // 設置異常對象
                    setException((Throwable) obj);
                    // 反序列化 attachments 集合,並存儲起來
                    setAttachments((Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
        return this;
    }
}
複製代碼

本篇文章所分析的源碼版本爲 2.6.4,該版本下的 Response 支持 attachments 集合,因此上面僅對部分 case 分支進行了註釋。其餘 case 分支的邏輯比被註釋分支的邏輯更爲簡單,這裏就忽略了。咱們所使用的測試服務接口 DemoService 包含了一個具備返回值的方法,正常調用下,線程會進入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。而後線程會從 invocation 變量(你們探索一下 invocation 變量的由來)中獲取返回值類型,接着對調用結果進行反序列化,並將序列化後的結果存儲起來。最後對 attachments 集合進行反序列化,並存到指定字段中。到此,關於響應數據的解碼過程就分析完了。接下來,咱們再來探索一下響應對象 Response 的去向。

2.5.2 向用戶線程傳遞調用結果

響應數據解碼完成後,Dubbo 會將響應對象派發到線程池上。要注意的是,線程池中的線程並不是用戶的調用線程,因此要想辦法將響應對象從線程池線程傳遞到用戶線程上。咱們在 2.1 節分析過用戶線程在發送完請求後的動做,即調用 DefaultFuture 的 get 方法等待響應對象的到來。當響應對象到來後,用戶線程會被喚醒,並經過調用編號獲取屬於本身的響應對象。下面咱們來看一下整個過程對應的代碼。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // 處理請求,前面已分析過,省略
            } else if (message instanceof Response) {
                // 處理響應
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關,忽略
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            // 繼續向下調用
            DefaultFuture.received(channel, response);
        }
    }
}

public class DefaultFuture implements ResponseFuture {  
    
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    
	public static void received(Channel channel, Response response) {
        try {
            // 根據調用編號從 FUTURES 集合中查找指定的 DefaultFuture 對象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                // 繼續向下調用
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at ...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

	private void doReceived(Response res) {
        lock.lock();
        try {
            // 保存響應對象
            response = res;
            if (done != null) {
                // 喚醒用戶線程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}
複製代碼

以上邏輯是將響應對象保存到相應的 DefaultFuture 實例中,而後再喚醒用戶線程,隨後用戶線程便可從 DefaultFuture 實例中獲取到相應結果。

本篇文章在多個地方都強調過調用編號很重要,但一直沒有解釋緣由,這裏簡單說明一下。通常狀況下,服務消費方會併發調用多個服務,每一個用戶線程發送請求後,會調用不一樣 DefaultFuture 對象的 get 方法進行等待。 一段時間後,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每一個響應對象傳遞給相應的 DefaultFuture 對象,且不出錯。答案是經過調用編號。DefaultFuture 被建立時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調用編號,並將 <調用編號, DefaultFuture 對象> 映射關係存入到靜態 Map 中,即 FUTURES。線程池中的線程在收到 Response 對象後,會根據 Response 對象中的調用編號到 FUTURES 集合中取出相應的 DefaultFuture 對象,而後再將 Response 對象設置到 DefaultFuture 對象中。最後再喚醒用戶線程,這樣用戶線程便可從 DefaultFuture 對象中獲取調用結果了。整個過程大體以下圖:

3. 總結

本篇文章主要對 Dubbo 中的幾種服務調用方式,以及從雙向通訊的角度對整個通訊過程進行了詳細的分析。按照通訊順序,通訊過程包括服務消費方發送請求,服務提供方接收請求,服務提供方返回響應數據,服務消費方接收響應數據等過程。理解這些過程須要你們對網絡編程,尤爲是 Netty 有必定的瞭解。限於篇幅緣由,本篇文章沒法將服務調用的全部內容都一一進行分析。對於本篇文章未講到或未詳細分析的內容,好比服務降級、過濾器鏈、以及序列化等。

歡迎你們加入Java高級架構羣 378461078

相關文章
相關標籤/搜索