源碼剖析 | 螞蟻金服 mPaaS 框架下的 RPC 調用歷程

背景

mPaaS-RPC 是支付寶原生的 RPC 調用庫。git

在客戶端開發過程當中,只須要簡單的調用庫裏封裝的函數便可完成一次數據請求過程,可是遇到異常狀況每每會面對各類異常碼殊不知所云。因此這篇文章帶領你們瞭解一下 mPaaS-RPC 的調用過程以及各類異常碼產生的緣由。json

1. 使用方法

在 Android 端,RPC 的調用是很簡單的,大概分爲如下幾個過程:網絡

1.1 定義 RPC 接口

首先須要定義 RPC 接口,包括接口名,接口特性(是否須要登陸,是否須要簽名等),接口的定義方式以下:session

public interface LegoRpcService {
    @CheckLogin
    @SignCheck
    @OperationType("alipay.lego.card.style")
    LegoCardPB viewCard(MidPageCardRequestPB var1);
}
複製代碼

當客戶端調用 viewCard 方法時,框架就會請求 alipay.lego.card.style 對應的 RPC 接口,接口的參數定義在 MidPageCardRequestPB 中。架構

這個接口調用前須要檢查是否登陸以及客戶端簽名是否正確,所以如此簡單的一個接口就定義了一個 RPC 請求。框架

1.2 調用 RPC 請求

定義完接口後,須要調用 RPC 請求,調用方法以下:async

//建立參數對象
MidPageCardRequestPB request = new MidPageCardRequestPB();
//填充參數
...

//獲取 RpcService
RpcService rpcService = (RpcService)LauncherApplicationAgent.getInstance().getMicroApplicationContext().findServiceByInterface(RpcService.class.getName());
//獲取Rpc請求代理類
LegoRpcService service = rpcService.getRpcProxy(LegoRpcService.class);
//調用方法
LegoCardPB result = service.viewFooter(request);
複製代碼

調用過程大概就分爲以上幾步。ide

值得注意的是,整個過程當中咱們並無去實現 LegoRpcService 這個接口,而是經過 rpcService.getRpcProxy 獲取了一個代理,這裏用的了 Java 動態代理的概念,後面的內容會涉及。模塊化

2. 源碼解析

接下來咱們就來看看這套框架是怎麼運行的。函數

2.1 建立參數對象

參數對象是一個 PB 的對象,這個對象的序列化和反序列化過程須要和服務端對應起來。簡單來講,就是這個參數在客戶端序列化,做爲請求的參數發送請求,而後服務端收到請求後反序列化,根據參數執行請求,返回結果。

MidPageCardRequestPB request = new MidPageCardRequestPB();
複製代碼

2.2 獲取 RPCService

RPCService 的具體實現是 mpaas-commonservice-git Bundle 中的 RPCServiceImpl

這個添加的過程是在 mPaaS 啓動時,調用 CommonServiceLoadAgent 的 load 方法。

@Override
    public final void load() {
        ...
        registerLazyService(RpcService.class.getName(), RpcServiceImpl.class.getName());
        ...
    }
複製代碼

RpcServiceImpl 中 getRpcProxy 方法調用的是 RpcFactory 的 getRpcProxy 方法。

@Override
    public <T> T getRpcProxy(Class<T> clazz) {
        return mRpcFactory.getRpcProxy(clazz);
    }
複製代碼

2.3 獲取 RPC 請求代理類

mRpcFactory 這個對象在 mPaas-Rpc Bundle 中。

public <T> T getRpcProxy(Class<T> clazz) {
        LogCatUtil.info("RpcFactory","clazz=["+clazz.getName()+"]");
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz},
            new RpcInvocationHandler(mConfig,clazz, mRpcInvoker));
    }
複製代碼

這裏就是根據接口建立動態代理的過程,這是 Java 原生支持的一個特性。

