六、Dubbo的服務導出1之導出到本地

一、尋找Dubbo服務導出的入口方法

context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});

// 刪除了一些步驟
public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
        try {
            // 1. 裏面的核心代碼就是初始化了applicationEventMulticaster,用於後面發佈事件使用
            // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            initApplicationEventMulticaster();

            // 2. 初始化非延遲加載的bean,這裏就會初始化dubbo配置的一些bean,包括ServiceBean,用於服務導出
            finishBeanFactoryInitialization(beanFactory);

            // 3. 發佈容器刷新事件,這裏面是服務導出的入口
            finishRefresh();
        }
    }
}
// 步驟2分析
// 這裏Spring容器會初始化非延遲加載的bean,包括<dubbo:service/>表示的bean
// <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean時會建立ServiceBean對象,因爲ServiceBean實現了
// ApplicationContextAware接口,因此Spring容器會先調用setApplicationContext給其注入Spring容器
class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
          ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) {
       this.applicationContext = applicationContext;
       // 給SpringExtensionFactory注入Spring容器
       SpringExtensionFactory.addApplicationContext(applicationContext);
       if (applicationContext != null) {
          SPRING_CONTEXT = applicationContext;
          try {
              // method是addListener方法,調用該方法用於給applicationEventMulticaster
              // 添加listener                                    
              method.invoke(applicationContext, new Object[]{this});
              supportedApplicationListener = true;
          } 
       }
   }     
}
// 步驟3分析,發佈相關事件,這裏會發佈容器刷新事件
finishRefresh();

protected void finishRefresh() {
    initLifecycleProcessor();
    getLifecycleProcessor().onRefresh();

    // 1). 發佈容器刷新事件,ServiceBean監聽的就是該事件
    // ServiceBean implements ApplicationListener<ContextRefreshedEvent>
    publishEvent(new ContextRefreshedEvent(this));

    LiveBeansView.registerApplicationContext(this);
}

// 2). 步驟1會走到這裏,這裏會獲取以前的applicationEventMulticaster,用於發佈事件
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);

// 3). 走到這裏,listener就是以前調用 method.invoke(applicationContext, new Object[]{this});
// 加進去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件
doInvokeListener(listener, event);

// 4) 走到這裏,最終調用ServiceBean實現的onApplicationEvent方法
listener.onApplicationEvent(event);

這樣,就走到了Dubbo暴露服務的入口的方法.這也是Dubbo官方文檔中說起的入口方法,參考: 服務導出html

public void onApplicationEvent(ContextRefreshedEvent event) {
    // 若是服務沒有被暴露而且服務沒有被取消暴露,則打印日誌
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // 導出服務
        export();
    }
}

二、Dubbo服務導出

接下來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始於Spring容器發佈刷新事件,Dubbo在接收到事件後,會當即執行服務導出邏輯。整個邏輯大體可分爲三個部分,第一部分是前置工做,主要用於檢查參數,組裝 URL。第二部分是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三部分是向註冊中心註冊服務,用於服務發現。下面將會對這三個部分代碼進行詳細的分析。spring

2.一、服務導出的前置工做

服務導出的入口方法是ServiceBean的onApplicationEvent。onApplicationEvent 是一個事件響應方法,該方法會在收到Spring上下文刷新事件後執行服務導出操做。方法代碼以下apache

public void onApplicationEvent(ContextRefreshedEvent event) {
    // 若是服務沒有被暴露而且服務沒有被取消暴露,則打印日誌
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // 導出服務
        export();
    }
}

這個方法首先會根據條件決定是否導出服務,好比有些服務設置了延時導出,那麼此時就不該該在此處導出。還有一些服務已經被導出了,或者當前服務被取消導出了,此時也不能再次導出相關服務。注意這裏的 isDelay 方法,這個方法字面意思是「是否延遲導出服務」,返回 true 表示延遲導出,false 表示不延遲導出。可是該方法真實意思卻並不是如此,當方法返回 true 時,表示無需延遲導出。返回 false 時,表示須要延遲導出。與字面意思偏偏相反,這個須要你們注意一下。
前置工做主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導出服務以前,Dubbo 須要檢查用戶的配置是否合理,或者爲用戶補充缺省配置。配置檢查完成後,接下來須要根據這些配置組裝 URL。在 Dubbo 中,URL 的做用十分重要。Dubbo 使用 URL 做爲配置載體,全部的拓展點都是經過 URL 獲取配置。這一點,官方文檔中有所說明。下面的export方法會走到doExport()方法。segmentfault

