context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); // 刪除了一些步驟 public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { try { // 1. 裏面的核心代碼就是初始化了applicationEventMulticaster,用於後面發佈事件使用 // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); initApplicationEventMulticaster(); // 2. 初始化非延遲加載的bean,這裏就會初始化dubbo配置的一些bean,包括ServiceBean,用於服務導出 finishBeanFactoryInitialization(beanFactory); // 3. 發佈容器刷新事件,這裏面是服務導出的入口 finishRefresh(); } } }
// 步驟2分析 // 這裏Spring容器會初始化非延遲加載的bean,包括<dubbo:service/>表示的bean // <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean時會建立ServiceBean對象,因爲ServiceBean實現了 // ApplicationContextAware接口,因此Spring容器會先調用setApplicationContext給其注入Spring容器 class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 給SpringExtensionFactory注入Spring容器 SpringExtensionFactory.addApplicationContext(applicationContext); if (applicationContext != null) { SPRING_CONTEXT = applicationContext; try { // method是addListener方法,調用該方法用於給applicationEventMulticaster // 添加listener method.invoke(applicationContext, new Object[]{this}); supportedApplicationListener = true; } } } }
// 步驟3分析,發佈相關事件,這裏會發佈容器刷新事件 finishRefresh(); protected void finishRefresh() { initLifecycleProcessor(); getLifecycleProcessor().onRefresh(); // 1). 發佈容器刷新事件,ServiceBean監聽的就是該事件 // ServiceBean implements ApplicationListener<ContextRefreshedEvent> publishEvent(new ContextRefreshedEvent(this)); LiveBeansView.registerApplicationContext(this); } // 2). 步驟1會走到這裏,這裏會獲取以前的applicationEventMulticaster,用於發佈事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); // 3). 走到這裏,listener就是以前調用 method.invoke(applicationContext, new Object[]{this}); // 加進去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件 doInvokeListener(listener, event); // 4) 走到這裏,最終調用ServiceBean實現的onApplicationEvent方法 listener.onApplicationEvent(event);
這樣,就走到了Dubbo暴露服務的入口
的方法.這也是Dubbo官方文檔中說起的入口方法,參考: 服務導出html
public void onApplicationEvent(ContextRefreshedEvent event) { // 若是服務沒有被暴露而且服務沒有被取消暴露,則打印日誌 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導出服務 export(); } }
接下來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始於Spring容器發佈刷新事件,Dubbo在接收到事件後,會當即執行服務導出邏輯。整個邏輯大體可分爲三個部分,第一部分是前置工做,主要用於檢查參數,組裝 URL。第二部分是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三部分是向註冊中心註冊服務,用於服務發現。下面將會對這三個部分代碼進行詳細的分析。spring
服務導出的入口方法是ServiceBean的onApplicationEvent
。onApplicationEvent 是一個事件響應方法,該方法會在收到Spring上下文刷新事件後執行服務導出操做。方法代碼以下apache
public void onApplicationEvent(ContextRefreshedEvent event) { // 若是服務沒有被暴露而且服務沒有被取消暴露,則打印日誌 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導出服務 export(); } }
這個方法首先會根據條件決定是否導出服務,好比有些服務設置了延時導出,那麼此時就不該該在此處導出。還有一些服務已經被導出了,或者當前服務被取消導出了,此時也不能再次導出相關服務。注意這裏的 isDelay 方法,這個方法字面意思是「是否延遲導出服務」,返回 true 表示延遲導出,false 表示不延遲導出。可是該方法真實意思卻並不是如此,當方法返回 true 時,表示無需延遲導出。返回 false 時,表示須要延遲導出。與字面意思偏偏相反,這個須要你們注意一下。
前置工做主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導出服務以前,Dubbo 須要檢查用戶的配置是否合理,或者爲用戶補充缺省配置。配置檢查完成後,接下來須要根據這些配置組裝 URL。在 Dubbo 中,URL 的做用十分重要。Dubbo 使用 URL 做爲配置載體,全部的拓展點都是經過 URL 獲取配置。這一點,官方文檔中有所說明。下面的export方法會走到doExport()
方法。segmentfault
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
如下是配置檢查的相關分析,代碼比較多,須要你們耐心看一下。下面對配置檢查的邏輯進行簡單的總結,以下:
1) 檢測 <dubbo:service> 標籤的 interface 屬性合法性,不合法則拋出異常
2) 檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否爲空,若爲空,則嘗試從其餘配置類對象中獲取相應的實例。
3) 檢測並處理泛化服務和普通服務類
4) 檢測本地存根配置,並進行相應的處理
5) 對 ApplicationConfig、RegistryConfig 等配置類進行檢測,爲空則嘗試建立,若沒法建立則拋出異常配置檢查並不是本文重點,所以這裏不打算對 doExport 方法所調用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了appendProperties方法稍微複雜一些,其餘方法邏輯不是很複雜。所以,你們可自行分析。app
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { // 拋異常 } checkDefault();
if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } }
if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } }
// 檢測ref是否爲泛化服務類型 if (ref instanceof GenericService) { // 設置interfaceClass爲GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 設置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 得到接口類型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 對interfaceClass,以及<dubbo:method>標籤中的必要字段進行檢查 checkInterfaceAndMethods(interfaceClass, methods); // 對ref合法性進行檢測 checkRef(); generic = Boolean.FALSE.toString(); }
// stub local同樣都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } }
checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); // 本地存根、mock合法性校驗 checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 核心代碼,暴露服務、註冊邏輯就在其中 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
Dubbo 容許咱們使用不一樣的協議導出服務,也容許咱們向多個註冊中心註冊服務。Dubbo在doExportUrls方法中對多協議,多註冊中心進行了支持。相關代碼以下dom
/** * 多協議多註冊中心暴露服務進行支持 */ private void doExportUrls() { // 加載註冊中心連接 List<URL> registryURLs = loadRegistries(true); // 遍歷protocols,並在每一個協議下暴露 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代碼首先是經過loadRegistries加載註冊中心連接,而後再遍歷ProtocolConfig集合導出每一個服務。並在導出服務的過程當中,將服務註冊到註冊中心。咱們先來看一下loadRegistries方法的邏輯。先能夠打開看下該方法能夠獲得什麼。jvm
<!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use multicast registry center to export service --> <dubbo:registry address="zookeeper://10.101.99.127:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <dubbo:provider server="netty4"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // 若是registries爲空,直接返回空集合 if (registries != null && !registries.isEmpty()) { // 遍歷註冊中心配置集合registries for (RegistryConfig config : registries) { // 得到地址 String address = config.getAddress(); // 若地址爲空,則設置爲0.0.0.0 if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; }
// 若是地址爲N/A,則跳過 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加ApplicationConfig中的字段信息到map中 appendParameters(map, application); // 添加RegistryConfig字段信息到map中 appendParameters(map, config); // 添加path、協議版本 map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 若是map中沒有protocol,則默認爲使用dubbo協議 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class). hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } }
// 解析獲得URL列表,address可能包含多個註冊中心ip,所以解析獲得的是一個URL列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍歷URL 列表 for (URL url : urls) { // 將URL協議頭設置爲registry url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 這裏將協議設置爲了registry,這也是後面調用的是RegistryProtocol的export()方法緣由 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 經過判斷條件,決定是否添加url到registryList中,條件以下: // 若是是服務提供者,而且是註冊中心服務或者是消費者端,而且是訂閱服務,則加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
ProtocolConfig主要封裝了<dubbo:protocol name="dubbo" port="20880"/>標籤的信息,意思是使用Dubbo協議暴露服務。
ide
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 獲取協議名 String name = protocolConfig.getName(); // 若是爲空,則是默認的dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 設置服務提供者側 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 這段代碼其實完成了子節點配置信息對父節點的覆蓋 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this);
// 若是method的配置列表不爲空 if (methods != null && !methods.isEmpty()) { // 遍歷method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig對象的字段信息到map中,鍵=方法名.屬性名 // 好比存儲<dubbo:method name="sayHello" retries="2">對應的MethodConfig, // 鍵=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 若是retryValue爲false,則不重試,設置值爲0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } }
if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 暴露到遠程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // 後面分析 } } this.urls.add(url); }
前置工做作完,接下來就能夠進行服務導出了。服務導出分爲導出到本地(JVM)和導出到遠程。ui
// 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } private void exportLocal(URL url) { // 若是協議不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分別把協議改成injvm,設置host和port URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST).setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 經過代理工程建立invoker // 再調用export方法進行暴露服務,生成Exporter // 這裏的protocol是生成的拓展代理對象,具體可看https://segmentfault.com/a/1190000020384210 // 它是在運行時才根據URL中的protocol參數去決定運行哪一個Protocol實例的export方法,這裏因爲前面 // setProtocol(Constants.LOCAL_PROTOCOL),因此調用的是InjvmProtocol的export方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); } }
下面兩個是url和local的具體值,由於Dubbo採用自適應拓展機制,exportLocal(URL url)中用到的protocol是自適應拓展,protocol的export方法會用到URL中protocol參數從而決定具體生成protocol的哪一個實例,因此URL的protocol值能夠關注下。this
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
下面分析下面這句代碼。它是核心方法,分爲兩步。
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local) -> 返回invoker 2) protocol.export(invoker)
// 步驟1)分析 // proxyFactory也是自適應拓展代理帶,它默認使用JavassistProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 這裏調用的就是JavassistProxyFactory的getInvoker方法 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 建立Wrapper對象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 建立匿名Invoker類對象,並實現doInvoke方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調用Wrapper的invokeMethod方法,invokeMethod最終會調用目標方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
在 Dubbo 中,Invoker是一個很是重要的模型
。在服務提供端,以及服務引用端均會出現Invoker。Dubbo 官方文檔中對Invoker進行了說明,這裏引用一下。Invoker是實體域,它是Dubbo的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起invoke調用
,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。這裏面getInvoker方法建立了一個匿名Invoker對象,我理解是經過invoke實行遠程調用時,會走wrapper.invokeMethod方法,而wrapper其實是一個代理類,調用wrapper.invokeMethod最終會走proxy,也就是DemoService的sayHello方法。Wrapper建立比較複雜,能夠參考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析。
// 步驟2分析,調用的是InjvmProtocol的export方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 該方法只是建立了一個,由於暴露到本地,因此在同一個jvm中,因此不須要其餘操做 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }