Dubbo源碼分析(三)Dubbo的服務引用Refer

Dubbo的服務引用

服務引用

先從Dubbo的配置文件看起java

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
複製代碼

源碼入口: 根據上一篇說的,咱們經過DubboNamespaceHandler類找到ReferenceBean類,在afterPropertiesSet()方法中咱們找到關鍵代碼getObject()
進入ReferenceConfig類中的get()方法,這個get() 方法是一個同步方法,調用了init()方法
咱們看到init()方法中的最後一行代碼ref = createProxy(map);咱們從這這個方法開始分析:緩存

private T createProxy(Map<String, String> map) {
        ···
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最後一個registry url
                    }
                }
                if (registryURL != null) { // 有 註冊中心協議的URL
                    // 對有註冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 註冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        ···
        // 建立服務代理
        return (T) proxyFactory.getProxy(invoker);
    }
複製代碼

先看invoker = refprotocol.refer(interfaceClass, urls.get(0))這行代碼
此時的refprotocol= Protocol$Adatptive,進入refer方法:app

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
複製代碼

此時的extName=registry,因此extension=ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocol)),咱們直接進入RegistryProtocol.refer()方法中框架

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
}
複製代碼

看到第二行代碼Registry registry = registryFactory.getRegistry(url);這裏從字面上理解應該是創建和註冊中心的鏈接,這裏的代碼和服務端發佈是同樣的,這裏跳過,繼續往下走group,Dubbo裏面是能夠對服務進行分組,這裏不影響主流程走向,咱們跳過,看到最後一行代碼,咱們進入ide

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }
複製代碼

先看subscribeUrl是啥,這裏的url是consumer開頭的url,看到registry.register()方法,這裏是向註冊中心去註冊消費端信息,具體註冊的節點是:/dubbo/com.alibaba.dubbo.demo.DemoService/consumers
directory.subscribe(),這句代碼一看就明白,應該是向註冊中心訂閱咱們剛剛註冊的地址,咱們進入到這個方法裏面去看看若是目錄地址有變化,怎麼通知,該作什麼樣的處理,最終的實現類是ZookeeperRegistry.doSubscribe()方法中,這裏用到了模板方法,咱們看到doSubscribe()方法中的這段代碼notify(url, listener, urls)源碼分析

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
       ···
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // 將失敗的通知請求記錄到失敗列表,定時重試
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            ···
            listeners.put(listener, urls);
           ···
        }
    }
複製代碼

這裏面執行了doNotify方法,若是執行失敗,對應的經過定時策略去重試,繼續進入doNotify方法post

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ···
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            listener.notify(categoryList);
        }
    }
複製代碼

這個是AbstractRegistry類中的方法,咱們看到saveProperties方法,做用是把消費端註冊的url信息緩存到本地this

registryCacheExecutor.execute(new SaveProperties(version));
複製代碼

而後經過線程池來定時緩存數據,咱們繼續看一下listener.notify(categoryList)這句代碼,這裏的listener是RegistryDirectory編碼

public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
       ···
        // providers
        refreshInvoker(invokerUrls);
    }
複製代碼

看到最後一段代碼refreshInvoker(invokerUrls)url

/** * 根據invokerURL列表轉換爲invoker列表。轉換規則以下: * 1.若是url已經被轉換爲invoker,則不在從新引用,直接從緩存中獲取,注意若是url中任何一個參數變動也會從新引用 * 2.若是傳入的invoker列表不爲空,則表示最新的invoker列表 * 3.若是傳入的invokerUrl列表是空,則表示只是下發的override規則或route規則,須要從新交叉對比,決定是否須要從新引用。 * * @param invokerUrls 傳入的參數不能爲null */
    // TODO: FIXME 使用線程池去刷新地址,不然可能會致使任務堆積
    private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // 禁止訪問
            this.methodInvokerMap = null; // 置空列表
            destroyAllInvokers(); // 關閉全部Invoker
        } else {
            this.forbidden = false; // 容許訪問
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便於交叉對比
            }
            if (invokerUrls.size() == 0) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 將URL列表轉成Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
            // state change
            //若是計算錯誤,則不進行處理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 關閉未使用的Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
複製代碼

這段代碼的最終目的是刷新urlInvokerMap緩存,而且關閉關閉未使用的Invoker 接下來咱們繼續cluster.join(directory)這個方法 ,此時的cluster=Cluster$Adaptive

public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
複製代碼

此時extension=MockClusterWrapper(FaileOverCluster), 這裏有一個Mock包裝類,猜測一下,這個Mock應該是Dubbo的容錯機制中用到的Mock,進入MockClusterWrapper.join方法

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
複製代碼

這裏new了一個MockClusterInvoker,進入FaileOverCluster.join方法

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
複製代碼

這裏new 了一個FailoverClusterInvoker,而後回到最初的ReferenceConfig.createProxy方法,看到最後一段代碼return (T) proxyFactory.getProxy(invoker);這段代碼的做用是建立服務代理,這裏的invoker就是咱們剛剛new的MockClusterInvoker,這裏的proxyFactory=ProxyFactory$Adaptive,直接貼結果,進入StubProxyFactoryWrapper.getProxy

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        T proxy = proxyFactory.getProxy(invoker);
        if (GenericService.class != invoker.getInterface()) {
            String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
            if (ConfigUtils.isNotEmpty(stub)) {
                Class<?> serviceType = invoker.getInterface();
                if (ConfigUtils.isDefault(stub)) {
                    if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                        stub = serviceType.getName() + "Stub";
                    } else {
                        stub = serviceType.getName() + "Local";
                    }
                }
                try {
                    Class<?> stubClass = ReflectUtils.forName(stub);
                    if (!serviceType.isAssignableFrom(stubClass)) {
                        throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + serviceType.getName());
                    }
                    try {
                        Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                        proxy = (T) constructor.newInstance(new Object[]{proxy});
                        //export stub service
                        URL url = invoker.getUrl();
                        if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                            url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                            url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                            try {
                                export(proxy, (Class) invoker.getInterface(), url);
                            } catch (Exception e) {
                                LOGGER.error("export a stub service error.", e);
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implemention class " + stubClass.getName(), e);
                    }
                } catch (Throwable t) {
                    LOGGER.error("Failed to create stub implemention class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
                    // ignore
                }
            }
        }
        return proxy;
    }
複製代碼

咱們先看第一行代碼 T proxy = proxyFactory.getProxy(invoker);
這裏的proxyFactory=JavassitProxyFactory,咱們首先進入的是AbstractProxyFactory.getProxy方法,這裏又是一個模版方法,

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null;
        String config = invoker.getUrl().getParameter("interfaces");
        ···
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }
複製代碼

進入JavassitProxyFactory.getProxy方法,

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
複製代碼

這裏傳入的interfaces=[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
再進入new InvokerInvocationHandler(invoker),這裏初始化一個InvokerInvocationHandler對象,咱們看下這個對象

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}
複製代碼

這裏用了JDK自帶的動態代理Proxy類和InvocationHandler接口,到這裏proxy代理類建立完成。

總結

從Dubbo官網上找到一張引用服務的時序圖

相關文章
相關標籤/搜索