public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}

如下是配置檢查的相關分析,代碼比較多,須要你們耐心看一下。下面對配置檢查的邏輯進行簡單的總結,以下:
1) 檢測 <dubbo:service> 標籤的 interface 屬性合法性,不合法則拋出異常
2) 檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否爲空,若爲空,則嘗試從其餘配置類對象中獲取相應的實例。
3) 檢測並處理泛化服務和普通服務類
4) 檢測本地存根配置,並進行相應的處理
5) 對 ApplicationConfig、RegistryConfig 等配置類進行檢測,爲空則嘗試建立,若沒法建立則拋出異常配置檢查並不是本文重點,所以這裏不打算對 doExport 方法所調用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了appendProperties方法稍微複雜一些,其餘方法邏輯不是很複雜。所以,你們可自行分析。app

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            // 拋異常
        }
        checkDefault();
if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
// 檢測ref是否爲泛化服務類型
        if (ref instanceof GenericService) {
            // 設置interfaceClass爲GenericService
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                // 設置generic = true
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                // 得到接口類型
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 對interfaceClass,以及<dubbo:method>標籤中的必要字段進行檢查
            checkInterfaceAndMethods(interfaceClass, methods);
            // 對ref合法性進行檢測
            checkRef();
            generic = Boolean.FALSE.toString();
        }
// stub local同樣都是配置本地存根
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } 
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } 
        }
checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        // 本地存根、mock合法性校驗
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }

        // 核心代碼,暴露服務、註冊邏輯就在其中
        doExportUrls();

        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

2.二、多協議多註冊中心導出服務

Dubbo 容許咱們使用不一樣的協議導出服務,也容許咱們向多個註冊中心註冊服務。Dubbo在doExportUrls方法中對多協議,多註冊中心進行了支持。相關代碼以下dom

/**
 * 多協議多註冊中心暴露服務進行支持
 */
