dubbo源碼解析(四十七)服務端處理請求過程

2.7大揭祕——服務端處理請求過程

目標:從源碼的角度分析服務端接收到請求後的一系列操做,最終把客戶端須要的值返回。

前言

上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包後的一系列處理以及如何返回最終結果。咱們也知道消費端在發送請求的時候已經作了編碼,因此咱們也須要在服務端接收到數據包後,對協議頭和協議體進行解碼。不過本篇不講解如何解碼。有興趣的能夠翻翻我之前的文章,有講到關於解碼的邏輯。接下來就開始講解服務端收到請求後的邏輯。java

處理過程

假設遠程通訊的實現仍是用netty4,解碼器將數據包解析成 Request 對象後,NettyHandler 的 messageReceived 方法緊接着會收到這個對象,因此第一步就是NettyServerHandler的channelRead。json

(一)NettyServerHandler的channelRead

能夠參考《dubbo源碼解析(十七)遠程通訊——Netty4》的(三)NettyServerHandlersegmentfault

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 看是否在緩存中命中,若是沒有命中,則建立NettyChannel而且緩存。
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 接受消息
        handler.received(channel, msg);
    } finally {
        // 若是通道不活躍或者斷掉,則從緩存中清除
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

NettyServerHandler是基於netty4實現的服務端通道處理實現類,而該方法就是用來接收請求,接下來就是執行AbstractPeer的received。api

(二)AbstractPeer的received

能夠參考《dubbo源碼解析(九)遠程通訊——Transport層》的(一)AbstractPeer緩存

public void received(Channel ch, Object msg) throws RemotingException {
    // 若是通道已經關閉,則直接返回
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

該方法比較簡單,以前也講過AbstractPeer類就作了裝飾模式中裝飾角色,只是維護了通道的正在關閉和關閉完成兩個狀態。而後到了MultiMessageHandler的receivedapp

(三)MultiMessageHandler的received

能夠參考《dubbo源碼解析(九)遠程通訊——Transport層》的(八)MultiMessageHandler框架

public void received(Channel channel, Object message) throws RemotingException {
    // 若是消息是MultiMessage類型的,也就是多消息類型
    if (message instanceof MultiMessage) {
        // 強制轉化爲MultiMessage
        MultiMessage list = (MultiMessage) message;
        // 把各個消息進行發送
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 直接發送
        handler.received(channel, message);
    }
}

該方法也比較簡單,就是對於多消息的處理。異步

(四)HeartbeatHandler的received

能夠參考《dubbo源碼解析(十)遠程通訊——Exchange層》的(二十)HeartbeatHandler。其中就是對心跳事件作了處理。若是不是心跳請求,那麼接下去走到AllChannelHandler的received。async

(五)AllChannelHandler的received

能夠參考《dubbo源碼解析(九)遠程通訊——Transport層》的(十一)AllChannelHandler。該類處理的是鏈接、斷開鏈接、捕獲異常以及接收到的全部消息都分發到線程池。因此這裏的received方法就是把請求分發到線程池,讓線程池去執行該請求。ide

還記得我在以前文章裏面講到到Dispatcher接口嗎,它是一個線程派發器。分別有五個實現:

Dispatcher實現類 對應的handler 用途
AllDispatcher AllChannelHandler 全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件等
ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 在 IO 線程上,將鏈接和斷開事件放入隊列,有序逐個執行,其它消息派發到線程池
DirectDispatcher 全部消息都不派發到線程池,所有在 IO 線程上直接執行
ExecutionDispatcher ExecutionChannelHandler 只有請求消息派發到線程池,不含響應。其它消息均在 IO 線程上執行
MessageOnlyDispatcher MessageOnlyChannelHandler 只有請求和響應消息派發到線程池,其它消息均在 IO 線程上執行

這些Dispatcher的實現類以及對應的Handler均可以在《dubbo源碼解析(九)遠程通訊——Transport層》中查看相關實現。dubbo默認all爲派發策略。因此我在這裏講了AllChannelHandler的received。把消息送到線程池後,能夠看到首先會建立一個ChannelEventRunnable實體。那麼接下來就是線程接收而且執行任務了。

(六)ChannelEventRunnable的run

ChannelEventRunnable實現了Runnable接口,主要是用來接收消息事件,而且根據事件的種類來分別執行不一樣的操做。來看看它的run方法:

public void run() {
    // 若是是接收的消息
    if (state == ChannelState.RECEIVED) {
        try {
            // 直接調用下一個received
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            //若是是鏈接事件請求
        case CONNECTED:
            try {
                // 執行鏈接
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 若是是斷開鏈接事件請求
        case DISCONNECTED:
            try {
                // 執行斷開鏈接
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 若是是發送消息
        case SENT:
            try {
                // 執行發送消息
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
            // 若是是異常
        case CAUGHT:
            try {
                // 執行異常捕獲
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

能夠看到把消息分爲了幾種類別,由於請求和響應消息出現頻率明顯比其餘類型消息高,也就是RECEIVED,因此單獨先作處理,根據不一樣的類型的消息,會被執行不一樣的邏輯,咱們這裏主要看state爲RECEIVED的,那麼若是是RECEIVED,則會執行下一個received方法。

(七)DecodeHandler的received

能夠參考《dubbo源碼解析(九)遠程通訊——Transport層》的(七)DecodeHandler。能夠看到received方法中根據消息的類型進行不一樣的解碼。而DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成後,就會分發到HeaderExchangeHandler的received。

(八)HeaderExchangeHandler的received

public void received(Channel channel, Object message) throws RemotingException {
    // 設置接收到消息的時間戳
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    // 得到通道
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // 若是消息是Request類型
        if (message instanceof Request) {
            // handle request.
            // 強制轉化爲Request
            Request request = (Request) message;
            // 若是該請求是事件心跳事件或者只讀事件
            if (request.isEvent()) {
                // 執行事件
                handlerEvent(channel, request);
            } else {
                // 若是是正常的調用請求,且須要響應
                if (request.isTwoWay()) {
                    // 處理請求
                    handleRequest(exchangeChannel, request);
                } else {
                    // 若是不須要響應,則繼續下一步
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 處理響應
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            // 若是是telnet相關的請求
            if (isClientSide(channel)) {
                // 若是是客戶端側,則直接拋出異常,由於客戶端側不支持telnet
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                // 若是是服務端側,則執行telnet命令
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            // 若是都不是,則繼續下一步
            handler.received(exchangeChannel, message);
        }
    } finally {
        // 移除關閉或者不活躍的通道
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

該方法中就對消息進行了細分,事件請求、正常的調用請求、響應、telnet命令請求等,而且針對不一樣的消息類型作了不一樣的邏輯調用。咱們這裏主要看正常的調用請求。見下一步。

(九)HeaderExchangeHandler的handleRequest

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 建立一個Response實例
    Response res = new Response(req.getId(), req.getVersion());
    // 若是請求被破壞了
    if (req.isBroken()) {
        // 得到請求的數據包
        Object data = req.getData();

        String msg;
        // 若是數據爲空
        if (data == null) {
            //消息設置爲空
            msg = null;
            // 若是在這以前已經出現異常,也就是數據爲Throwable類型
        } else if (data instanceof Throwable) {
            // 響應消息把異常信息返回
            msg = StringUtils.toString((Throwable) data);
        } else {
            // 返回請求數據
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // 設置錯誤請求的狀態碼
        res.setStatus(Response.BAD_REQUEST);

        // 發送該消息
        channel.send(res);
        return;
    }
    // find handler by message class.
    // 得到請求數據 也就是 RpcInvocation 對象
    Object msg = req.getData();
    try {
        // 繼續向下調用 返回一個future
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    //設置調用結果狀態爲成功
                    res.setStatus(Response.OK);
                    // 把結果放入響應
                    res.setResult(appResult);
                } else {
                    // 若是服務調用有異常,則設置結果狀態碼爲服務錯誤
                    res.setStatus(Response.SERVICE_ERROR);
                    // 把報錯信息放到響應中
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 發送該響應
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        // 若是在執行中拋出異常,則也算服務異常
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

該方法是處理正常的調用請求,主要作了正常、異常調用的狀況處理,而且加入了狀態碼,而後發送執行後的結果給客戶端。接下來看下一步。

(十)DubboProtocol的requestHandler實例的reply

這裏我默認是使用dubbo協議,因此執行的是DubboProtocol的requestHandler的reply方法。能夠參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(三)DubboProtocol

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    // 若是請求消息不屬於會話域,則拋出異常
    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    //強制類型轉化
    Invocation inv = (Invocation) message;
    // 得到暴露的服務invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it's a callback
    // 若是是回調服務
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        // 得到 方法定義
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        // 若是隻有一個方法定義
        if (methodsStr == null || !methodsStr.contains(",")) {
            // 設置會話域中是否有一致的方法定義標誌
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // 分割不一樣的方法
            String[] methods = methodsStr.split(",");
            // 若是方法不止一個,則分割後遍歷查詢,找到了則設置爲true
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        // 若是沒有該方法,則打印告警日誌
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    // 設置遠程地址
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 調用下一個調用鏈
    Result result = invoker.invoke(inv);
    //  返回CompletableFuture<Result>
    return result.completionFuture().thenApply(Function.identity());
}

上述代碼有些變化,是最新的代碼,加入了CompletableFuture,這個我會在後續的異步化改造中講到。這裏主要關注的是又開始跟客戶端發請求同樣執行invoke調用鏈了。

(十一)ProtocolFilterWrapper的CallbackRegistrationInvoker的invoke

能夠直接參考《dubbo源碼解析(四十六)消費端發送請求過程》的(六)ProtocolFilterWrapper的內部類CallbackRegistrationInvoker的invoke。

(十二)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

能夠直接參考《dubbo源碼解析(四十六)消費端發送請求過程》的(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

(十三)EchoFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(八)EchoFilter

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // 若是調用的方法是回聲測試的方法 則直接返回結果,不然 調用下一個調用鏈
    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // 建立一個默認的AsyncRpcResult返回
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
    return invoker.invoke(inv);
}

該過濾器就是對回聲測試的調用進行攔截,上述源碼跟鏈接中源碼惟一區別就是改成了AsyncRpcResult。

(十四)ClassLoaderFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(三)ClassLoaderFilter,用來作類加載器的切換。

(十五)GenericFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(十一)GenericFilter。

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // 若是是泛化調用
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // 得到請求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 得到請求參數類型
        String[] types = (String[]) inv.getArguments()[1];
        // 得到請求參數
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 得到方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 得到該方法的參數類型
            Class<?>[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 得到附加值
            String generic = inv.getAttachment(GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 若是附加值爲空,在用上下文攜帶的附加值
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進行類型轉化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                // as proto3 only accept one protobuf parameter
                if (args.length == 1 && args[0] instanceof String) {
                    try (UnsafeByteArrayInputStream is =
                                 new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) {
                        // 用protobuf-json進行反序列化
                        args[0] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF)
                                .deserialize(null, is).readObject(method.getParameterTypes()[0]);
                    } catch (Exception e) {
                        throw new RpcException("Deserialize argument failed.", e);
                    }
                } else {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_PROTOBUF +
                                    "] only support one" + String.class.getName() +
                                    " argument and your message size is " +
                                    args.length + " and type is" +
                                    args[0].getClass().getName());
                }
            }
            return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    return invoker.invoke(inv);
}

static class GenericListener implements Listener {

    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) {
        // 若是是泛化調用
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            // 得到序列化方式
            String generic = inv.getAttachment(GENERIC_KEY);
            // 若是爲空,默認獲取會話域中的配置
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 若是回調有異常,直接設置異常
            if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) {
                appResponse.setException(new GenericException(appResponse.getException()));
            }
            // 若是是native java形式序列化
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 使用native java形式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue());
                    // 加入結果
                    appResponse.setValue(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_NATIVE_JAVA +
                                    "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 用JavaBean方式序列化
                appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD));
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用protobuf-json進行序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toString());
                } catch (IOException e) {
                    throw new RpcException("Generic serialization [" +
                            GENERIC_SERIALIZATION_PROTOBUF +
                            "] serialize result failed.", e);
                }
            } else {
                // 直接進行類型轉化而且設置值
                appResponse.setValue(PojoUtils.generalize(appResponse.getValue()));
            }
        }
    }

    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

    }
}

跟鏈接內的代碼對比,第一個就是對過濾器設計發生了變化,這個我在異步化改造裏面會講到,第二個是新增了protobuf-json的泛化序列化方式。

(十六)ContextFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(六)ContextFilter,最新代碼幾乎差很少,除了由於對Filter的設計作了修改之外,還有新增了tag路由的相關邏輯,tag相關部分我會在後續文章中講解,該類主要是作了初始化rpc上下文。

(十七)TraceFilter的invoke

能夠參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(十三)TraceFilter,該過濾器是加強的功能是通道的跟蹤,會在通道內把最大的調用次數和如今的調用數量放進去。方便使用telnet來跟蹤服務的調用次數等。

(十八)TimeoutFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(十三)TimeoutFilter,該過濾器是當服務調用超時的時候,記錄告警日誌。

(十九)MonitorFilter的invoke

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // 若是開啓監控
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        // 設置開始監控時間
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        // 對同時在線數量加1
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        // 若是開啓監控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監控對數據,而且更新監控數據
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            // 同時在線監控數減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監控對數據,而且更新監控數據
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            // 同時在線監控數減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            // 得到監控的url
            URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
            // 經過該url得到Monitor實例
            Monitor monitor = monitorFactory.getMonitor(monitorUrl);
            if (monitor == null) {
                return;
            }
            // 建立一個統計的url
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            // 把收集的信息更新而且發送信息
            monitor.collect(statisticsURL);
        } catch (Throwable t) {
            logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

    private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        // ---- service statistics ----
        // 調用服務消耗的時間
        long elapsed = System.currentTimeMillis() - start; // invocation cost
        // 得到同時監控的數量
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
        String application = invoker.getUrl().getParameter(APPLICATION_KEY);
        // 得到服務名
        String service = invoker.getInterface().getName(); // service name
        // 得到調用的方法名
        String method = RpcUtils.getMethodName(invocation); // method name
        // 得到組
        String group = invoker.getUrl().getParameter(GROUP_KEY);
        // 得到版本號
        String version = invoker.getUrl().getParameter(VERSION_KEY);

        int localPort;
        String remoteKey, remoteValue;
        // 若是是消費者端的監控
        if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
            // ---- for service consumer ----
            // 本地端口爲0
            localPort = 0;
            // key爲provider
            remoteKey = MonitorService.PROVIDER;
            // value爲服務ip
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- for service provider ----
            // 端口爲服務端口
            localPort = invoker.getUrl().getPort();
            // key爲consumer
            remoteKey = MonitorService.CONSUMER;
            // value爲遠程地址
            remoteValue = remoteHost;
        }
        String input = "", output = "";
        if (invocation.getAttachment(INPUT_KEY) != null) {
            input = invocation.getAttachment(INPUT_KEY);
        }
        if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
            output = result.getAttachment(OUTPUT_KEY);
        }

        // 返回一個url
        return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
    }

}

