在上一篇文章中,我詳細的分析了服務導出的原理。本篇文章咱們趁熱打鐵,繼續分析服務引用的原理。在 Dubbo 中,咱們能夠經過兩種方式引用遠程服務。第一種是使用服務直聯的方式引用服務,第二種方式是基於註冊中心進行引用。服務直聯的方式僅適合在調試或測試服務的場景下使用,不適合在線上環境使用。所以,本文我將重點分析經過註冊中心引用服務的過程。從註冊中心中獲取服務配置只是服務引用過程當中的一環,除此以外,服務消費者還須要經歷 Invoker 建立、代理類建立等步驟。這些步驟,我將在後續章節中一一進行分析。java
Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其餘類中時引用。這兩個引用服務的時機區別在於,第一個是餓漢式的,第二個是懶漢式的。默認狀況下,Dubbo 使用懶漢式引用服務。若是須要使用餓漢式,可經過配置 <dubbo:reference> 的 init 屬性開啓。下面咱們按照 Dubbo 默認配置進行分析,整個分析過程從 ReferenceBean 的 getObject 方法開始。當咱們的服務被注入到其餘類中時,Spring 會第一時間調用 getObject 方法,並由該方法執行服務引用邏輯。按照慣例,在進行具體工做以前,需先進行配置檢查與收集工做。接着根據收集到的信息決定服務用的方式,有三種,第一種是引用本地 (JVM) 服務,第二是經過直聯方式引用遠程服務,第三是經過註冊中心引用遠程服務。不論是哪一種引用方式,最後都會獲得一個 Invoker 實例。若是有多個註冊中心,多個服務提供者,這個時候會獲得一組 Invoker 實例,此時須要經過集羣管理類 Cluster 將多個 Invoker 合併成一個實例。合併後的 Invoker 實例已經具有調用本地或遠程服務的能力了,但並不能將此實例暴露給用戶使用,這會對用戶業務代碼形成侵入。此時框架還須要經過代理工廠類 (ProxyFactory) 爲服務接口生成代理類,並讓代理類去調用 Invoker 邏輯。避免了 Dubbo 框架代碼對業務代碼的侵入,同時也讓框架更容易使用。git
以上就是 Dubbo 引用服務的大體原理,下面咱們深刻到代碼中,詳細分析服務引用細節。github
服務引用的入口方法爲 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。實現代碼以下:apache
public Object getObject() throws Exception { return get(); } public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } // 檢測 ref 是否爲空,爲空則經過 init 方法建立 if (ref == null) { // init 方法主要用於處理配置,以及調用 createProxy 生成代理類 init(); } return ref; }
這裏兩個方法代碼都比較簡短,並不難理解。不過這裏須要特別說明一下,若是你們從 getObject 方法進行代碼調試時,會碰到比較詫異的問題。這裏假設你使用 IDEA,且保持了 IDEA 的默認配置。當你面調試到 get 方法的if (ref == null)
時,你會驚奇的發現 ref 不爲空,致使你沒法進入到 init 方法中繼續調試。致使這個現象的緣由是 Dubbo 框架自己有點小問題,這個小問題會引起一些讓人詫異的現象。關於這個問題,我進行了將近兩個小時的排查。查明問題後,我給 Dubbo 提交了一個 pull request (#2754) 修復了此問題。另外,beiwei30 前輩開了一個 issue (#2757 ) 介紹這個問題,有興趣的朋友能夠去看看。你們若是想規避這個問題,能夠修改一下 IDEA 的配置。在配置面板中搜索 toString,而後取消Enable 'toString' object view
前的對號。具體以下:數組
講完須要注意的點,咱們繼續向下分析,接下來將分析配置的處理過程。緩存
Dubbo 提供了豐富的配置,用於調整和優化框架行爲,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置到正確性。若是你們不是很熟悉 Dubbo 配置,建議先閱讀如下官方文檔。配置解析的方法爲 ReferenceConfig 的 init 方法,下面來看一下方法邏輯。服務器
private void init() { if (initialized) { return; } initialized = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("interface not allow null!"); } // 檢測 consumer 變量是否爲空,爲空則建立 checkDefault(); appendProperties(this); if (getGeneric() == null && getConsumer() != null) { // 設置 generic setGeneric(getConsumer().getGeneric()); } // 檢測是否爲泛化接口 if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { // 加載類 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } // -------------------------------✨ 分割線1 ✨------------------------------ // 從系統變量中獲取與接口名對應的屬性值 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { // 從系統屬性中獲取解析文件路徑 resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { // 從指定位置加載配置文件 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { // 獲取文件絕對路徑 resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); // 從文件中加載配置 properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload ..., cause:..."); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } // 獲取與接口名對應的配置 resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { // 將 resolve 賦值給 url url = resolve; } // -------------------------------✨ 分割線2 ✨------------------------------ if (consumer != null) { if (application == null) { // 從 consumer 中獲取 Application 實例,下同 application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 檢測本地 Application 和本地存根配置合法性 checkApplication(); checkStubAndMock(interfaceClass); // -------------------------------✨ 分割線3 ✨------------------------------ Map<String, String> map = new HashMap<String, String>(); Map<Object, Object> attributes = new HashMap<Object, Object>(); // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!isGeneric()) { // 非泛化服務 // 獲取版本 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } // 獲取接口方法列表,並添加到 map 中 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中 appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); // -------------------------------✨ 分割線4 ✨------------------------------ String prefix = StringUtils.getServiceKey(map); if (methods != null && !methods.isEmpty()) { // 遍歷 MethodConfig 列表 for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; // 檢測 map 是否包含 methodName.retry if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { // 添加劇試次數配置 methodName.retries map.put(method.getName() + ".retries", "0"); } } // 添加 MethodConfig 中的「屬性」字段到 attributes // 好比 onreturn、onthrow、oninvoke 等 appendAttributes(attributes, method, prefix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } // -------------------------------✨ 分割線5 ✨------------------------------ // 獲取服務消費者 ip 地址 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); if (hostToRegistry == null || hostToRegistry.length() == 0) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property..." ); } map.put(Constants.REGISTER_IP_KEY, hostToRegistry); // 存儲 attributes 到系統上下文中 StaticContext.getSystemContext().putAll(attributes); // 建立代理類 ref = createProxy(map); // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel, // 並將 ConsumerModel 存入到 ApplicationModel 中 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); }
上面的代碼很長,作的事情比較多。這裏我根據代碼邏輯,對代碼進行了分塊,下面咱們一塊兒來看一下。多線程
首先是方法開始到分割線1之間的代碼。這段代碼主要用於檢測 ConsumerConfig 實例是否存在,如不存在則建立一個新的實例,而後經過系統變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接着是檢測泛化配置,並根據配置設置 interfaceClass 的值。本段代碼邏輯大體就是這些,接着來看分割線1到分割線2之間的邏輯。這段邏輯用於從系統屬性或配置文件中加載與接口名相對應的配置,並將解析結果賦值給 url 字段。url 字段的做用通常是用於點對點調用。繼續向下看,分割線2和分割線3之間的代碼用於檢測幾個核心配置類是否爲空,爲空則嘗試從其餘配置類中獲取。分割線3與分割線4之間的代碼主要是用於收集各類配置,並將配置存儲到 map 中。分割線4和分割線5之間的代碼用於處理 MethodConfig 實例。該實例包含了事件通知配置,好比 onreturn、onthrow、oninvoke 等。分割線5到方法結尾的代碼主要用於解析服務消費者 ip,以及調用 createProxy 建立代理對象。關於該方法的詳細分析,將會在接下來的章節中展開。app
到這裏,關於配置的檢查與處理過長就分析完了。這部分邏輯不是很難理解,但比較繁雜,你們須要耐心看一下。好了,本節先到這,接下來分析服務引用的過程。框架
本節咱們要從 createProxy 開始看起。createProxy 這個方法表面上看起來只是用於建立代理對象,但實際上並不是如此。該方法還會調用其餘方法構建以及合併 Invoker 實例。具體細節以下。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { // url 配置被指定,則不作本地引用 if (url != null && url.length() > 0) { isJvmRefer = false; // 根據 url 的協議、scope 以及 injvm 等參數檢測是否須要本地引用 // 好比若是用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { isJvmRefer = true; } else { isJvmRefer = false; } } else { // 獲取 injvm 配置值 isJvmRefer = isInjvm().booleanValue(); } // 本地引用 if (isJvmRefer) { // 生成本地引用 URL,協議爲 injvm URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); // 調用 refer 方法構建 InjvmInvoker 實例 invoker = refprotocol.refer(interfaceClass, url); // 遠程引用 } else { // url 不爲空,代表用戶可能想進行點對點調用 if (url != null && url.length() > 0) { // 當須要配置多個 url 時,可用分號進行分割,這裏會進行切分 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { // 設置接口全限定名爲 url 路徑 url = url.setPath(interfaceName); } // 檢測 url 協議是否爲 registry,如果,代表用戶想使用指定的註冊中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 將 map 轉換爲查詢字符串,並做爲 refer 參數的值添加到 url 中 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { // 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性), // 好比線程池相關配置。並保留服務提供者的部分配置,好比版本,group,時間戳等 // 最後將合併後的配置設置爲 url 查詢字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 加載註冊中心 url List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 參數到 url 中,並將 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } // 未配置註冊中心,拋出異常 if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference..."); } } // 單個註冊中心或服務提供者(服務直聯,下同) if (urls.size() == 1) { // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多個註冊中心或多個服務提供者,或者二者混合 } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 獲取全部的 Invoker for (URL url : urls) { // 經過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 // 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 若是註冊中心連接不爲空,則將使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 建立 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; } // invoker 可用性檢查 if (c && !invoker.isAvailable()) { throw new IllegalStateException("No provider available for the service..."); } // 生成代理類 return (T) proxyFactory.getProxy(invoker); }
上面代碼不少,不過邏輯比較清晰。首先根據配置檢查是否爲本地調用,如果,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直聯配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。而後,根據 urls 元素數量進行後續操做。若 urls 元素數量爲1,則直接經過 Protocol 自適應拓展構建 Invoker 實例接口。若 urls 元素數量大於1,即存在多個註冊中心或服務直聯 url,此時先根據 url 構建 Invoker。而後再經過 Cluster 合併多個 Invoker,最後調用 ProxyFactory 生成代理類。這裏,Invoker 的構建過程以及代理類的過程比較重要,所以我將分兩小節對這兩個過程進行分析。
Invoker 是 Dubbo 的核心模型,表明一個可執行體。在服務提供方,Invoker 用於調用服務提供類。在服務消費方,Invoker 用於執行遠程調用。Invoker 在 Dubbo 中的位置十分重要,所以咱們有必要去搞懂它。從前面的代碼中可知,Invoker 是由 Protocol 實現類構建的。Protocol 實現類有不少,這裏我會分析最經常使用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其餘的你們自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。以下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // 建立 DubboInvoker DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起來比較簡單,不過這裏有一個調用須要咱們注意一下,即 getClients。這個方法用於獲取客戶端實例,實例類型爲 ExchangeClient。ExchangeClient 實際上並不具有通訊能力,所以它須要更底層的客戶端實例進行通訊。好比 NettyClient、MinaClient 等,默認狀況下,Dubbo 使用 NettyClient 進行通訊。接下來,咱們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) { // 是否共享鏈接 boolean service_share_connect = false; // 獲取鏈接數,默認爲0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 若是未配置 connections,則共享鏈接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 獲取共享客戶端 clients[i] = getSharedClient(url); } else { // 初始化新的客戶端 clients[i] = initClient(url); } } return clients; }
這裏根據 connections 數量決定是獲取共享客戶端仍是建立新的客戶端實例,默認狀況下,使用共享客戶端實例。不過 getSharedClient 方法中也會調用 initClient 方法,所以下面咱們一塊兒看一下這兩個方法。
private ExchangeClient getSharedClient(URL url) { String key = url.getAddress(); // 獲取帶有「引用計數」功能的 ExchangeClient ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null) { if (!client.isClosed()) { // 增長引用計數 client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } // 建立 ExchangeClient 客戶端 ExchangeClient exchangeClient = initClient(url); // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這裏明顯用了裝飾模式 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); locks.remove(key); return client; } }
上面方法先訪問緩存,若緩存未命中,則經過 initClient 方法建立新的 ExchangeClient 實例,並將該實例傳給 ReferenceCountExchangeClient 構造方法建立一個帶有引用技術功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內部實現比較簡單,就不分析了。下面咱們再來看一下 initClient 方法的代碼。
private ExchangeClient initClient(URL url) { // 獲取客戶端類型,默認爲 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 添加編解碼和心跳包參數到 url 中 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 檢測客戶端類型是否存在,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: ..."); } ExchangeClient client; try { // 獲取 lazy 配置,並根據配置值決定建立的客戶端類型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 建立懶加載 ExchangeClient 實例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 建立普通 ExchangeClient 實例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先獲取用戶配置的客戶端類型,默認爲 netty。而後檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最後根據 lazy 配置決定建立什麼類型的客戶端。這裏的 LazyConnectExchangeClient 代碼並非很複雜,該類會在 request 方法被調用時經過 Exchangers 的 connect 方法建立 ExchangeClient 客戶端,這裏就不分析 LazyConnectExchangeClient 的代碼了。下面咱們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger 實例,默認爲 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 會經過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,你們本身看一下吧。接下來分析 HeaderExchangeClient 的實現。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 這裏包含了多個調用,分別以下: // 1. 建立 HeaderExchangeHandler 對象 // 2. 建立 DecodeHandler 對象 // 3. 經過 Transporters 構建 Client 實例 // 4. 建立 HeaderExchangeClient 對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
這裏的調用比較多,咱們這裏重點看一下 Transporters 的 connect 方法。以下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 若是 handler 數量大於1,則建立一個 ChannelHandler 分發器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例 return getTransporter().connect(url, handler); }
這裏,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未顯示配置客戶端類型,則默認加載 NettyTransporter,並調用該類的 connect 方法。以下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 建立 NettyClient 對象 return new NettyClient(url, listener); }
到這裏就不繼續跟下去了,在往下就是經過 Netty 提供的接口構建 Netty 客戶端了,你們有興趣本身看看。到這裏,關於 DubboProtocol 的 refer 方法就分析完了。接下來,繼續分析 RegistryProtocol 的 refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 取 registry 參數值,並將其設置爲協議頭 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 獲取註冊中心實例 Registry registry = registryFactory.getRegistry(url); // 這個判斷暫時不知道有什麼意圖,爲何要給 RegistryService 類型生成 Invoker ? if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 將 url 查詢字符串轉爲 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 獲取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 經過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行引用服務邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } // 調用 doRefer 繼續執行引用服務邏輯 return doRefer(cluster, registry, type, url); }
上面代碼首先爲 url 設置協議頭,而後根據 url 參數加載註冊中心實例。接下來對 RegistryService 繼續針對性處理,這個處理邏輯我不是很明白,不知道爲何要爲 RegistryService 類型生成 Invoker,有知道同窗麻煩告知一下。而後就是獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。這裏的重點是 doRefer 方法,以下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 實例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 設置註冊中心和協議 directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 生成服務消費者連接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 註冊服務消費者,在 consumers 目錄下新節點 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 訂閱 providers、configurators、routers 等節點數據 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一個註冊中心可能有多個服務提供者,所以這裏須要將多個服務提供者合併爲一個 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法建立一個 RegistryDirectory 實例,而後生成服務者消費者連接,並向註冊中心進行註冊。註冊完畢後,緊接着訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息,好比能夠獲取到服務提供者的配置信息。因爲一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就須要 Cluster 將多個服務節點合併爲一個,並生成一個 Invoker。關於 RegistryDirectory 和 Cluster,本文不打算進行分析,相關分析將會在隨後的文章中展開。
好了,關於 Invoker 的建立的邏輯就先分析到這。邏輯比較多,你們耐心看一下。
Invoker 建立完畢後,接下來要作的事情是爲服務接口生成代理對象。有了代理對象,咱們就能夠經過代理對象進行遠程調用。代理對象生成的入口方法爲在 ProxyFactory 的 getProxy,接下來進行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 調用重載方法 return getProxy(invoker, false); } public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; // 獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; // 設置服務接口類和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加載接口類 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } // 爲 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; // 建立新的 interfaces 數組 interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 設置 GenericService.class 到數組中 interfaces[len] = GenericService.class; } // 調用重載方法 return getProxy(invoker, interfaces); } public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代碼都是用來獲取 interfaces 數組的,所以咱們須要繼續往下看。getProxy(Invoker, Class<?>[]) 這個方法是一個抽象方法,下面咱們到 JavassistProxyFactory 類中看一下該方法的實現代碼。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。並調用Proxy 子類的 newInstance 方法生成 Proxy 實例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代碼並很少,首先是經過 Proxy 的 getProxy 方法獲取 Proxy 子類,而後建立 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。該類邏輯比較簡單,這裏就不分析了。下面咱們重點關注一下 Proxy 的 getProxy 方法,以下。
public static Proxy getProxy(Class<?>... ics) { // 調用重載方法 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { if (ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); // 遍歷接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); // 檢測類型是否爲接口 if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); Class<?> tmp = null; try { // 從新加載接口類 tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } // 檢測接口是否相同,這裏 tmp 有可能爲空 if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); // 拼接接口全限定名,分隔符爲 ; sb.append(itf).append(';'); } // 使用拼接後接口名做爲 key String key = sb.toString(); Map<String, Object> cache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null) { cache = new HashMap<String, Object>(); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null; synchronized (cache) { do { // 從緩存中獲取 Reference<Proxy> 實例 Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null) { return proxy; } } // 多線程控制,保證只有一個線程能夠進行後續操做 if (value == PendingGenerationMarker) { try { // 其餘線程在此處進行等待 cache.wait(); } catch (InterruptedException e) { } } else { // 放置標誌位到緩存中,並跳出 while 循環進行後續操做 cache.put(key, PendingGenerationMarker); break; } } while (true); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; ClassGenerator ccp = null, ccm = null; try { // 建立 ClassGenerator 對象 ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); for (int i = 0; i < ics.length; i++) { // 檢測接口訪問級別是否爲 protected 或 privete if (!Modifier.isPublic(ics[i].getModifiers())) { // 獲取接口包名 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) // 非 public 級別的接口必須在同一個包下,否者拋出異常 throw new IllegalArgumentException("non-public interfaces from different packages"); } } // 添加接口到 ClassGenerator 中 ccp.addInterface(ics[i]); // 遍歷接口方法 for (Method method : ics[i].getMethods()) { // 獲取方法描述,可理解爲方法簽名 String desc = ReflectUtils.getDesc(method); // 若是已包含在 worked 中,則忽略。考慮這種狀況, // A 接口和 B 接口中包含一個徹底相同的方法 if (worked.contains(desc)) continue; worked.add(desc); int ix = methods.size(); // 獲取方法返回值類型 Class<?> rt = method.getReturnType(); // 獲取參數列表 Class<?>[] pts = method.getParameterTypes(); // 生成 Object[] args = new Object[1...N] StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) // 生成 args[1...N] = ($w)$1...N; code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); // 生成 InvokerHandler 接口的 invoker 方法調用語句,以下: // Object ret = handler.invoke(this, methods[1...N], args); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); // 返回值不爲 void if (!Void.TYPE.equals(rt)) // 生成返回語句,形如 return (java.lang.String) ret; code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) pkg = PACKAGE_NAME; // 構建接口代理類名稱:pkg + ".proxy" + id,好比 com.tianxiaobo.proxy0 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); // 生成 private java.lang.reflect.InvocationHandler handler; ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); // 爲接口代理類添加帶有 InvocationHandler 參數的構造方法,好比: // porxy0(java.lang.reflect.InvocationHandler arg0) { // handler=$1; // } ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); // 爲接口代理類添加默認構造方法 ccp.addDefaultConstructor(); // 生成接口代理類 Class<?> clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); // 構建 Proxy 子類名稱,好比 Proxy1,Proxy2 等 String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); // 爲 Proxy 的抽象方法 newInstance 生成實現代碼,形如: // public Object newInstance(java.lang.reflect.InvocationHandler h) { // return new com.tianxiaobo.proxy0($1); // } ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); // 生成 Proxy 實現類 Class<?> pc = ccm.toClass(); // 經過反射建立 Proxy 實例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { if (ccp != null) // 釋放資源 ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else // 寫緩存 cache.put(key, new WeakReference<Proxy>(proxy)); // 喚醒其餘等待線程 cache.notifyAll(); } } return proxy; }
上面代碼比較複雜,我也寫了不少註釋。你們在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途,否則會被搞暈。ccp 用於爲服務接口生成代理類,好比咱們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於爲 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口爲例,來看一下該接口代理類代碼大體是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到這裏代理類生成邏輯就分析完了。整個過程比較複雜,你們須要耐心看一下,本節點到這裏。
本篇文章對服務引用的過程進行了較爲詳盡的分析,之因此說是較爲詳盡,是由於還有一些地方沒有分析到。好比 Directory、Cluster 等實現類的代碼並未進行詳細分析,因爲這些類功能比較獨立,所以我打算後續單獨成文進行分析。暫時咱們能夠先把這些類當作黑盒,只要知道這些類的用途便可。引用服務過程涉及到的調用也很是多,你們在閱讀相關代碼的中耐心些,並多進行調試。
好了,本篇文章就先到這裏了。謝謝閱讀。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:田小波
本文同步發佈在個人我的博客:http://www.tianxiaobo.com
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。