簡單來講,動態代理即 JVM 會根據接口生成一個代理類,調用接口方法時,會先調用代理類的 invoke 方法,在 invoke 方法中你能夠根據實際狀況來作操做從而實現動態代理的做用。

2.4 調用方法

動態代理類便調用 RpcInvocationHandler 方法: 即調用定義的 RPC 接口時,會調用到 RpcInvocationHandler 的 invoke 方法:

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws RpcException {
        return mRpcInvoker.invoke(proxy, mClazz, method, args, buildRpcInvokeContext(method));
    }
複製代碼

invoke 方法調用的是 mRpcInvoker 的 invoke 方法,而 mRpcInvoker 是建立 RpcInvocationHandler 傳過來的。invoke 方法在原來參數的基礎上又傳遞了 mClazzbuildRpcInvokeContext(method)

mClazz 很好理解,由於 RpcInvocationHandler 是對應某個類,而 mRpcInvoker 是個單例:它並不知道是在代理哪一個類的方法,因此須要明確告訴它。

buildRpcInvokeContext(method) 從名字來看是一個上下文對象,這裏面保存了請求的上下文信息。

接下來就到了 RpcInvoker 的 invoke 方法:介紹一下 RPC 框架的設計理念,在 invoke 方法中只定義流程,不作具體操做。框架中會註冊不少 Interceptor,每一個流程交給都交給 Interceptor 去作。這個思想在網絡層的設計架構上很常見,好比很著名的 Spring。遺憾的是,我以爲這套 RPC 框架在後期開發過程當中,這個思想有點稍稍被打亂,而 invoke 方法針對一些細節進行了優化處理。

言歸正傳,回到這個 invoke 方法,我總結了一些調用流程,以下圖所示:

簡單來講,即依次調用了 preHandle,singleCall 和 postHandle。下面咱們來看下代碼:

public Object invoke(Object proxy, Class<?> clazz, Method method, Object[] args, InnerRpcInvokeContext invokeContext) throws RpcException {
            ...
            preHandle(proxy, clazz, method, args, method.getAnnotations(),invokeContext);// 前置攔截
           ...
          try{
                response = singleCall(method, args, RpcInvokerUtil.getOperationTypeValue(method, args), id, invokeContext,protoDesc);
                returnObj = processResponse(method,response,protoDesc);
        } catch (RpcException exception) {
            exceptionHandle(proxy, response!=null?response.getResData():null, clazz, method, args, method.getAnnotations(), exception, invokeContext);// 異常攔截
        }
          ...
        postHandle(proxy, response!=null?response.getResData():null, clazz, method, args, method.getAnnotations(), invokeContext);// 後置攔截
         ...
        return returnObj;
    }
複製代碼

2.5 前置攔截

首先來看下前置攔截 preHandle 方法:

private void preHandle(final Object proxy, final Class<?> clazz, final Method method, final Object[] args, Annotation[] annotations, InnerRpcInvokeContext invokeContext) throws RpcException {
        handleAnnotations(annotations, new Handle() {
            @Override
            public boolean handle(RpcInterceptor rpcInterceptor, Annotation annotation) throws RpcException {
                if (!rpcInterceptor.preHandle(proxy, RETURN_VALUE, new byte[]{}, clazz, method,
                        args, annotation, EXT_PARAM)) {
                    throw new RpcException(RpcException.ErrorCode.CLIENT_HANDLE_ERROR,
                            rpcInterceptor + "preHandle stop this call.");
                }
                return true;
            }
        });

        RpcInvokerUtil.preHandleForBizInterceptor(proxy, clazz, method, args, invokeContext, EXT_PARAM, RETURN_VALUE);

        //mock RPC 限流
        RpcInvokerUtil.mockRpcLimit(mRpcFactory.getContext(),method, args);
    }
複製代碼

前置攔截有三個過程:首先 handleAnnotations 處理方法的註解,而後執行業務層定義的攔截器,最後模擬 RPC 限流。

