[dubbo 源碼之 ]2. 服務消費方如何啓動服務

啓動流程

消費者在啓動以後,會經過ReferenceConfig#get()來生成遠程調用代理類。在get方法中,會啓動一系列調用函數,咱們來一個個解析。java

配置一樣包含2種:spring

  • XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="first-consumer-xml"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181/"/>
    <dubbo:reference proxy="javassist" scope="remote"
                     id="demoService"
                     generic="false"
                     check="false"
                     async="false"
                     group="dubbo-sxzhongf-group"
                     version="1.0.0"
                     interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService">
        <dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" />
        <dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" />
    </dubbo:reference>
</beans>
  • Java API
public class ApiConsumerApplication {
    public static void main(String[] args) {
        // 1. 建立服務引用對象實例
        ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>();
        // 2. 設置應用程序信息
        referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer"));
        // 3. 設置註冊中心
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/"));
        // 4. 設置服務接口和超時時間
        referenceConfig.setInterface(IGreetingService.class);
        // 默認重試3次
        referenceConfig.setTimeout(5000);
        // 5 設置服務分組和版本
        referenceConfig.setGroup("dubbo-sxzhongf-group");
        referenceConfig.setVersion("1.0.0");
        // 6. 引用服務
        IGreetingService greetingService = referenceConfig.get();
        // 7. 設置隱式參數
        RpcContext.getContext().setAttachment("company", "sxzhongf");
        // 獲取provider端傳遞的隱式參數(FIXME: 須要後續追蹤)
//        System.out.println("年齡是:" + RpcContext.getContext().getAttachment("age"));
        //8. 調用服務
        System.out.println(greetingService.sayHello("pan"));
    }
}
1. new ReferenceConfig

在此階段,會初始化org.apache.dubbo.config.AbstractConfig & org.apache.dubbo.config.ReferenceConfig的靜態變量以及靜態代碼塊。apache

2. ReferenceConfig#get
  • ReferenceConfig#initjson

    1. 經過DubboBootstrap啓動dubbo。
    2. 繼而初始化服務的元數據信息,URL.buildKey(interfaceName, group, version)這段用來生成惟一服務的key,因此咱們以前說dubbo的惟一標識是經過全路徑和group以及version來決定的。
    3. 接下來經過org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock來檢查咱們mock是否設置正確。
    4. 設置一系列要用的參數(系統運行參數、是否爲consumer、是否爲泛型調用等等),檢查dubbo的註冊地址,默認爲當前主機IP
  • ReferenceConfig#createProxy 建立調用代理開始
  1. ReferenceConfig#shouldJvmRefer首先判斷是否爲Injvm調用
  2. 若是不爲injvm,判斷是否爲peer to peer端對端設置,若是爲p2p,那麼就直連url
  3. 檢查註冊中心是否存在(註冊中心有可能有多個)
  4. 循環檢查註冊中心是否有效
  5. 配置轉換URLapi

    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0&registry=zookeeper&release=2.7.5&timestamp=1582961922459
  6. 若是隻有一個註冊中心,執行REF_PROTOCOL.refer(interfaceClass, urls.get(0));來將URL轉爲Invoker對象,由於private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();是擴展是Adaptive,所以在這裏會執行Protocol$Adaptive#refer方法,又因爲protocol參數值爲registry,所以會只是RegistryProtocol#refer,又因爲被Wrapper類裝配,所以會先執行三個Wrapper類,最後才能執行到RegistryProtocol#refer -> RegistryProtocol#doRefer,在doRefer方法中會訂閱服務提供者地址,最後返回Invoker對象。1582964279895.png1582977260255.png

那麼到底是如何生成的Invoker對象呢?咱們來看下具體代碼:服務器

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 1.能夠查尋RegistryDirectory & StaticDirectory 區別
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    //2. 封裝訂閱所用URL
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    //3.build路由規則鏈
    directory.buildRouterChain(subscribeUrl);
    //4.訂閱服務提供者地址
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    //5.封裝集羣策略到虛擬invoker
    Invoker invoker = cluster.join(directory);
    return invoker;
}

上述代碼中,步驟1根據URL生成了一個RegistryDirectory(關於Directory接口的做用,能夠自行查詢一些,直白一些就是將一堆Invoker對象封裝成一個List<Invoker>,只有2種實現RegistryDirectory & StaticDirectory,從命名可看出一個是動態可變,一個不可變),代碼2 封裝了訂閱所要使用的參數信息,代碼3則是封裝綁定路由規則鏈,代碼4訂閱。代碼5調用 Cluster$Adaptive#join方法生成Invoker對象。app

在代碼2種從zk獲取服務提供者信息:
1582978175936.png
一旦zk返回服務提供者列表以後,就會調用RegistryDirectory#notify,以下:
1582978319055.png負載均衡

org.apache.dubbo.common.utils.UrlUtils#isMatch中對provider和consumer的api進行匹配校驗。繼續跟蹤:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokerstoInvokers正式將URL轉換爲Invoker,經過invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); 在這裏protocol#refer一樣執行順序如:jvm

(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer,調用代碼以下:async

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

關注getClients,其中執行了DubboProtocol#getSharedClient -> DubboProtocol#initClient 建立netty client進行鏈接。

由於這裏使用的是明確的DubboInvoker,在回調的時候,Wrapper會對該Invoker進行包裝,執行效果以下:
1582984563602.png

所以,會執行到ProtocolFilterWrapper#buildInvokerChain,該函數會對服務進行調用鏈跟蹤:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 獲取全部在MATA-INF文件中配置的激活的責任連接口
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result asyncResult;
                    try {
                        asyncResult = filter.invoke(next, invocation);
                    } catch (Exception e) {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                listener.onError(e, invoker, invocation);
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            listener.onError(e, invoker, invocation);
                        }
                        throw e;
                    } finally {

                    }
                    return asyncResult.whenCompleteWithContext((r, t) -> {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                if (t == null) {
                                    listener.onMessage(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            if (t == null) {
                                listener.onMessage(r, invoker, invocation);
                            } else {
                                listener.onError(t, invoker, invocation);
                            }
                        } else {// Deprecated!
                            filter.onResponse(r, invoker, invocation);
                        }
                    });
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }

    return last;
}

全部的負載均衡、容錯策略等都是在這裏綁定的。
7.若是有多個註冊中心,將會循環執行步驟6,將URL轉換爲Invoker對象,而後添加到一個List,分別進行註冊以後,而後判斷最後一個註冊中心url是否有效,針對多訂閱的場景,URL中添加cluster參數,默認使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster策略,使用org.apache.dubbo.rpc.cluster.Cluster#join將多個Invoker對象封裝一個虛擬的Invoker中,不然若是最後一個註冊中心是無效的,直接封裝Invoker對象。
8.建立服務代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>),由於ProxyFactory是一個適配類。那麼一樣這裏會調用ProxyFactory$Adaptive#getProxy,這裏最終就是返回一個代理服務的Invoker對象。

至此,咱們的消費端的綁定遠程zk的服務就已經結束了。那麼,咱們在調用服務器方法的時候服務器端和客戶端都是如何處理的呢?下節咱們將繼續分析。

相關文章
相關標籤/搜索