private void doExportUrls() {
    // 加載註冊中心連接
    List<URL> registryURLs = loadRegistries(true);
    // 遍歷protocols,並在每一個協議下暴露
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

上面代碼首先是經過loadRegistries加載註冊中心連接,而後再遍歷ProtocolConfig集合導出每一個服務。並在導出服務的過程當中,將服務註冊到註冊中心。咱們先來看一下loadRegistries方法的邏輯。先能夠打開看下該方法能夠獲得什麼。jvm

<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>

<!-- use multicast registry center to export service -->
<dubbo:registry address="zookeeper://10.101.99.127:2181"/>

<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20880"/>

<dubbo:provider server="netty4"/>

<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

clipboard.png

protected List<URL> loadRegistries(boolean provider) {
    checkRegistry();
    List<URL> registryList = new ArrayList<URL>();
    // 若是registries爲空,直接返回空集合
    if (registries != null && !registries.isEmpty()) {
        // 遍歷註冊中心配置集合registries
        for (RegistryConfig config : registries) {
            // 得到地址
            String address = config.getAddress();
            // 若地址爲空,則設置爲0.0.0.0
            if (address == null || address.length() == 0) {
                address = Constants.ANYHOST_VALUE;
            }
            String sysaddress = System.getProperty("dubbo.registry.address");
            if (sysaddress != null && sysaddress.length() > 0) {
                address = sysaddress;
            }
// 若是地址爲N/A,則跳過
            if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                // 添加ApplicationConfig中的字段信息到map中
                appendParameters(map, application);
                // 添加RegistryConfig字段信息到map中
                appendParameters(map, config);
                // 添加path、協議版本
                map.put("path", RegistryService.class.getName());
                map.put("dubbo", Version.getProtocolVersion());
                map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                if (ConfigUtils.getPid() > 0) {
                    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                }
                // 若是map中沒有protocol,則默認爲使用dubbo協議
                if (!map.containsKey("protocol")) {
                    if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).
                                                                     hasExtension("remote")) {
                        map.put("protocol", "remote");
                    } else {
                        map.put("protocol", "dubbo");
                    }
                }
// 解析獲得URL列表,address可能包含多個註冊中心ip,所以解析獲得的是一個URL列表
                List<URL> urls = UrlUtils.parseURLs(address, map);
                // 遍歷URL 列表
                for (URL url : urls) {
                    // 將URL協議頭設置爲registry
                    url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                    // 這裏將協議設置爲了registry,這也是後面調用的是RegistryProtocol的export()方法緣由
                    url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                    // 經過判斷條件,決定是否添加url到registryList中,條件以下:
                    // 若是是服務提供者,而且是註冊中心服務或者是消費者端,而且是訂閱服務,則加入到registryList
                    if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                            || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}

ProtocolConfig主要封裝了<dubbo:protocol name="dubbo" port="20880"/>標籤的信息,意思是使用Dubbo協議暴露服務。
clipboard.pngide

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 獲取協議名
    String name = protocolConfig.getName();
    // 若是爲空,則是默認的dubbo
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    Map<String, String> map = new HashMap<String, String>();
    // 設置服務提供者側
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }

    // 這段代碼其實完成了子節點配置信息對父節點的覆蓋
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
// 若是method的配置列表不爲空
    if (methods != null && !methods.isEmpty()) {
        // 遍歷method配置列表
        for (MethodConfig method : methods) {
            // 把方法名加入map
            appendParameters(map, method, method.getName());
            // 添加 MethodConfig對象的字段信息到map中,鍵=方法名.屬性名
            // 好比存儲<dubbo:method name="sayHello" retries="2">對應的MethodConfig,
            // 鍵=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 若是retryValue爲false,則不重試,設置值爲0
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }

clipboard.png

if (ProtocolUtils.isGeneric(generic)) {
        map.put(Constants.GENERIC_KEY, generic);
        map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            map.put(Constants.METHODS_KEY, 
                                 StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }

    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, 
           (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                                                                .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                               .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // 暴露到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // 暴露到遠程
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            // 後面分析
        }
    }
    this.urls.add(url); 
}

2.三、暴露服務到本地

前置工做作完,接下來就能夠進行服務導出了。服務導出分爲導出到本地(JVM)和導出到遠程。ui

// 暴露到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
    exportLocal(url);
}
private void exportLocal(URL url) {
    // 若是協議不是injvm
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // 生成本地的url,分別把協議改成injvm,設置host和port
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST).setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // 經過代理工程建立invoker
        // 再調用export方法進行暴露服務,生成Exporter
        // 這裏的protocol是生成的拓展代理對象,具體可看https://segmentfault.com/a/1190000020384210
        // 它是在運行時才根據URL中的protocol參數去決定運行哪一個Protocol實例的export方法,這裏因爲前面 
        // setProtocol(Constants.LOCAL_PROTOCOL),因此調用的是InjvmProtocol的export方法
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        // 把生成的暴露者加入集合
        exporters.add(exporter);  
    }
}

下面兩個是url和local的具體值,由於Dubbo採用自適應拓展機制,exportLocal(URL url)中用到的protocol是自適應拓展,protocol的export方法會用到URL中protocol參數從而決定具體生成protocol的哪一個實例,因此URL的protocol值能夠關注下。this

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

clipboard.png

clipboard.png

下面分析下面這句代碼。它是核心方法,分爲兩步。

Exporter<?> exporter = protocol.export(
                              proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local)  ->  返回invoker
2) protocol.export(invoker)
// 步驟1)分析
// proxyFactory也是自適應拓展代理帶,它默認使用JavassistProxyFactory
proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// 這裏調用的就是JavassistProxyFactory的getInvoker方法
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // 建立Wrapper對象
    final Wrapper wrapper = 
           Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 建立匿名Invoker類對象,並實現doInvoke方法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 調用Wrapper的invokeMethod方法,invokeMethod最終會調用目標方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

在 Dubbo 中,Invoker是一個很是重要的模型。在服務提供端,以及服務引用端均會出現Invoker。Dubbo 官方文檔中對Invoker進行了說明,這裏引用一下。Invoker是實體域,它是Dubbo的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起invoke調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。這裏面getInvoker方法建立了一個匿名Invoker對象,我理解是經過invoke實行遠程調用時,會走wrapper.invokeMethod方法,而wrapper其實是一個代理類,調用wrapper.invokeMethod最終會走proxy,也就是DemoService的sayHello方法。Wrapper建立比較複雜,能夠參考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析

// 步驟2分析,調用的是InjvmProtocol的export方法
 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 該方法只是建立了一個,由於暴露到本地,因此在同一個jvm中,因此不須要其餘操做
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
相關文章
相關標籤/搜索