2.6 處理註解

這是前置攔截中最重要的一步,主要調用了 handleAnnotations 方法。handleAnnotations 方法給了一個回調,回調的參數是註解以及對應的 RpcInterceptor,返回後調用 RpcInterceptor 的 preHandle 方法。

介紹 handleAnnotations 以前先簡單說一下 RpcInterceptor,這個在框架中叫作攔截器:

public interface RpcInterceptor {
    public boolean preHandle(Object proxy,ThreadLocal<Object> retValue, byte[] retRawValue, Class<?> clazz, Method method, Object[] args, Annotation annotation,ThreadLocal<Map<String,Object>> extParams) throws RpcException;

    public boolean postHandle(Object proxy,ThreadLocal<Object> retValue, byte[] retRawValue, Class<?> clazz, Method method, Object[] args, Annotation annotation) throws RpcException;

    public boolean exceptionHandle(Object proxy,ThreadLocal<Object> retValue, byte[] retRawValue, Class<?> clazz, Method method, Object[] args, RpcException exception, Annotation annotation) throws RpcException;
}
複製代碼

簡單來講,程序一開始會註冊幾個攔截器,每一個攔截器對應一種註解。invoke 方法中處理註解的時候,會找到對應的攔截器,而後調用攔截器相應的方法。正如前面流程圖中所說的,若是攔截器返回 true,繼續往下運行,若是返回 false,則拋出相應的異常。

接下來繼續說 handleAnnotations: handleAnnotations 其實就是查找攔截器的方法,看一下具體實現:

private boolean handleAnnotations(Annotation[] annotations, Handle handle) throws RpcException {
            for (Annotation annotation : annotations) {
                Class<? extends Annotation> c = annotation.annotationType();
                RpcInterceptor rpcInterceptor = mRpcFactory.findRpcInterceptor(c);
                ret = handle.handle(rpcInterceptor, annotation);

            }
}
複製代碼

咱們調用了 mRpcFactory.findRpcInterceptor(c) 方法去查找攔截器:mRpcFactory.findRpcInterceptor(c) 查找了兩個地方:

  • 一個是 mInterceptors;
  • 另外一個是 GLOBLE_INTERCEPTORS 裏。
public RpcInterceptor findRpcInterceptor(Class<? extends Annotation> clazz) {
        RpcInterceptor rpcInterceptor = mInterceptors.get(clazz);
        if (rpcInterceptor != null) {
            return rpcInterceptor;
        }
        return GLOBLE_INTERCEPTORS.get(clazz);
    }
複製代碼

這兩個地方的攔截器實際上是同樣的,由於 addRpcInterceptor 的時候會往這兩個地方都添加一次。

public void addRpcInterceptor(Class<? extends Annotation> clazz, RpcInterceptor rpcInterceptor) {
        mInterceptors.put(clazz, rpcInterceptor);
        addGlobelRpcInterceptor(clazz,rpcInterceptor);
    }
複製代碼

而以前提到的 Spring 的處理方式是:每種狀況,多個攔截器依次處理,擴展性比較好。

這裏攔截器的添加是在 commonbiz Bundle 中 CommonServiceLoadAgent 的 afterBootLoad 方法中,這個方法也是在 mPaaS 框架啓動的時候調用的。

rpcService.addRpcInterceptor(CheckLogin.class, new LoginInterceptor());
rpcService.addRpcInterceptor(OperationType.class, new CommonInterceptor());
rpcService.addRpcInterceptor(UpdateDeviceInfo.class, new CtuInterceptor(mMicroAppContext.getApplicationContext()));
複製代碼

一共針對三種註解添加了三個攔截器,在文章的最後咱們分析一下每種攔截器作了什麼操做:

handleAnnotations 中找到了對應的攔截器,就調用其 preHandle 方法進行前置攔截;

preHandle 中處理完註解的前置攔截,會在 preHandleForBizInterceptor 處理上下文對象中帶的攔截器;

上下文對象中的攔截器和註解攔截器道理是同樣的,這個階段我看了一下上下文對象 Context 中並無設置攔截器。若是有的話,即取出來而後依次調用對應方法。

preHandle 的最後一步是模擬網絡限流,這是在測試中使用的,若是測試打開 RPC 限流功能,那麼這裏會限制 RPC 的訪問來模擬限流的狀況。

2.7 singlecall

處理完前置攔截,又回到 RpcInvoker 的 invoke 方法,接下來會調用 singleCall 去發起網絡請求。

private Response singleCall(Method method, Object[] args, String operationTypeValue, int id, InnerRpcInvokeContext invokeContext,RPCProtoDesc protoDesc) throws RpcException {
        checkLogin(method,invokeContext);
        Serializer serializer = getSerializer(method, args, operationTypeValue,id,invokeContext,protoDesc);
    if (EXT_PARAM.get() != null) {
        serializer.setExtParam(EXT_PARAM.get());
    }
    byte[] body = serializer.packet();
    HttpCaller caller = new HttpCaller(mRpcFactory.getConfig(), method, id, operationTypeValue, body,
    serializerFactory.getContentType(protoDesc), mRpcFactory.getContext(),invokeContext);
        addInfo2Caller(method, serializer, caller, operationTypeValue, body, invokeContext);
        Response response = (Response) caller.call();// 同步
        return response;
    }
複製代碼

singleCall 一開始又 checkLogin,接着根據參數的類型獲取了序列化器 Serializer,並進行了參數的序列化操做,序列化後的參數做爲整個請求的 body。而後一個 HttpCaller 被建立,HttpCaller 如下是網絡傳輸層的封裝代碼,這篇文章中暫時不關注。

在實際發送請求以前,須要調用 addInfo2Caller 往 HttpCaller 中添加一些通用信息,好比序列化版本,contentType,時間戳,還有根據 SignCheck 註解決定要不要在請求裏添加簽名信息。

按照個人理解,其實這塊也應該在各個 Intercepter 中處理,否則把 SignCheck 這個註解單獨拿出來處理,代碼實在是很差看。

最後咱們能夠去實際調用 caller.call 方法發送請求了,而後收到服務端的回覆。

2.8 ProcessResponse

這樣就執行完 singleCall 方法,得到了服務端的回覆。

過程又回到了 invoke 裏,得到服務端 response 後,調用 processResponse 去處理這個回覆。處理回覆的過程其實就是把服務端的返回結果反序列化,是前面發送請求的一個逆過程,代碼以下:

private Object processResponse(Method method, Response response,RPCProtoDesc protoDesc) {
        Type retType = method.getGenericReturnType();
        Deserializer deserializer = this.serializerFactory.getDeserializer(retType,response,protoDesc);
        Object object = deserializer.parser();
        if (retType != Void.TYPE) {// 非void
            RETURN_VALUE.set(object);
        }
        return object;
    }
複製代碼

在 preHandle,singleCall 和 processResponse 這三個過程當中,若是有 RpcException 拋出(處理過程當中全部的異常狀況都是以 RpcException 的形式拋出),invoke 中會調用 exceptionHandle 異常。

2.9 異常處理

exceptionHandle 也是一樣地去三個 Interceptor 中找相應註釋的攔截器,並調用 exceptionHandle:若是返回 true,說明須要繼續處理,再把這個異常拋出去,交給業務方處理;若是返回 false,則表明異常被吃掉了,不須要被繼續處理了。