在invoke裏面只是作了記錄開始監控的時間以及對同時監控的數量加1操做,當結果回調時,會對結果數據作蒐集計算,最後經過監控服務記錄和發送最新信息。

(二十)ExceptionFilter的invoke

能夠參考《dubbo源碼解析(二十)遠程調用——Filter》的(九)ExceptionFilter,該過濾器主要是對異常的處理。

(二十一)InvokerWrapper的invoke

能夠參考《dubbo源碼解析(二十二)遠程調用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過並無實現實際的功能加強。

(二十二)DelegateProviderMetaDataInvoker的invoke

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

該類也是用了裝飾模式,不過該類是invoker和配置中心的適配類,其中也沒有進行實際的功能加強。

(二十三)AbstractProxyInvoker的invoke

能夠參考《dubbo源碼解析(二十三)遠程調用——Proxy》的(二)AbstractProxyInvoker。不過代碼已經有更新了,下面貼出最新的代碼。

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 執行下一步
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        // 把返回結果用CompletableFuture包裹
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        // 建立AsyncRpcResult實例
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            // 若是拋出異常
            if (t != null) {
                // 屬於CompletionException異常
                if (t instanceof CompletionException) {
                    // 設置異常信息
                    result.setException(t.getCause());
                } else {
                    // 直接設置異常
                    result.setException(t);
                }
            } else {
                // 若是沒有異常,則把結果放入異步結果內
                result.setValue(obj);
            }
            // 完成
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這裏主要是由於異步化改造而出現的代碼變化,我會在異步化改造中講到這部分。如今主要來看下一步。

