Motan系列-Motan的服務調用

Motan系列文章


0 @MotanReferer註解是個啥

@MotanReferer 標註的 setter 方法或 field 會被motan在啓動時掃描,併爲其建立動態代理,並將動態代理的實例賦值給這個 field。遠程服務的調用都是在這個代理中實現的。java

下面以註解在 field 的狀況爲例,說明 @MotanReferer 的解析以及建立動態代理的過程:node

@MotanReferer(basicReferer = "ad-commonBasicRefererConfigBean", application = "ad-filter", version = "1.1.0")
private AdCommonRPC adCommonRPC;
複製代碼

關於如何掃描,在 Motan如何完成與Spring的集成 一文中有詳細說明,這裏再也不贅述。git

1 @MotanReferer註解的解析

這個過程始於 AnnotationBean 中對掃描到的bean的field的解析。Motan會解析出帶有 @MotanReferer 的field,應調用 AnnotationBean 的 refer 方法初始化並建立代理。field的解析以下:github

Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
    try {
        if (!field.isAccessible()) {
            field.setAccessible(true);
        }
        MotanReferer reference = field.getAnnotation(MotanReferer.class);
        if (reference != null) {
            // 調用 refer 方法初始化並建立動態代理
            // 並將field的引用指向這個代理對象
            Object value = refer(reference, field.getType());
            if (value != null) {
                field.set(bean, value);
            }
        }
    } catch (Throwable t) {
        throw new BeanInitializationException("Failed to init remote service reference at filed " + field.getName()
                + " in class " + bean.getClass().getName(), t);
    }
}
複製代碼

2 @MotanReferer的初始化

相似於服務註冊的過程,MotanReferer是經過 RefererConfigBean 類來管理配置、註冊中心、URL、HA、LoadBalance、Proxy等資源的。服務器

仍是先來個 RefererConfigBean 的UML,熟悉一下整個體系都有啥東西。app

Motan_RefererConfigBean_UML

其中,註冊中心、URL、Protocol、HA、LoadBalance策略等都是在 RefererConfigclusterSupports 中管理的。負載均衡

來繼續看這個refer方法,這個方法中首先將 @MotanReferer 註解中的配置信息解析到 RefererConfigBean 中,而後依然是調用 afterPropertiesSet() 方法作一些校驗,最後調用 RefererConfigBeangetRef() 方法,各個組件的初始化以及Proxy都在這裏建立。異步

private <T> Object refer(MotanReferer reference, Class<?> referenceClass) {
    // 解析接口名
    String interfaceName;
    if (!void.class.equals(reference.interfaceClass())) {
        interfaceName = reference.interfaceClass().getName();
    } else if (referenceClass.isInterface()) {
        interfaceName = referenceClass.getName();
    } else {
        throw new IllegalStateException("The @Reference undefined interfaceClass or interfaceName, and the property type "
                + referenceClass.getName() + " is not a interface.");
    }
    String key = reference.group() + "/" + interfaceName + ":" + reference.version();
    RefererConfigBean<T> referenceConfig = referenceConfigs.get(key);
    if (referenceConfig == null) {
        referenceConfig = new RefererConfigBean<T>();
        referenceConfig.setBeanFactory(beanFactory);
        if (void.class.equals(reference.interfaceClass())
                && referenceClass.isInterface()) {
            referenceConfig.setInterface((Class<T>) referenceClass);
        } else if (!void.class.equals(reference.interfaceClass())) {
            referenceConfig.setInterface((Class<T>) reference.interfaceClass());
        }

        if (beanFactory != null) {
            // ... 省略,初始化 @MotanReferer的配置信息
            try {
                // 校驗basicReferer配置、Protocol、Registry配置,與服務註冊過程類似
                referenceConfig.afterPropertiesSet();
            } catch (RuntimeException e) {
                throw (RuntimeException) e;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        referenceConfigs.putIfAbsent(key, referenceConfig);
        referenceConfig = referenceConfigs.get(key);
    }
    // 建立Proxy
    return referenceConfig.getRef();
}
複製代碼

getRef() 方法中實際是調用 initRef() 方法來建立Proxy的。async

public synchronized void initRef() {
    // ... 校驗 interface 和 protocols 是否非空
    checkInterfaceAndMethods(interfaceClass, methods);

    clusterSupports = new ArrayList<>(protocols.size());
    List<Cluster<T>> clusters = new ArrayList<>(protocols.size());
    String proxy = null;

    ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

    // 解析註冊中心地址
    List<URL> registryUrls = loadRegistryUrls();
    // 解析本機IP
    String localIp = getLocalHostAddress(registryUrls);
    for (ProtocolConfig protocol : protocols) {
        Map<String, String> params = new HashMap<>();
        params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
        params.put(URLParamType.version.getName(), URLParamType.version.getValue());
        params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));

        collectConfigParams(params, protocol, basicReferer, extConfig, this);
        collectMethodConfigParams(params, this.getMethods());

        String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
        URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);
        // 初始化ClusterSupport
        ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);

        clusterSupports.add(clusterSupport);
        clusters.add(clusterSupport.getCluster());

        if (proxy == null) {
            // 獲取建立proxy的方式,默認是JDK動態代理
            String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
            proxy = refUrl.getParameter(URLParamType.proxy.getName(), defaultValue);
        }
    }
    // 建立代理
    ref = configHandler.refer(interfaceClass, clusters, proxy);

    initialized.set(true);
}
複製代碼

能夠發現,又調用了 configHandler.refer 方法,默認狀況下,這個proxy參數的值是"jdk",即便用JDK自身的動態代理功能建立代理。 另一個比較重要的類是 ClusterSupport,這個類封裝了下面這些信息:ide

