本篇文章,咱們來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始於 Spring 容器發佈刷新事件,Dubbo 在接收到事件後,會當即執行服務導出邏輯。整個邏輯大體可分爲三個部分,第一是前置工做,主要用於檢查參數,組裝 URL。第二是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三是向註冊中心註冊服務,用於服務發現。本篇文章將會對這三個部分代碼進行詳細的分析,在分析以前,咱們先來了解一下服務的導出過程。java
Dubbo 支持兩種服務導出方式,分別延遲導出和當即導出。延遲導出的入口是 ServiceBean 的 afterPropertiesSet 方法,當即導出的入口是 ServiceBean 的 onApplicationEvent 方法。本文打算分析服務延遲導出過程,所以不會分析 afterPropertiesSet 方法。下面從 onApplicationEvent 方法提及,該方法收到 Spring 容器的刷新事件後,會調用 export 方法執行服務導出操做。服務導出以前,要進行對一系列的配置進行檢查,以及生成 URL。準備工做作完,隨後開始導出服務。首先導出到本地,而後再導出到遠程。導出到本地就是將服務導出到 JVM 中,此過程比較簡單。導出到遠程的過程則要複雜的多,以 dubbo 協議爲例,DubboProtocol 類的 export 方法將會被調用。該方法主要用於建立 Exporter 和 ExchangeServer。ExchangeServer 自己並不具有通訊能力,須要藉助更底層的 Server 實現通訊功能。所以,在建立 ExchangeServer 實例時,須要先建立 NettyServer 或者 MinaServer 實例,並將實例做爲參數傳給 ExchangeServer 實現類的構造方法。ExchangeServer 實例建立完成後,導出服務到遠程的過程也就接近尾聲了。服務導出結束後,服務消費者便可經過直聯的方式消費服務。固然,通常咱們不會使用直聯的方式消費服務。因此,在服務導出結束後,緊接着要作的事情是向註冊中心註冊服務。此時,客戶端便可從註冊中心發現服務。git
以上就是 Dubbo 服務導出的過程,比較複雜。下面開始分析源碼,從源碼的角度展示整個過程。github
一場 Dubbo 源碼分析的馬拉松比賽即將開始,如今咱們站在賽道的起點進行熱身準備。本次比賽的起點位置位於 ServiceBean 的 onApplicationEvent 方法處。好了,發令槍響了,我將和一些朋友從 onApplicationEvent 方法處出發,探索 Dubbo 服務導出的全過程。下面咱們來看一下 onApplicationEvent 方法的源碼。正則表達式
public void onApplicationEvent(ContextRefreshedEvent event) { // 是否有延遲導出 && 是否已導出 && 是否是已被取消導出 if (isDelay() && !isExported() && !isUnexported()) { // 導出服務 export(); } }
onApplicationEvent 是一個事件響應方法,該方法會在收到 Spring 上下文刷新事件後執行。這個方法首先會根據條件決定是否導出服務,好比有些服務設置了延時導出,那麼此時就不該該在此處導出。還有一些服務已經被導出了,或者當前服務被取消導出了,此時也不能再次導出相關服務。注意這裏的 isDelay 方法,這個方法字面意思是「是否延遲導出服務」,返回 true 表示延遲導出,false 表示不延遲導出。可是該方法真實意思卻並不是如此,當方法返回 true 時,表示無需延遲導出。返回 false 時,表示須要延遲導出。與字面意思偏偏相反,讓人以爲很奇怪。下面咱們來看一下這個方法的邏輯。apache
// -☆- ServiceBean private boolean isDelay() { // 獲取 delay Integer delay = getDelay(); ProviderConfig provider = getProvider(); if (delay == null && provider != null) { // 若是前面獲取的 delay 爲空,這裏繼續獲取 delay = provider.getDelay(); } // 判斷 delay 是否爲空,或者等於 -1 return supportedApplicationListener && (delay == null || delay == -1); }
暫時忽略 supportedApplicationListener 這個條件,當 delay 爲空,或者等於-1時,該方法返回 true,而不是 false。這個方法的返回值讓人有點困惑,所以我重構了該方法的代碼,並給 Dubbo 提了一個 Pull Request,最終這個 PR 被合到了 Dubbo 主分支中。詳細請參見 Dubbo #2686。bootstrap
如今解釋一下 supportedApplicationListener 變量含義,該變量用於表示當前的 Spring 容器是否支持 ApplicationListener,這個值初始爲 false。在 Spring 容器將本身設置到 ServiceBean 中時,ServiceBean 的 setApplicationContext 方法會檢測 Spring 容器是否支持 ApplicationListener。若支持,則將 supportedApplicationListener 置爲 true。代碼就不分析了,你們自行查閱瞭解。數組
ServiceBean 是 Dubbo 與 Spring 框架進行整合的關鍵,能夠看作是兩個框架之間的橋樑。具備一樣做用的類還有 ReferenceBean。ServiceBean 實現了 Spring 的一些拓展接口,有 FactoryBean、ApplicationContextAware、ApplicationListener、DisposableBean 和 BeanNameAware。這些接口我在 Spring 源碼分析系列文章中介紹過,你們能夠參考一下,這裏就不贅述了。緩存
如今咱們知道了 Dubbo 服務導出過程的起點。那麼接下來,咱們馬不停蹄,繼續進行比賽。賽程預告,下一站是「服務導出的前置工做」。服務器
前置工做主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導出服務以前,Dubbo 須要檢查用戶的配置是否合理,或者爲用戶補充缺省配置。配置檢查完成後,接下來須要根據這些配置組裝 URL。在 Dubbo 中,URL 的做用十分重要。Dubbo 使用 URL 做爲配置載體,全部的拓展點都是經過 URL 獲取配置。這一點,官方文檔中有所說明。app
採用 URL 做爲配置信息的統一格式,全部擴展點都經過傳遞 URL 攜帶配置信息。
接下來,咱們先來分析配置檢查部分的源碼,隨後再來分析 URL 組裝部分的源碼。
本節咱們接着前面的源碼向下分析,前面說過 onApplicationEvent 方法在通過一些判斷後,會決定是否調用 export 方法導出服務。那麼下面咱們從 export 方法開始進行分析,以下:
public synchronized void export() { if (provider != null) { // 獲取 export 和 delay 配置 if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } // 若是 export 爲 false,則不導出服務 if (export != null && !export) { return; } if (delay != null && delay > 0) { // delay > 0,延時導出服務 delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { // 當即導出服務 doExport(); } }
export 對兩個配置進行了檢查,並配置執行相應的動做。首先是 export,這個配置決定了是否導出服務。有時候咱們只是想本地啓動服務進行一些調試工做,這個時候咱們並不但願把本地啓動的服務暴露出去給別人調用。此時,咱們就能夠經過配置 export 禁止服務導出,好比:
<dubbo:provider export="false" />
delay 見名知意了,用於延遲導出服務。下面,咱們繼續分析源碼,此次要分析的是 doExport 方法。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; // 檢測 interfaceName 是否合法 if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("interface not allow null!"); } // 檢測 provider 是否爲空,爲空則新建一個,並經過系統變量爲其初始化 checkDefault(); // 下面幾個 if 語句用於檢測 provider、application 等核心配置類對象是否爲空, // 若爲空,則嘗試從其餘配置類對象中獲取相應的實例。 if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) {...} if (monitor == null) {...} if (protocols == null) {...} } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) {...} } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) {...} } // 檢測 ref 是否泛化服務類型 if (ref instanceof GenericService) { // 設置 interfaceClass 爲 GenericService.class interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 設置 generic = "true" generic = Boolean.TRUE.toString(); } } else { // ref 非 GenericService 類型 try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 對 interfaceClass,以及 <dubbo:method> 必要字段進行檢查 checkInterfaceAndMethods(interfaceClass, methods); // 對 ref 合法性進行檢測 checkRef(); // 設置 generic = "false" generic = Boolean.FALSE.toString(); } // local 屬性 Dubbo 官方文檔中沒有說明,不過 local 和 stub 在功能應該是一致的,用於配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { // 獲取本地存根類 localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 檢測本地存根類是否可賦值給接口類,若不可賦值則會拋出異常,提醒使用者本地存根類類型不合法 if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } // stub 和 local 均用於配置本地存根 if (stub != null) { // 此處的代碼和上一個 if 分支的代碼基本一致,這裏省略了 } // 檢測各類對象是否爲空,爲空則新建,或者拋出異常 checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 導出服務 doExportUrls(); // ProviderModel 表示服務提供者模型,此對象中存儲了和服務提供者相關的信息。 // 好比服務的配置信息,服務實例等。每一個被導出的服務對應一個 ProviderModel。 // ApplicationModel 持有全部的 ProviderModel。 ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
以上就是配置檢查的相關分析,代碼比較多,須要你們耐心看一下。下面對配置檢查的邏輯進行簡單的總結,以下:
配置檢查並不是本文重點,所以我不打算對 doExport 方法所調用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了 appendProperties 方法稍微複雜一些,其餘方法都還好。所以,你們可自行進行分析。好了,其餘的就很少說了,繼續向下分析。
Dubbo 容許咱們使用不一樣的協議導出服務,也容許咱們向多個註冊中心註冊服務。Dubbo 在 doExportUrls 方法中對多協議,多註冊中心進行了支持。相關代碼以下:
private void doExportUrls() { // 加載註冊中心連接 List<URL> registryURLs = loadRegistries(true); // 遍歷 protocols,導出每一個服務 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代碼比較簡單,首先是經過 loadRegistries 加載註冊中心連接,而後再遍歷 ProtocolConfig 集合導出每一個服務。並在導出服務的過程當中,將服務註冊到註冊中心處。下面,咱們先來看一下 loadRegistries 方法的邏輯。
protected List<URL> loadRegistries(boolean provider) { // 檢測是否存在註冊中心配置類,不存在則拋出異常 checkRegistry(); List<URL> registryList = new ArrayList<URL>(); if (registries != null && !registries.isEmpty()) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { // 若 address 爲空,則將其設爲 0.0.0.0 address = Constants.ANYHOST_VALUE; } // 從系統屬性中加載註冊中心地址 String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 判斷 address 是否合法 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加 ApplicationConfig 中的字段信息到 map 中 appendParameters(map, application); // 添加 RegistryConfig 字段信息到 map 中 appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析獲得 URL 列表,address 可能包含多個註冊中心 ip, // 所以解析獲得的是一個 URL 列表 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 將 URL 協議頭設置爲 registry url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 經過判斷條件,決定是否添加 url 到 registryList 中,條件以下: // (服務提供者 && register = true 或 null) // || (非服務提供者 && subscribe = true 或 null) if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
上面代碼不是很複雜,包含以下邏輯:
關於多協議多註冊中心導出服務就先分析到這,代碼不是不少,就不過多敘述了。接下來分析 URL 組裝過程。
配置檢查完畢後,緊接着要作的事情是根據配置,以及其餘一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,經過 URL 可以讓 Dubbo 的各類配置在各個模塊之間傳遞。URL 之於 Dubbo,猶如水之於魚,很是重要。你們在閱讀 Dubbo 服務導出相關源碼的過程當中,要注意 URL 內容的變化。既然 URL 如此重要,那麼下面咱們來了解一下 URL 組裝的過程。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 若是協議名爲空,或空串,則將協議名變量設置爲 dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 添加 side、版本、時間戳以及進程號等信息到 map 中 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 經過反射將對象的字段信息到 map 中 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // methods 爲 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標籤的配置信息 if (methods != null && !methods.isEmpty()) { // 這段代碼用於添加 Callback 配置到 map 中,代碼太長,待會單獨分析 } // 檢測 generic 是否爲 "true",並根據檢測結果向 map 中添加不一樣的信息 if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } // 爲接口生成包裹類 Wrapper,Wrapper 中包含了接口的詳細信息,好比接口方法名數組,字段信息等 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 添加方法名到 map 中,若是包含多個方法名,則用逗號隔開,好比 method = init,destroy if (methods.length == 0) { logger.warn("NO method found in service interface ..."); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // 將逗號做爲分隔符鏈接方法名,並將鏈接後的字符串放入 map 中 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 添加 token 到 map 中 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } // 判斷協議名是否爲 injvm if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // 獲取上下文路徑 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // 獲取 host 和 port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 組裝 URL URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // 省略無關代碼 }
上面的代碼首先是將一些信息,好比版本、時間戳、方法名以及各類配置對象的字段信息放入到 map 中,map 中的內容將做爲 URL 的查詢字符串。構建好 map 後,緊接着是獲取上下文路徑、主機名以及端口號等信息。最後將 map 和主機名等數據傳給 URL 構造方法建立 URL 對象。須要注意的是,這裏出現的 URL 並不是 java.net.URL,而是 com.alibaba.dubbo.common.URL。
上面省略了一段代碼,這裏簡單分析一下。這段代碼用於檢測 <dubbo:argument> 標籤中的配置信息,並將相關配置添加到 map 中。代碼以下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // ... // methods 爲 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標籤的配置信息 if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { // 添加 MethodConfig 對象的字段信息到 map 中,鍵 = 方法名.屬性名。 // 好比存儲 <dubbo:method name="sayHello" retries="2"> 對應的 MethodConfig, // 鍵 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 檢測 MethodConfig retry 是否爲 false,如果,則設置重試次數爲0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // 獲取 ArgumentConfig 列表 List<ArgumentConfig> arguments = method.getArguments(); if (arguments != null && !arguments.isEmpty()) { for (ArgumentConfig argument : arguments) { // 檢測 type 屬性是否爲空,或者空串(分支1 ⭐️) if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // 比對方法名,查找目標方法 if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); if (argument.getIndex() != -1) { // 檢測 ArgumentConfig 中的 type 屬性與方法參數列表 // 中的參數名稱是否一致,不一致則拋出異常(分支2 ⭐️) if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { // 添加 ArgumentConfig 字段信息到 map 中, // 鍵前綴 = 方法名.index,好比: // map = {"sayHello.3": true} appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config error: ..."); } } else { // 分支3 ⭐️ for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; // 從參數類型列表中查找類型名稱爲 argument.type 的參數 if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("argument config error: ..."); } } } } } } } // 用戶未配置 type 屬性,但配置了 index 屬性,且 index != -1 } else if (argument.getIndex() != -1) { // 分支4 ⭐️ // 添加 ArgumentConfig 字段信息到 map 中 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config must set index or type"); } } } } } // ... }
上面這段代碼 for 循環和 if else 分支嵌套太多,致使層次太深,不利於閱讀,須要耐心看一下。你們在看這段代碼時,注意把幾個重要的條件分支找出來。只要理解了這幾個分支的意圖,就能夠弄懂這段代碼。我在上面代碼中用⭐️符號標識出了4個重要的分支,下面用僞代碼解釋一下這幾個分支的含義。
// 獲取 ArgumentConfig 列表 for (遍歷 ArgumentConfig 列表) { if (type 不爲 null,也不爲空串) { // 分支1 1. 經過反射獲取 interfaceClass 的方法列表 for (遍歷方法列表) { 1. 比對方法名,查找目標方法 2. 經過反射獲取目標方法的參數類型數組 argtypes if (index != -1) { // 分支2 1. 從 argtypes 數組中獲取下標 index 處的元素 argType 2. 檢測 argType 的名稱與 ArgumentConfig 中的 type 屬性是否一致 3. 添加 ArgumentConfig 字段信息到 map 中,或拋出異常 } else { // 分支3 1. 遍歷參數類型數組 argtypes,查找 argument.type 類型的參數 2. 添加 ArgumentConfig 字段信息到 map 中 } } } else if (index != -1) { // 分支4 1. 添加 ArgumentConfig 字段信息到 map 中 } }
在本節分析的源碼中,appendParameters 這個方法出現的次數比較多,該方法用於將對象字段信息添加到 map 中。實現上則是經過反射獲取目標對象的 getter 方法,並調用該方法獲取屬性值。而後再經過 getter 方法名解析出屬性名,好比從方法名 getName 中可解析出屬性 name。若是用戶傳入了屬性名前綴,此時須要將屬性名加入前綴內容。最後將 <屬性名,屬性值> 鍵值對存入到 map 中就好了。限於篇幅緣由,這裏就不分析 appendParameters 方法的源碼了,你們請自行分析。
前置工做作完,接下來就能夠進行服務導出工做。服務導出,分爲導出到本地 (JVM),和導出到遠程。在深刻分析服務導出源碼前,咱們先來從宏觀層面上看一下服務導出邏輯。以下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 省略無關代碼 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // 加載 ConfiguratorFactory,並生成 Configurator 配置 url url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // 若是 scope = none,則什麼都不作 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // scope != remote,導出到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // scope != local,導出到遠程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // 加載監視器連接 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // 將監視器連接做爲參數添加到 url 中 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 爲服務提供類(ref)生成 Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 僅用於持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 導出服務,並生成 Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // 不存在註冊中心,僅導出服務 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } this.urls.add(url); }
上面代碼根據 url 中的 scope 參數決定服務導出方式,分別以下:
無論是導出到本地,仍是遠程。進行服務導出以前,均須要先建立 Invoker。這是一個很重要的步驟,所以接下來我會先分析 Invoker 的建立過程。
在 Dubbo 中,Invoker 是一個很是重要的模型。在服務提供端,以及服務引用端均會出現 Invoker。Dubbo 官方文檔中對 Invoker 進行了說明,這裏引用一下。
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起 invoke 調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。
既然 Invoker 如此重要,那麼咱們頗有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 建立而來,Dubbo 默認的 ProxyFactory 實現類是 JavassistProxyFactory。下面咱們到 JavassistProxyFactory 代碼中,探索 Invoker 的建立過程。以下:
-- JavassistProxyFactory public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 爲目標類建立 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 建立匿名 Invoker 類對象,並實現 doInvoke 方法。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
如上,JavassistProxyFactory 建立了一個繼承自 AbstractProxyInvoker 類的匿名對象,並覆寫了抽象方法 doInvoke。覆寫後的 doInvoke 邏輯比較簡單,僅是將調用請求轉發給了 Wrapper 類的 invokeMethod 方法。Wrapper 用於「包裹」目標類,Wrapper 是一個抽象類,僅可經過 getWrapper(Class) 方法建立子類。在建立 Wrapper 子類的過程當中,子類代碼生成邏輯會對 getWrapper 方法傳入的 Class 對象進行解析,拿到諸如類方法,類成員變量等信息。以及生成 invokeMethod 方法代碼,和其餘一些方法代碼。代碼生成完畢後,經過 Javassist 生成 Class 對象,最後再經過反射建立 Wrapper 實例。相關的代碼以下:
public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicClass(c)) c = c.getSuperclass(); if (c == Object.class) return OBJECT_WRAPPER; // 訪存 Wrapper ret = WRAPPER_MAP.get(c); if (ret == null) { // 緩存未命中,建立 Wrapper ret = makeWrapper(c); // 寫入緩存 WRAPPER_MAP.put(c, ret); } return ret; }
getWrapper 方法只是包含了一些緩存操做邏輯,非重點。下面咱們重點關注 makeWrapper 方法。
private static Wrapper makeWrapper(Class<?> c) { // 檢測 c 是否爲私有類型,如果則拋出異常 if (c.isPrimitive()) throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); String name = c.getName(); ClassLoader cl = ClassHelper.getClassLoader(c); // c1 用於存儲 setPropertyValue 方法代碼 StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ "); // c2 用於存儲 getPropertyValue 方法代碼 StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ "); // c3 用於存儲 invokeMethod 方法代碼 StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); // 生成類型轉換代碼及異常捕捉代碼,好比: // DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); } c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); // pts 用於存儲成員變量名和類型 Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // ms 用於存儲方法描述信息(可理解爲方法簽名)及 Method 實例 Map<String, Method> ms = new LinkedHashMap<String, Method>(); // mns 爲方法名列表 List<String> mns = new ArrayList<String>(); // dmns 用於存儲定義在當前類中的方法的名稱 List<String> dmns = new ArrayList<String>(); // --------------------------------✨ 分割線1 ✨------------------------------------- // 獲取 public 訪問級別的字段,併爲全部字段生成條件判斷語句 for (Field f : c.getFields()) { String fn = f.getName(); Class<?> ft = f.getType(); if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) // 忽略關鍵字 static 或 transient 修飾的變量 continue; // 生成條件判斷及賦值語句,好比: // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;} // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;} c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }"); // 生成條件判斷及返回語句,好比: // if( $2.equals("name") ) { return ($w)w.name; } c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }"); // 存儲 <字段名, 字段類型> 鍵值對到 pts 中 pts.put(fn, ft); } // --------------------------------✨ 分割線2 ✨------------------------------------- Method[] methods = c.getMethods(); // 檢測 c 中是否包含在當前類中聲明的方法 boolean hasMethod = hasMethods(methods); if (hasMethod) { c3.append(" try{"); } for (Method m : methods) { if (m.getDeclaringClass() == Object.class) // 忽略 Object 中定義的方法 continue; String mn = m.getName(); // 生成方法名判斷語句,示例以下: // if ( "sayHello".equals( $2 ) c3.append(" if( \"").append(mn).append("\".equals( $2 ) "); int len = m.getParameterTypes().length; // 生成運行時傳入參數的數量與方法的參數列表長度判斷語句,示例以下: // && $3.length == 2 c3.append(" && ").append(" $3.length == ").append(len); boolean override = false; for (Method m2 : methods) { // 檢測方法是否存在重載狀況,條件爲:方法對象不一樣 && 方法名相同 if (m != m2 && m.getName().equals(m2.getName())) { override = true; break; } } // 對重載方法進行處理,考慮下面的方法: // 1. void sayHello(Integer, String) // 2. void sayHello(Integer, Integer) // 方法名相同,參數列表長度也相同,所以不能僅經過這兩項判斷兩個方法是否相等。 // 須要進一步判斷方法的參數類型 if (override) { if (len > 0) { for (int l = 0; l < len; l++) { // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String") c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"") .append(m.getParameterTypes()[l].getName()).append("\")"); } } } // 添加 ) {,完成方法判斷語句,此時生成的方法可能以下(已格式化): // if ("sayHello".equals($2) // && $3.length == 2 // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) { c3.append(" ) { "); // 根據返回值類型生成目標方法調用語句 if (m.getReturnType() == Void.TYPE) // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null; c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;"); else // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); // 添加 }, 當前」方法判斷條件「代碼生成完畢,示例代碼以下(已格式化): // if ("sayHello".equals($2) // && $3.length == 2 // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) { // // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); // return null; // } c3.append(" }"); // 添加方法名到 mns 集合中 mns.add(mn); // 檢測當前方法是否在 c 中被聲明的 if (m.getDeclaringClass() == c) // 如果,則將當前方法名添加到 dmns 中 dmns.add(mn); ms.put(ReflectUtils.getDesc(m), m); } if (hasMethod) { // 添加異常捕捉語句 c3.append(" } catch(Throwable e) { "); c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); c3.append(" }"); } // 添加 NoSuchMethodException 異常拋出代碼 c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }"); // --------------------------------✨ 分割線3 ✨------------------------------------- Matcher matcher; // 處理 get/set 方法 for (Map.Entry<String, Method> entry : ms.entrySet()) { String md = entry.getKey(); Method method = (Method) entry.getValue(); // 匹配以 get 開頭的方法 if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { // 獲取屬性名 String pn = propertyName(matcher.group(1)); // 生成屬性判斷以及返回語句,示例以下: // if( $2.equals("name") ) { return ($w).w.getName(); } c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); // 匹配以 is/has/can 開頭的方法 } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); // 生成屬性判斷以及返回語句,示例以下: // if( $2.equals("dream") ) { return ($w).w.hasDream(); } c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); // 匹配以 set 開頭的方法 } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { Class<?> pt = method.getParameterTypes()[0]; String pn = propertyName(matcher.group(1)); // 生成屬性判斷以及 setter 調用語句,示例以下: // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; } c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }"); pts.put(pn, pt); } } // 添加 NoSuchPropertyException 異常拋出代碼 c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); // --------------------------------✨ 分割線4 ✨------------------------------------- long id = WRAPPER_CLASS_COUNTER.getAndIncrement(); // 建立類生成器 ClassGenerator cc = ClassGenerator.newInstance(cl); // 設置類名及超類 cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id); cc.setSuperClass(Wrapper.class); // 添加默認構造方法 cc.addDefaultConstructor(); // 添加字段 cc.addField("public static String[] pns;"); cc.addField("public static " + Map.class.getName() + " pts;"); cc.addField("public static String[] mns;"); cc.addField("public static String[] dmns;"); for (int i = 0, len = ms.size(); i < len; i++) cc.addField("public static Class[] mts" + i + ";"); // 添加方法代碼 cc.addMethod("public String[] getPropertyNames(){ return pns; }"); cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }"); cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }"); cc.addMethod("public String[] getMethodNames(){ return mns; }"); cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }"); cc.addMethod(c1.toString()); cc.addMethod(c2.toString()); cc.addMethod(c3.toString()); try { // 生成類 Class<?> wc = cc.toClass(); // 設置字段值 wc.getField("pts").set(null, pts); wc.getField("pns").set(null, pts.keySet().toArray(new String[0])); wc.getField("mns").set(null, mns.toArray(new String[0])); wc.getField("dmns").set(null, dmns.toArray(new String[0])); int ix = 0; for (Method m : ms.values()) wc.getField("mts" + ix++).set(null, m.getParameterTypes()); // 建立 Wrapper 實例 return (Wrapper) wc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Throwable e) { throw new RuntimeException(e.getMessage(), e); } finally { cc.release(); ms.clear(); mns.clear(); dmns.clear(); } }
上面代碼很長,你們耐心看一下。我在上面代碼中作了大量的註釋,並按功能對代碼進行了分塊,以幫助你們理解代碼邏輯。下面對這段代碼進行講解。首先咱們把目光移到分割線1之上的代碼,這段代碼主要用於進行一些初始化操做。好比建立 c一、c二、c3 以及 pts、ms、mns 等變量,以及向 c一、c二、c3 中添加方法定義和類型類型轉換代碼。接下來是分割線1到分割線2之間的代碼,這段代碼用於爲 public 級別的字段生成條件判斷取值與賦值代碼。這段代碼不是很難看懂,就很少說了。繼續向下看,分割線2和分隔線3之間的代碼用於爲定義在當前類中的方法生成判斷語句,和方法調用語句。由於須要對方法重載進行校驗,所以到這這段代碼看起來有點複雜。不過耐心開一下,也不是很難理解。接下來是分割線3和分隔線4之間的代碼,這段代碼用於處理 getter、setter 以及以 is/has/can 開頭的方法。處理方式是經過正則表達式獲取方法類型(get/set/is/...),以及屬性名。以後爲屬性名生成判斷語句,而後爲方法生成調用語句。最後咱們再來看一下分隔線4如下的代碼,這段代碼經過 ClassGenerator 爲剛剛生成的代碼構建 Class 類,並經過反射建立對象。ClassGenerator 是 Dubbo 本身封裝的,該類的核心是 toClass() 的重載方法 toClass(ClassLoader, ProtectionDomain),該方法經過 javassist 構建 Class。這裏就不分析 toClass 方法了,你們請自行分析。
閱讀 Wrapper 類代碼須要對 javassist 框架有所瞭解。關於 javassist,你們若是不熟悉,請自行查閱資料,本節不打算介紹 javassist 相關內容。
好了,關於 Wrapper 類生成過程就分析到這。若是你們看的不是很明白,能夠單獨爲 Wrapper 建立單元測試,而後單步調試。並將生成的代碼拷貝出來,格式化後再進行觀察和理解。好了,本節先到這。
本節咱們來看一下服務導出相關的代碼,按照代碼執行順序,本節先來分析導出服務到本地的過程。相關代碼以下:
private void exportLocal(URL url) { // 若是 URL 的協議頭等於 injvm,說明已經導出到本地了,無需再次導出 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) // 設置協議頭爲 injvm .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 建立 Invoker,並導出服務,這裏的 protocol 會在運行時調用 InjvmProtocol 的 export 方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); } }
exportLocal 方法比較簡單,首先根據 URL 協議頭決定是否導出服務。若需導出,則建立一個新的 URL 並將協議頭、主機名以及端口設置成新的值。而後建立 Invoker,並調用 InjvmProtocol 的 export 方法導出服務。下面咱們來看一下 InjvmProtocol 的 export 方法都作了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 建立 InjvmExporter return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
如上,InjvmProtocol 的 export 方法僅建立了一個 InjvmExporter,無其餘邏輯。到此導出服務到本地就分析完了,接下來,咱們繼續分析導出服務到遠程的過程。
與導出服務到本地相比,導出服務到遠程的過程要複雜很多,其包含了服務導出與服務註冊兩個過程。這兩個過程涉及到了大量的調用,所以比較複雜。不過無論再難,咱們都要看一下,萬一看懂了呢。按照代碼執行順序,本節先來分析服務導出邏輯,服務註冊邏輯將在下一節進行分析。下面開始分析,咱們把目光移動到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 導出服務 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 獲取註冊中心 URL,以 zookeeper 註冊中心爲例,獲得的示例 URL 以下: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider URL registryUrl = getRegistryUrl(originInvoker); // 根據 URL 加載 Registry 實現類,好比 ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 獲取已註冊的服務提供者 URL,好比: // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 獲取 register 參數 boolean register = registeredProviderUrl.getParameter("register", true); // 向服務提供者與消費者註冊表中註冊服務提供者 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 根據 register 的值決定是否註冊服務 if (register) { // 向註冊中心註冊服務 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 獲取訂閱 URL,好比: // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // 建立監聽器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向註冊中心進行訂閱 override 數據 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 建立並返回 DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
上面代碼看起來比較複雜,主要作以下一些操做:
在以上操做中,除了建立並返回 DestroyableExporter 沒啥難度外,其餘幾步操做都不是很簡單。這其中,導出服務和註冊服務是本章要重點分析的邏輯。 訂閱 override 數據這個是非重點內容,後面會簡單介紹一下。下面開始本節的分析,先來分析 doLocalExport 方法的邏輯,以下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); // 訪問緩存 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // 建立 Invoker 爲委託類對象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 調用 protocol 的 export 方法導出服務 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // 寫緩存 bounds.put(key, exporter); } } } return exporter; }
上面的代碼是典型的雙重檢查,這個你們應該都知道。接下來,咱們把重點放在 Protocol 的 export 方法上。假設運行時協議爲 dubbo,此處的 protocol 會在運行時加載 DubboProtocol,並調用 DubboProtocol 的 export 方法。咱們目光轉移到 DubboProtocol 的 export 方法上,相關分析以下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // 獲取服務標識,理解成服務座標也行。由服務組名,服務名,服務版本號以及端口組成。好比: // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); // 建立 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 將 <key, exporter> 鍵值對放入緩存中 exporterMap.put(key, exporter); // 如下代碼應該和本地存根有關,代碼不難看懂,但具體用途暫時不清楚,先忽略 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { // 省略日誌打印代碼 } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 啓動服務器 openServer(url); // 優化序列化 optimizeSerialization(url); return exporter; }
如上,咱們重點關注 DubboExporter 的建立以及 openServer 方法,其餘邏輯看不懂也不要緊,不影響理解服務導出過程。另外,DubboExporter 的代碼比較簡單,就不分析了。下面分析 openServer 方法。
private void openServer(URL url) { // 獲取 host:port,並將其做爲服務器實例的 key,用於標識當前的服務器實例 String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { // 訪問緩存 ExchangeServer server = serverMap.get(key); if (server == null) { // 建立服務器實例 serverMap.put(key, createServer(url)); } else { // 服務器已建立,則根據 url 中的配置重置服務器 server.reset(url); } } }
如上,在同一臺機器上(單網卡),同一個端口上僅容許啓動一個服務器實例。若某個端口上已有服務器實例,此時則調用 reset 方法重置服務器的一些配置。考慮到篇幅問題,關於服務器實例重置的代碼就不分析了。接下來分析服務器實例的建立過程。以下:
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, // 添加心跳檢測配置到 url 中 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 獲取 server 參數,默認爲 netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 經過 SPI 檢測是否存在 server 參數所表明的 Transporter 拓展,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加編碼解碼器參數 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 建立 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server..."); } // 獲取 client 參數,可指定 netty,mina str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 獲取全部的 Transporter 實現類名稱集合,好比 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 檢測當前 Dubbo 所支持的 Transporter 實現類名稱列表中, // 是否包含 client 所表示的 Transporter,若不包含,則拋出異常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type..."); } } return server; }
如上,createServer 包含三個核心的操做。第一是檢測是否存在 server 參數所表明的 Transporter 拓展,不存在則拋出異常。第二是建立服務器實例。第三是檢測是否支持 client 參數所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操做所對應的代碼比較直白了,無需多說。但建立服務器的操做目前還不是很清晰,咱們繼續往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger,默認爲 HeaderExchanger。 // 緊接着調用 HeaderExchanger 的 bind 方法建立 ExchangeServer 實例 return getExchanger(url).bind(url, handler); }
上面代碼比較簡單,就很少說了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 建立 HeaderExchangeServer 實例,該方法包含了多步操做,本別以下: // 1. new HeaderExchangeHandler(handler) // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前咱們僅需關心 Transporters 的 bind 方法邏輯便可。該方法的代碼以下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 若是 handlers 元素數量大於1,則建立 ChannelHandler 分發器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取自適應 Transporter 實例,並調用實例方法 return getTransporter().bind(url, handler); }
如上,getTransporter() 方法獲取的 Transporter 是在運行時動態建立的,類名爲 Transporter$Adaptive,也就是自適應拓展類。我在上一篇文章中詳細分析了自適應拓展類的生成過程,對自適應拓展類不瞭解的同窗能夠參考我以前的文章,這裏再也不贅述。Transporter$Adaptive 會在運行時根據傳入的 URL 參數決定加載什麼類型的 Transporter,默認爲 NettyTransporter。下面咱們繼續跟下去,此次分析的是 NettyTransporter 的 bind 方法。
public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 建立 NettyServer return new NettyServer(url, listener); }
這裏僅有一句建立 NettyServer 的代碼,沒啥好講的,咱們繼續向下看。
public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // 調用父類構造方法 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } } public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { // 調用父類構造方法,這裏就不用跟進去了,沒什麼複雜邏輯 super(url, handler); localAddress = getUrl().toInetSocketAddress(); // 獲取 ip 和端口 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { // 設置 ip 爲 0.0.0.0 bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // 獲取最大可接受鏈接數 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 調用模板方法 doOpen 啓動服務器 doOpen(); } catch (Throwable t) { throw new RemotingException("Failed to bind "); } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } protected abstract void doOpen() throws Throwable; protected abstract void doClose() throws Throwable; }
上面多數代碼爲賦值代碼,不須要多講。咱們重點關注 doOpen 抽象方法,該方法須要子類實現。下面回到 NettyServer 中。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); // 建立 boss 和 worker 線程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); // 建立 ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setOption("child.tcpNoDelay", true); // 設置 PipelineFactory bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // 綁定到指定的 ip 和端口上 channel = bootstrap.bind(getBindAddress()); }
以上就是 NettyServer 建立的過程,dubbo 默認使用的 NettyServer 是基於 netty 3.x 版本實現的,比較老了。所以 Dubbo 中另外提供了 netty 4.x 版本的 NettyServer,你們可在使用 Dubbo 的過程當中按需進行配置。
到此,關於服務導出的過程就分析完了。整個過程比較複雜,你們在分析的過程當中耐心一些。而且多寫 Demo 進行進行調試,以便可以更好的理解代碼邏輯。好了,本節內容先到這裏,接下來分析服務導出的另外一塊邏輯 -- 服務註冊。
本節咱們來分析服務註冊過程,服務註冊操做對於 Dubbo 來講不是必需的,經過服務直連的方式就能夠繞過註冊中心。但一般咱們不會這麼作,直連方式不利於服務治理,僅推薦在測試環境測試服務時使用。對於 Dubbo 來講,註冊中心雖不是必需,但倒是必要的。所以,關於註冊中心以及服務註冊相關邏輯,咱們也須要搞懂。
本節內容以 Zookeeper 註冊中心做爲分析目標,其餘類型註冊中心你們可自行分析。下面從服務註冊的入口方法開始分析,咱們把目光再次移到 RegistryProtocol 的 export 方法上。以下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // ${導出服務} // 省略其餘代碼 boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 註冊服務 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 訂閱 override 數據 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 省略部分代碼 }
RegistryProtocol 的 export 方法包含了服務導出,註冊,以及數據訂閱等邏輯。其中服務導出邏輯上一節已經分析過了,本節將分析服務註冊邏輯,數據訂閱邏輯將在下一節進行分析。下面開始本節的分析,相關代碼以下:
public void register(URL registryUrl, URL registedProviderUrl) { // 獲取 Registry Registry registry = registryFactory.getRegistry(registryUrl); // 註冊服務 registry.register(registedProviderUrl); }
register 方法包含兩步操做,第一步是獲取註冊中心實例,第二步是向註冊中心註冊服務。接下來,我分兩節內容對這兩步操做進行分析。按照順序,先來分析獲取註冊中心的邏輯。
本節內容以 Zookeeper 註冊中心爲例進行分析。下面先來看一下 getRegistry 方法的源碼,這個方法由 AbstractRegistryFactory 實現。以下:
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); LOCK.lock(); try { // 訪問緩存 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 緩存未命中,建立 Registry 實例 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry..."); } // 寫入緩存 REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先訪問緩存,緩存未命中則調用 createRegistry 建立 Registry,而後寫入緩存。這裏的 createRegistry 是一個模板方法,由具體的子類實現。所以,下面咱們到 ZookeeperRegistryFactory 中探究一番。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { // zookeeperTransporter 由 SPI 在運行時注入,類型爲 ZookeeperTransporter$Adaptive private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @Override public Registry createRegistry(URL url) { // 建立 ZookeeperRegistry return new ZookeeperRegistry(url, zookeeperTransporter); } }
ZookeeperRegistryFactory 的 createRegistry 方法僅包含一句代碼,無需解釋,繼續跟下去。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 獲取組名,默認爲 dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { // group = "/" + group group = Constants.PATH_SEPARATOR + group; } this.root = group; // 建立 Zookeeper 客戶端,默認爲 CuratorZookeeperTransporter zkClient = zookeeperTransporter.connect(url); // 添加狀態監聽器 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
在上面的代碼代碼中,咱們重點關注 ZookeeperTransporter 的 connect 方法調用,這個方法用於建立 Zookeeper 客戶端。建立好 Zookeeper 客戶端,意味着註冊中心的建立過程就結束了。不過,顯然咱們不能就此中止,難道你們沒有興趣瞭解一下 Zookeeper 客戶端的建立過程嗎?若是有,那麼繼續向下看。沒有的話,直接跳到下一節。那我接着分析了。
前面說過,這裏的 zookeeperTransporter 類型爲自適應拓展類,所以 connect 方法會在被調用時決定加載什麼類型的 ZookeeperTransporter 拓展,默認爲 CuratorZookeeperTransporter。下面咱們到 CuratorZookeeperTransporter 中看一看。
public ZookeeperClient connect(URL url) { // 建立 CuratorZookeeperClient return new CuratorZookeeperClient(url); }
上面方法僅用於建立 CuratorZookeeperClient 實例,沒什麼好說的,繼續往下看。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { private final CuratorFramework client; public CuratorZookeeperClient(URL url) { super(url); try { // 建立 CuratorFramework 構造器 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 構建 CuratorFramework 實例 client = builder.build(); // 添加監聽器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 啓動客戶端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
CuratorZookeeperClient 構造方法主要用於建立和啓動 CuratorFramework 實例。以上基本上都是 Curator 框架的代碼,你們若是對 Curator 框架不是很瞭解,能夠參考 Curator 官方文檔,並寫點 Demo 跑跑。
本節分析了 ZookeeperRegistry 實例的建立過程,整個過程並非很複雜。你們在看完分析後,能夠自行調試,以加深印象。如今註冊中心實例建立好了,接下來要作的事情是向註冊中心註冊服務,咱們繼續往下看。
以 Zookeeper 爲例,所謂的服務註冊,本質上是將服務配置數據寫入到 Zookeeper 的某個路徑的節點下。爲了驗證這個說法,下面咱們將 Dobbo 官方提供提供的實例跑起來,而後經過 Zookeeper 可視化客戶端 ZooInspector 查看節點數據。以下:
從上圖中能夠看到 com.alibaba.dubbo.demo.DemoService 這個服務對應的配置信息(存儲在 URL 中)最終被註冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節點下。搞懂了服務註冊的本質,那麼接下來咱們就能夠去閱讀服務註冊的代碼了。服務註冊的接口爲 register(URL),這個方法定義在 FailbackRegistry 抽象類中。方法代碼以下:
public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 模板方法,由子類實現 doRegister(url); } catch (Exception e) { Throwable t = e; // 獲取 check 參數,若 check = true 將會直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register"); } else { logger.error("Failed to register"); } // 記錄註冊失敗的連接 failedRegistered.add(url); } } protected abstract void doRegister(URL url);
如上,咱們重點關注 doRegister 方法調用便可,其餘的代碼先忽略。doRegister 方法是一個模板方法,所以咱們到 FailbackRegistry 子類 ZookeeperRegistry 中進行分析。以下:
protected void doRegister(URL url) { try { // 經過 Zookeeper 客戶端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式以下: // /${group}/${serviceInterface}/providers/${url} // 好比 // /dubbo/com.tianxiaobo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1...... zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register..."); } }
如上,ZookeeperRegistry 在 doRegister 中調用了 Zookeeper 客戶端建立服務節點。節點路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,以下:
public void create(String path, boolean ephemeral) { if (!ephemeral) { // 若是要建立的節點類型非臨時節點,那麼這裏要檢測節點是否存在 if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); // 遞歸建立上一級路徑 } // 根據 ephemeral 的值建立臨時或持久節點 if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
上面方法先是經過遞歸建立當前節點的上一級路徑,而後再根據 ephemeral 的值決定建立臨時仍是持久節點。createEphemeral 和 createPersistent 這兩個方法都比較簡單,這裏簡單分析其中的一個。以下:
public void createEphemeral(String path) { try { // 經過 Curator 框架建立節點 client.create().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
好了,到此關於服務註冊的過程就分析完了。整個過程可簡單總結爲:先建立註冊中心實例,以後再經過註冊中心實例註冊服務。本節先到這,接下來分析數據訂閱過程。
訂閱 override 數據對應的代碼我粗略看了一遍,這部分代碼的主要目的是爲了在服務配置發生變化時,從新導出服務。具體的使用場景應該當咱們經過 Dubbo 管理後臺修改了服務配置後,Dubbo 獲得服務配置被修改的通知,而後從新導出服務。這個使用場景只是猜想,我並未進行過驗證。若是你們有興趣能夠自行驗證。
override 數據訂閱相關代碼也不是不多,考慮到文章篇幅問題以及重要性,遂決定不對此邏輯進行詳細的分析。若是你們有興趣,可自行分析。
本篇文章詳細分析了 Dubbo 服務導出過程,包括配置檢測,URL 組裝,Invoker 建立過程、導出服務以及註冊服務等等。篇幅比較大,須要你們耐心閱讀。對於這篇文章,我建議你們當成一個工具書使用。須要的時候跳到指定章節看一下,通讀可能會有點累。因爲文章篇幅比較大,所以可能會隱藏一些我沒意識到的錯誤。若你們在閱讀的過程當中發現了錯誤,還請指出。若是可以不吝賜教,那就更好了,先在這裏說聲謝謝。
好了,本篇文章就到這了。謝謝閱讀。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:田小波
本文同步發佈在個人我的博客:http://www.tianxiaobo.com
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。