private void exceptionHandle(final Object proxy, final byte[] rawResult, final Class<?> clazz, final Method method, final Object[] args, Annotation[] annotations, final RpcException exception,InnerRpcInvokeContext invokeContext) throws RpcException {
        boolean processed = handleAnnotations(annotations, new Handle() {
            @Override
            public boolean handle(RpcInterceptor rpcInterceptor, Annotation annotation) throws RpcException {
                if (rpcInterceptor.exceptionHandle(proxy, RETURN_VALUE, rawResult, clazz, method,
                        args, exception, annotation)) {
                    LogCatUtil.error(TAG, exception + " need process");
                    // throw exception;
                    return true;
                } else {
                    LogCatUtil.error(TAG, exception + " need not process");
                    return false;
                }
            }
        });
        if (processed) {
            throw exception;
        }

    }
複製代碼

2.10 後置處理

處理完異常以後,invoke 方法繼續往下執行,下一步是調用 postHandle 進行後置攔截。流程跟前置攔截徹底同樣,先是去找三個默認的攔截器處理,而後再去 invokeContext 去找業務定製的攔截器,目前這一塊沒有任何實現。

處理 preHandle,singeCall,exceptionHandle 和 postHandle 這幾個主要的流程,invoke 會調用 asyncNotifyRpcHeaderUpdateEvent 去通知關心 Response Header 的 Listener,而後打印返回結果的信息,以後整個流程結束,返回請求結果。

3. 默認攔截器實現

默認攔截器一共有三個,針對三種不一樣的註解:

rpcService.addRpcInterceptor(CheckLogin.class, new LoginInterceptor());
rpcService.addRpcInterceptor(OperationType.class, new CommonInterceptor());
rpcService.addRpcInterceptor(UpdateDeviceInfo.class, new CtuInterceptor(mMicroAppContext.getApplicationContext()));
複製代碼

3.1 LoginInterceptor

第一個 LoginInterceptor,顧名思義,就是檢查登陸的攔截器。這裏咱們只實現了前置攔截的方法 preHandle:

public boolean preHandle(Object proxy, ThreadLocal<Object> retValue, byte[] retRawValue, Class<?> clazz, Method method, Object[] args, Annotation annotation,ThreadLocal<Map<String,Object>> extParams) throws RpcException {
        AuthService authService = AlipayApplication.getInstance().getMicroApplicationContext().getExtServiceByInterface(AuthService.class.getName());
        if (!authService.isLogin() && !ActivityHelper.isBackgroundRunning()) {//未登陸
            LoggerFactory.getTraceLogger().debug("LoginInterceptor", "start login:" + System.currentTimeMillis());

            Bundle params = prepareParams(annotation);
            checkLogin(params);
            LoggerFactory.getTraceLogger().debug("LoginInterceptor", "finish login:" + System.currentTimeMillis());
            fail(authService);
        }
        return true;
    }
複製代碼

檢查是否登陸:

  • 若是沒登陸,拋出 CLIENT_LOGIN_FAIL_ERROR = 11 則異常。

3.2 CommonInterceptor

通用攔截器,攔截的是 OperationType 註解,這個註解的 value 是 RPC 請求的方法名,因此能夠看出 CommonInterceptor 會處理全部的 RPC 請求,這也是爲何叫 CommonInterceptor。

public boolean preHandle(Object proxy, ThreadLocal<Object> retValue, byte[] retRawValue, Class<?> clazz, Method method, Object[] args, Annotation annotation, ThreadLocal<Map<String, Object>> extParams) throws RpcException {
        checkWhiteList(method, args);
        checkThrottle();
        ...
        writeMonitorLog(ACTION_STATUS_RPC_REQUEST, clazz, method, args);

        for (RpcInterceptor i : RpcCommonInterceptorManager.getInstance().getInterceptors()) {
            i.preHandle(proxy, retValue, retRawValue, clazz, method, args, annotation, extParams);
        }

        return true;
    }
複製代碼
  • 第一步:檢查白名單:

在啓動的前3s內,爲了保證性能,只有白名單的請求才能發送。若是不在白名單,會拋出 CLIENT_NOTIN_WHITELIST = 17

  • 第二步:檢查是否限流:

限流是服務端設定的,若是服務端設置了限流,則在某次請求的時候,服務端會返回 SERVER_INVOKEEXCEEDLIMIT=1002異常,這時候 CommonInterceptor 會從返回結果中取到限流到期時間

if(exception.getCode() == RpcException.ErrorCode.SERVER_INVOKEEXCEEDLIMIT){
            String control = exception.getControl();
            if(control!=null){
                mWrite.lock();
                try{
                    JSONObject jsonObject = new JSONObject(control);
                    if(jsonObject.getString("tag").equalsIgnoreCase("overflow")){
                        mThrottleMsg = exception.getMsg();
                        mControl = control;

                        //若是是own的異常,須要更新限流結束的時間
                        if ( exception.isControlOwn() ){
                            mEndTime = System.currentTimeMillis()+jsonObject.getInt("waittime")*1000;
                        }
                    }
                }
        }
複製代碼
  • 第三步:寫監控日誌
writeMonitorLog(ACTION_STATUS_RPC_REQUEST, clazz, method, args);
複製代碼
  • 第四步:處理業務定製的攔截器
for (RpcInterceptor i : RpcCommonInterceptorManager.getInstance().getInterceptors()) {
            i.preHandle(proxy, retValue, retRawValue, clazz, method, args, annotation, extParams);
        }
複製代碼

前面咱們說過 RPC 框架中一個攔截器是跟一個註解綁定的,好比 CommonIntercetor 是跟 operatorType 註解綁定的。可是若是業務方想定製 operatorType 註解的攔截器怎麼辦,就須要在 CommonIntercetor 下面再綁定攔截器列表。目前這裏沒有實現,能夠忽略。

異常攔截 exceptionHandle 是用來處理服務端返回結果中的異常狀況的,業務方能夠根據本身的服務端返回結果進行定製。舉個例子,假如你本地的 session 失效了,請求服務端結果後,服務端返回了登陸失效的狀態碼 SESSIONSTATUS_FAIL。收到這個異常後業務方能夠進行相應的處理了,好比是否須要使用本地保存的帳號密碼進行自動登陸,或者彈出登陸框請求用戶登陸,或者直接返回讓業務方處理等等各類形式。

後置攔截 postHandle 作了一件事,記錄了服務端返回結果的日誌。

3.3 CtuInterceptor

CtuInterceptor 攔截器對應的是 UpdateDeviceInfo 這個註解。這個註解表示這個 RPC 請求須要設備信息。因此前置攔截的時候將設備信息寫入請求的參數裏面。

RpcDeviceInfo rpcDeviceInfo = new RpcDeviceInfo();
        DeviceInfo deviceInfo = DeviceInfo.getInstance();
        // 添加一些設備信息到deviceInfo
        ....
複製代碼

exceptionHandle 和 postHandle 都沒有處理。

以上就是系統默認的三個攔截器,和整個 mPaaS-RPC Bundle 中進行的流程。其實這樣看起來 mPaaS-RPC 只負責網絡請求的封裝和發送,整個流程仍是很簡單的。然而網絡請求返回後根據不一樣的錯誤碼進行不一樣的處理纔是真正複雜的部分,這部分原本是交給具體業務方去處理的。

不過良心支付寶又提供了一層封裝 RPC-Beehive 組件,這層是在網絡層框架和業務方之間的一層封裝,將通用的一些異常碼進行了處理,好比,請求是轉菊花,或者返回異常後顯示通用異常界面。

以上即是針對 mPaaS-RPC 的源碼剖析,歡迎你們反饋想法或建議,一塊兒討論修正。

往期閱讀

《開篇 | 模塊化與解耦式開發在螞蟻金服 mPaaS 深度實踐探討》

《口碑 App 各 Bundle 之間的依賴分析指南》

關注咱們公衆號,得到第一手 mPaaS 技術實踐乾貨

QRCode
相關文章
相關標籤/搜索