(二十四)JavassistProxyFactory的getInvoker方法中匿名類的doInvoke

這裏默認代理實現方式是Javassist。能夠參考《dubbo源碼解析(二十三)遠程調用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。dubbo 會在運行時經過 Javassist 框架爲 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 爲例,Javassist 爲其生成的代理類以下。

/** Wrapper0 是在運行時生成的,你們可以使用 Arthas 進行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其餘方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據方法名調用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

而後就是直接調用的是對應的方法了。到此,方法執行完成了。

結果返回

能夠看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調用,也就是當結果返回的返回的時候,會主動發送執行結果給客戶端。固然發送的時候仍是會對結果Response 對象進行編碼,編碼邏輯我就先不在這裏闡述。

當客戶端接收到這個返回的消息時候,進行解碼後,識別爲Response 對象,將該對象派發到線程池中,該過程跟服務端接收到調用請求到邏輯是同樣的,能夠參考上述的解析,區別在於到(八)HeaderExchangeHandler的received方法的時候,執行的是handleResponse方法。

(九)HeaderExchangeHandler的handleResponse

static void handleResponse(Channel channel, Response response) throws RemotingException {
    // 若是響應不爲空,而且不是心跳事件的響應,則調用received
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

(十)DefaultFuture的received

能夠參考《dubbo源碼解析(十)遠程通訊——Exchange層》的(七)DefaultFuture,不過該類的繼承的是CompletableFuture,由於對異步化的改造,該類已經作了一些變化。

public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // future集合中移除該請求的future,(響應id和請求id一一對應的)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //得到超時
            Timeout t = future.timeoutCheckTask;
            // 若是沒有超時,則取消timeoutCheckTask
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            // 接收響應結果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 通道集合移除該請求對應的通道,表明着這一次請求結束
        CHANNELS.remove(response.getId());
    }
}

該方法中主要是對超時的處理,還有對應的請求和響應的匹配,也就是返回相應id的future。

(十一)DefaultFuture的doReceived

能夠參考《dubbo源碼解析(十)遠程通訊——Exchange層》的(七)DefaultFuture,不過由於運用了CompletableFuture,因此該方法徹底重寫了。這部分的改造我也會在異步化改造中講述到。

private void doReceived(Response res) {
    // 若是結果爲空,則拋出異常
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 若是結果的狀態碼爲ok
    if (res.getStatus() == Response.OK) {
        // 則future調用完成
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        // 若是超時,則返回一個超時異常
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        // 不然返回一個RemotingException
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

隨後用戶線程便可從 DefaultFuture 實例中獲取到相應結果。

後記

該文章講解了dubbo調服務端接收到請求後一系列處理的全部步驟以及服務端怎麼把結果發送給客戶端,是目前最新代碼的解析。由於裏面涉及到不少流程,因此能夠本身debug一步一步看一看整個過程。能夠看到本文涉及到異步化改造已經不少了,這也是我在講解異步化改造前想先講解這些過程的緣由。只有弄清楚這些過程,才能更加理解對於異步化改造的意義。下一篇文將講解異步化改造。

相關文章
相關標籤/搜索