private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
// 集羣支持
private Cluster<T> cluster;
// 註冊中心URL
private List<URL> registryUrls;
// 遠程調用URL
private URL url;
private Class<T> interfaceClass;
// 使用的協議
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();
複製代碼

其中,cluster 在motan的具體實現其實是 ClusterSpi 這個類,他封裝瞭如下信息:

public class ClusterSpi<T> implements Cluster<T> {
    // 高可用策略
    private HaStrategy<T> haStrategy;
    // 負載均衡策略
    private LoadBalance<T> loadBalance;
    private List<Referer<T>> referers;
    private AtomicBoolean available = new AtomicBoolean(false);
    private URL url;
}
複製代碼

這些東西在上面的 createClusterSupport 方法中生成好了,這裏先不關注Cluster、Ha、LoadBalance等,本文主要關注代理的建立及RPC調用。繼續看 configHandler.refer 這個方法。

public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
    ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
    return proxyFactory.getProxy(interfaceClass, clusters);
}
複製代碼

又是一個SPI擴展,默認使用的下面這個JDK的ProxyFactory。

@SpiMeta(name = "jdk")
public class JdkProxyFactory implements ProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clz, List<Cluster<T>> clusters) {
        return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
    }
}
複製代碼

OK,到這裏代理就建立好了。這個代理的實例最終會被 @MotanReferer 註解的field引用。初始化過程結束。

這裏要明確,最終是經過 RefererInvocationHandler 這個類建立的代理。

3 RPC調用

既然 RefererInvocationHandler 代理了咱們的目標接口,那麼接口的每一個方法調用都會走到這個代理類中。因此接下來主要關注代理是咋完成RPC調用的。

這裏只給出關鍵代碼:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 省略 local method 部分

    DefaultRequest request = new DefaultRequest();
    request.setRequestId(RequestIdGenerator.getRequestId());
    request.setArguments(args);
    String methodName = method.getName();
    boolean async = false; // 異步調用支持,暫不關注
    if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
        methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
        async = true;
    }
    request.setMethodName(methodName);
    request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
    request.setInterfaceName(interfaceName);

    return invokeRequest(request, getRealReturnType(async, this.clz, method, methodName), async);
}
複製代碼

這個方法先將RPC的相關信息封裝到 DefaultRequest 中,而後調用 invokeRequest 方法。

Object invokeRequest(Request request, Class returnType, boolean async) throws Throwable {
    RpcContext curContext = RpcContext.getContext();
    // 省略 初始化 RpcContext 

    // 當 referer配置多個protocol的時候,好比A,B,C,
    // 那麼正常狀況下只會使用A,若是A被開關降級,那麼就會使用B,B也被降級,那麼會使用C
    for (Cluster<T> cluster : clusters) {
        // 若是開關處於關閉狀態,不會去調用這個遠程機器
        String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
        Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
        if (switcher != null && !switcher.isOn()) {
            continue;
        }

        request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
        request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
        // 帶上client的application和module
        request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
        request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());

        Response response = null;
        boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(), URLParamType.throwException.getValue()));
        try {
            MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_INVOKE);
            // 執行調用
            response = cluster.call(request);
            if (async) {
                // 省略異步調用的支持 
            } else {
                Object value = response.getValue();
                if (value != null && value instanceof DeserializableObject) {
                    try {
                        value = ((DeserializableObject) value).deserialize(returnType);
                    } catch (IOException e) {
                        LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
                        throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
                    }
                }
                return value;
            }
        } catch (RuntimeException e) {
            // 異常處理,包括處理是否向上遊服務拋出
        }
    }
    throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + interfaceName + " " + MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
}
複製代碼

cluster.call()

public Response call(Request request) {
    if (available.get()) {
        try {
            // haStrategy是經過SPI來管理的,默認的HA策略是 failover
            // 即調用失敗時,自動嘗試其餘服務器
            return haStrategy.call(request, loadBalance);
        } catch (Exception e) {
            return callFalse(request, e);
        }
    }
    return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
複製代碼

haStrategy.call()

public Response call(Request request, LoadBalance<T> loadBalance) {
    // refer列表
    List<Referer<T>> referers = selectReferers(request, loadBalance);
    if (referers.isEmpty()) {
        throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request,
                loadBalance));
    }
    URL refUrl = referers.get(0).getUrl();
    // 這裏是配置中配置的 retries 重試次數,默認:0
    int tryCount =
            refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                    URLParamType.retries.getIntValue());
    // 若是有問題,則設置爲不重試
    if (tryCount < 0) {
        tryCount = 0;
    }

    for (int i = 0; i <= tryCount; i++) {
        Referer<T> refer = referers.get(i % referers.size());
        try {
            request.setRetries(i);
            return refer.call(request); // RPC
        } catch (RuntimeException e) {
            // 對於業務異常,直接拋出
            if (ExceptionUtil.isBizException(e)) {
                throw e;
            } else if (i >= tryCount) {
                throw e;
            }
            LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
        }
    }

    throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
}
複製代碼

而後,refer.call()

public Response call(Request request) {
    if (!isAvailable()) {
        throw new MotanFrameworkException(this.getClass().getSimpleName() + " call Error: node is not available, url=" + url.getUri()
                + " " + MotanFrameworkUtil.toString(request));
    }
    // 增長目標server的鏈接數,用於loadBalance
    incrActiveCount(request);
    Response response = null;
    try {
        response = doCall(request); // do rpc
        return response;
    } finally {
        // 調用完要將目標server的鏈接數-1
        decrActiveCount(request, response);
    }
}
複製代碼

到這裏,doCall 方法就是經過Netty調用RPC了。

相關文章
相關標籤/搜索