前面講了服務是如何導出到註冊中心的。其實Dubbo作的一件事就是將服務的URL發佈到註冊中心上。那如今咱們聊一聊消費者一方如何從註冊中心訂閱服務並進行遠程調用的。html
首先總的來用文字說一遍內部的大體機制java
Actor:能夠當作咱們的消費者。當咱們使用@Reference註解將對應服務注入到其餘類中這時候Spring會第一時間調用getObject方法,而getObject中只有一個方法就是get()。這裏能夠理解爲消費者開始引入服務了。算法
餓漢式:在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務。apache
懶漢式:在 ReferenceBean 對應的服務被注入到其餘類中時引用。Dubbo默認使用懶漢式。數組
ReferenceConfig:經過get方法實際上是進入到ReferenceConfig類中執行init()方法。在這個方法裏主要作了下面幾件事情:緩存
1,、對@Reference標註的接口查看是否合法,檢查該接口是否是存在泛型服務器
二、在系統中拿到dubbo.resolve.file這個文件,這個文件是進行配置consumer的接口的。將配置好的consumer信息存到URL中併發
三、將配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消費者的IP地址存到系統的上下文中app
四、接下來開始建立代理對象進入到ReferenceConfig的createProxy 。這裏仍是在ReferenceConfig類中。上面的那些配置通通傳入該方法中。上面有提到resolve解析consumer爲URL,如今就根據這個URL首先判斷是否遠程調用仍是本地調用。負載均衡
4.1如果本地調用,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例
4.2如果遠程調用,則讀取直連配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。而後根據 urls 元素數量進行後續操做。若 urls 元素數量爲1,則直接經過 Protocol 自適應拓展類即RegistryProtocol類或者DubboProtocol構建 Invoker 實例接口,這得看URL前面的是registry://開頭仍是以dubbo://。若 urls 元素數量大於1,即存在多個註冊中心或服務直連 url,此時先根據 url 構建 Invoker。而後再經過 Cluster 合併即merge多個 Invoker,最後調用 ProxyFactory 生成代理類。
RegistryProtocol:在refer方法中首先爲 url 設置協議頭,而後根據 url 參數加載註冊中心實例。而後獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。doRefer 方法建立一個 RegistryDirectory 實例,而後生成服務消費者連接,經過registry.register方法向註冊中心註冊消費者的連接,而後經過directory.subscribe向註冊中心訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息。因爲一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就須要 Cluster 將多個服務節點合併爲一個,並生成一個 Invoker。一樣Invoker建立過程先不分析,後面會拿一章專門介紹。
ProxyFactory:Invoker 建立完畢後,接下來要作的事情是爲服務接口生成代理對象。有了代理對象,便可進行遠程調用。代理對象生成的入口方法爲的getProxy。獲取須要建立的接口列表,組合成數組。然後將該接口數組傳入 Proxy 的 getProxy 方法獲取 Proxy 子類,而後建立 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。能夠理解爲AOP或攔截器。也就是在獲取該對象以前會調用到Proxy實例而不會調用到服務提供者對應的類。至於如何建立proxy實例,請看後面源碼的註釋。
服務引用入口源碼ReferenceBean的getObject方法:
1 public Object getObject() throws Exception { 2 return get(); 3 } 4 5 public synchronized T get() { 6 if (destroyed) { 7 throw new IllegalStateException("Already destroyed!"); 8 } 9 // 檢測 ref 是否爲空,爲空則經過 init 方法建立 10 if (ref == null) { 11 // init 方法主要用於處理配置,以及調用 createProxy 生成代理類 12 init(); 13 } 14 return ref; 15 }
ReferenceConfig 的 init 進行消費者一方的配置:
對源碼進行了分割,方便理清邏輯
1 private void init() { 2 // 避免重複初始化 3 if (initialized) { 4 return; 5 } 6 initialized = true; 7 // 檢測接口名合法性 8 if (interfaceName == null || interfaceName.length() == 0) { 9 throw new IllegalStateException("interface not allow null!"); 10 } 11 12 // 檢測 consumer 變量是否爲空,爲空則建立 13 checkDefault(); 14 appendProperties(this); 15 if (getGeneric() == null && getConsumer() != null) { 16 // 設置 generic 17 setGeneric(getConsumer().getGeneric()); 18 } 19 20 // 檢測是否爲泛化接口 21 if (ProtocolUtils.isGeneric(getGeneric())) { 22 interfaceClass = GenericService.class; 23 } else { 24 try { 25 // 加載類 26 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 27 .getContextClassLoader()); 28 } catch (ClassNotFoundException e) { 29 throw new IllegalStateException(e.getMessage(), e); 30 } 31 checkInterfaceAndMethods(interfaceClass, methods); 32 } 33 34 // -------------------------------分割線1------------------------------ 35 36 // 從系統變量中獲取與接口名對應的屬性值 37 String resolve = System.getProperty(interfaceName); 38 String resolveFile = null; 39 if (resolve == null || resolve.length() == 0) { 40 // 從系統屬性中獲取解析文件路徑 41 resolveFile = System.getProperty("dubbo.resolve.file"); 42 if (resolveFile == null || resolveFile.length() == 0) { 43 // 從指定位置加載配置文件 44 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); 45 if (userResolveFile.exists()) { 46 // 獲取文件絕對路徑 47 resolveFile = userResolveFile.getAbsolutePath(); 48 } 49 } 50 if (resolveFile != null && resolveFile.length() > 0) { 51 Properties properties = new Properties(); 52 FileInputStream fis = null; 53 try { 54 fis = new FileInputStream(new File(resolveFile)); 55 // 從文件中加載配置 56 properties.load(fis); 57 } catch (IOException e) { 58 throw new IllegalStateException("Unload ..., cause:..."); 59 } finally { 60 try { 61 if (null != fis) fis.close(); 62 } catch (IOException e) { 63 logger.warn(e.getMessage(), e); 64 } 65 } 66 // 獲取與接口名對應的配置 67 resolve = properties.getProperty(interfaceName); 68 } 69 } 70 if (resolve != null && resolve.length() > 0) { 71 // 將 resolve 賦值給 url 72 url = resolve; 73 } 74 75 // -------------------------------分割線2------------------------------ 76 if (consumer != null) { 77 if (application == null) { 78 // 從 consumer 中獲取 Application 實例,下同 79 application = consumer.getApplication(); 80 } 81 if (module == null) { 82 module = consumer.getModule(); 83 } 84 if (registries == null) { 85 registries = consumer.getRegistries(); 86 } 87 if (monitor == null) { 88 monitor = consumer.getMonitor(); 89 } 90 } 91 if (module != null) { 92 if (registries == null) { 93 registries = module.getRegistries(); 94 } 95 if (monitor == null) { 96 monitor = module.getMonitor(); 97 } 98 } 99 if (application != null) { 100 if (registries == null) { 101 registries = application.getRegistries(); 102 } 103 if (monitor == null) { 104 monitor = application.getMonitor(); 105 } 106 } 107 108 // 檢測 Application 合法性 109 checkApplication(); 110 // 檢測本地存根配置合法性 111 checkStubAndMock(interfaceClass); 112 113 // -------------------------------分割線3------------------------------ 114 115 Map<String, String> map = new HashMap<String, String>(); 116 Map<Object, Object> attributes = new HashMap<Object, Object>(); 117 118 // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中 119 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); 120 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); 121 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 122 if (ConfigUtils.getPid() > 0) { 123 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 124 } 125 126 // 非泛化服務 127 if (!isGeneric()) { 128 // 獲取版本 129 String revision = Version.getVersion(interfaceClass, version); 130 if (revision != null && revision.length() > 0) { 131 map.put("revision", revision); 132 } 133 134 // 獲取接口方法列表,並添加到 map 中 135 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 136 if (methods.length == 0) { 137 map.put("methods", Constants.ANY_VALUE); 138 } else { 139 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 140 } 141 } 142 map.put(Constants.INTERFACE_KEY, interfaceName); 143 // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中 144 appendParameters(map, application); 145 appendParameters(map, module); 146 appendParameters(map, consumer, Constants.DEFAULT_KEY); 147 appendParameters(map, this); 148 149 // -------------------------------分割線4------------------------------ 150 151 String prefix = StringUtils.getServiceKey(map); 152 if (methods != null && !methods.isEmpty()) { 153 // 遍歷 MethodConfig 列表 154 for (MethodConfig method : methods) { 155 appendParameters(map, method, method.getName()); 156 String retryKey = method.getName() + ".retry"; 157 // 檢測 map 是否包含 methodName.retry 158 if (map.containsKey(retryKey)) { 159 String retryValue = map.remove(retryKey); 160 if ("false".equals(retryValue)) { 161 // 添加劇試次數配置 methodName.retries 162 map.put(method.getName() + ".retries", "0"); 163 } 164 } 165 166 // 添加 MethodConfig 中的「屬性」字段到 attributes 167 // 好比 onreturn、onthrow、oninvoke 等 168 appendAttributes(attributes, method, prefix + "." + method.getName()); 169 checkAndConvertImplicitConfig(method, map, attributes); 170 } 171 } 172 173 // -------------------------------✨ 分割線5 ✨------------------------------ 174 175 // 獲取服務消費者 ip 地址 176 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); 177 if (hostToRegistry == null || hostToRegistry.length() == 0) { 178 hostToRegistry = NetUtils.getLocalHost(); 179 } else if (isInvalidLocalHost(hostToRegistry)) { 180 throw new IllegalArgumentException("Specified invalid registry ip from property..." ); 181 } 182 map.put(Constants.REGISTER_IP_KEY, hostToRegistry); 183 184 // 存儲 attributes 到系統上下文中 185 StaticContext.getSystemContext().putAll(attributes); 186 187 // 建立代理類 188 ref = createProxy(map); 189 190 // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel, 191 // 並將 ConsumerModel 存入到 ApplicationModel 中 192 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); 193 ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); 194 }
ReferenceConfig 的 createProxy 建立代理對象:
可是不是在這個方法內建立proxy實例,而是對URL進行解析後分三種建立Invoker線路,包括InjvmProtocol中的refer、DubboProtocol的refer與RegistryProtocol中的refer,最後再調用ProxyFactory來對proxy實例進行建立:
1 private T createProxy(Map<String, String> map) { 2 URL tmpUrl = new URL("temp", "localhost", 0, map); 3 final boolean isJvmRefer; 4 if (isInjvm() == null) { 5 // url 配置被指定,則不作本地引用 6 if (url != null && url.length() > 0) { 7 isJvmRefer = false; 8 // 根據 url 的協議、scope 以及 injvm 等參數檢測是否須要本地引用 9 // 好比若是用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true 10 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { 11 isJvmRefer = true; 12 } else { 13 isJvmRefer = false; 14 } 15 } else { 16 // 獲取 injvm 配置值 17 isJvmRefer = isInjvm().booleanValue(); 18 } 19 20 // 本地引用 21 if (isJvmRefer) { 22 // 生成本地引用 URL,協議爲 injvm 23 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); 24 // 調用 refer 方法構建 InjvmInvoker 實例 25 invoker = refprotocol.refer(interfaceClass, url); 26 27 // 遠程引用 28 } else { 29 // url 不爲空,代表用戶可能想進行點對點調用 30 if (url != null && url.length() > 0) { 31 // 當須要配置多個 url 時,可用分號進行分割,這裏會進行切分 32 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); 33 if (us != null && us.length > 0) { 34 for (String u : us) { 35 URL url = URL.valueOf(u); 36 if (url.getPath() == null || url.getPath().length() == 0) { 37 // 設置接口全限定名爲 url 路徑 38 url = url.setPath(interfaceName); 39 } 40 41 // 檢測 url 協議是否爲 registry,如果,代表用戶想使用指定的註冊中心 42 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 43 // 將 map 轉換爲查詢字符串,並做爲 refer 參數的值添加到 url 中 44 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 45 } else { 46 // 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性), 47 // 好比線程池相關配置。並保留服務提供者的部分配置,好比版本,group,時間戳等 48 // 最後將合併後的配置設置爲 url 查詢字符串中。 49 urls.add(ClusterUtils.mergeUrl(url, map)); 50 } 51 } 52 } 53 } else { 54 // 加載註冊中心 url 55 List<URL> us = loadRegistries(false); 56 if (us != null && !us.isEmpty()) { 57 for (URL u : us) { 58 URL monitorUrl = loadMonitor(u); 59 if (monitorUrl != null) { 60 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 61 } 62 // 添加 refer 參數到 url 中,並將 url 添加到 urls 中 63 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 64 } 65 } 66 67 // 未配置註冊中心,拋出異常 68 if (urls.isEmpty()) { 69 throw new IllegalStateException("No such any registry to reference..."); 70 } 71 } 72 73 // 單個註冊中心或服務提供者(服務直連,下同) 74 if (urls.size() == 1) { 75 // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 76 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 77 78 // 多個註冊中心或多個服務提供者,或者二者混合 79 } else { 80 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 81 URL registryURL = null; 82 83 // 獲取全部的 Invoker 84 for (URL url : urls) { 85 // 經過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 86 // 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法 87 invokers.add(refprotocol.refer(interfaceClass, url)); 88 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 89 registryURL = url; 90 } 91 } 92 if (registryURL != null) { 93 // 若是註冊中心連接不爲空,則將使用 AvailableCluster 94 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 95 // 建立 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併 96 invoker = cluster.join(new StaticDirectory(u, invokers)); 97 } else { 98 invoker = cluster.join(new StaticDirectory(invokers)); 99 } 100 } 101 } 102 103 Boolean c = check; 104 if (c == null && consumer != null) { 105 c = consumer.isCheck(); 106 } 107 if (c == null) { 108 c = true; 109 } 110 111 // invoker 可用性檢查 112 if (c && !invoker.isAvailable()) { 113 throw new IllegalStateException("No provider available for the service..."); 114 } 115 116 // 生成代理類 117 return (T) proxyFactory.getProxy(invoker); 118 }
一樣Invoker的建立後面會專門拿一篇來說。暫時先把Invoker建立當作一個黑盒,只要咱們調用便可。
RegistryProtocol中的refer:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 // 取 registry 參數值,並將其設置爲協議頭 3 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 4 // 獲取註冊中心實例 5 Registry registry = registryFactory.getRegistry(url); 6 if (RegistryService.class.equals(type)) { 7 return proxyFactory.getInvoker((T) registry, type, url); 8 } 9 10 // 將 url 查詢字符串轉爲 Map 11 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 12 // 獲取 group 配置 13 String group = qs.get(Constants.GROUP_KEY); 14 if (group != null && group.length() > 0) { 15 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 16 || "*".equals(group)) { 17 // 經過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯 18 return doRefer(getMergeableCluster(), registry, type, url); 19 } 20 } 21 22 // 調用 doRefer 繼續執行服務引用邏輯 23 return doRefer(cluster, registry, type, url); 24 } 25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 26 // 建立 RegistryDirectory 實例 27 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 28 // 設置註冊中心和協議 29 directory.setRegistry(registry); 30 directory.setProtocol(protocol); 31 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 32 // 生成服務消費者連接 33 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 34 35 // 註冊服務消費者,在 consumers 目錄下新節點 36 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 37 && url.getParameter(Constants.REGISTER_KEY, true)) { 38 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 39 Constants.CHECK_KEY, String.valueOf(false))); 40 } 41 42 // 訂閱 providers、configurators、routers 等節點數據 43 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 44 Constants.PROVIDERS_CATEGORY 45 + "," + Constants.CONFIGURATORS_CATEGORY 46 + "," + Constants.ROUTERS_CATEGORY)); 47 48 // 一個註冊中心可能有多個服務提供者,所以這裏須要將多個服務提供者合併爲一個 49 Invoker invoker = cluster.join(directory); 50 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 51 return invoker; 52 }
在Invoker建立完後會返回到ReferenceConfig中,而後進入ProxyFactory中的getProxy方法。
ProxyFactory中的getProxy方法:
1 public <T> T getProxy(Invoker<T> invoker) throws RpcException { 2 // 調用重載方法 3 return getProxy(invoker, false); 4 } 5 6 public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { 7 Class<?>[] interfaces = null; 8 // 獲取接口列表 9 String config = invoker.getUrl().getParameter("interfaces"); 10 if (config != null && config.length() > 0) { 11 // 切分接口列表 12 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); 13 if (types != null && types.length > 0) { 14 interfaces = new Class<?>[types.length + 2]; 15 // 設置服務接口類和 EchoService.class 到 interfaces 中 16 interfaces[0] = invoker.getInterface(); 17 interfaces[1] = EchoService.class; 18 for (int i = 0; i < types.length; i++) { 19 // 加載接口類 20 interfaces[i + 1] = ReflectUtils.forName(types[i]); 21 } 22 } 23 } 24 if (interfaces == null) { 25 interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; 26 } 27 28 // 爲 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827 29 if (!invoker.getInterface().equals(GenericService.class) && generic) { 30 int len = interfaces.length; 31 Class<?>[] temp = interfaces; 32 // 建立新的 interfaces 數組 33 interfaces = new Class<?>[len + 1]; 34 System.arraycopy(temp, 0, interfaces, 0, len); 35 // 設置 GenericService.class 到數組中 36 interfaces[len] = GenericService.class; 37 } 38 39 // 調用重載方法 40 return getProxy(invoker, interfaces); 41 } 42 43 public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
在上面的代碼主要是獲取接口數組再經過抽象的getProxy進入到Proxy中的getProxy。
1 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { 2 // 生成 Proxy 子類(Proxy 是抽象類)。並調用 Proxy 子類的 newInstance 方法建立 Proxy 實例 3 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); 4 }
代碼具體分析上面有寫。接下來進入到Proxy子類的getProxy中
1 public static Proxy getProxy(Class<?>... ics) { 2 // 調用重載方法 3 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); 4 } 5 6 public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { 7 if (ics.length > 65535) 8 throw new IllegalArgumentException("interface limit exceeded"); 9 10 StringBuilder sb = new StringBuilder(); 11 // 遍歷接口列表 12 for (int i = 0; i < ics.length; i++) { 13 String itf = ics[i].getName(); 14 // 檢測類型是否爲接口 15 if (!ics[i].isInterface()) 16 throw new RuntimeException(itf + " is not a interface."); 17 18 Class<?> tmp = null; 19 try { 20 // 從新加載接口類 21 tmp = Class.forName(itf, false, cl); 22 } catch (ClassNotFoundException e) { 23 } 24 25 // 檢測接口是否相同,這裏 tmp 有可能爲空 26 if (tmp != ics[i]) 27 throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); 28 29 // 拼接接口全限定名,分隔符爲 ; 30 sb.append(itf).append(';'); 31 } 32 33 // 使用拼接後的接口名做爲 key 34 String key = sb.toString(); 35 36 Map<String, Object> cache; 37 synchronized (ProxyCacheMap) { 38 cache = ProxyCacheMap.get(cl); 39 if (cache == null) { 40 cache = new HashMap<String, Object>(); 41 ProxyCacheMap.put(cl, cache); 42 } 43 } 44 45 Proxy proxy = null; 46 synchronized (cache) { 47 do { 48 // 從緩存中獲取 Reference<Proxy> 實例 49 Object value = cache.get(key); 50 if (value instanceof Reference<?>) { 51 proxy = (Proxy) ((Reference<?>) value).get(); 52 if (proxy != null) { 53 return proxy; 54 } 55 } 56 57 // 併發控制,保證只有一個線程能夠進行後續操做 58 if (value == PendingGenerationMarker) { 59 try { 60 // 其餘線程在此處進行等待 61 cache.wait(); 62 } catch (InterruptedException e) { 63 } 64 } else { 65 // 放置標誌位到緩存中,並跳出 while 循環進行後續操做 66 cache.put(key, PendingGenerationMarker); 67 break; 68 } 69 } 70 while (true); 71 } 72 73 long id = PROXY_CLASS_COUNTER.getAndIncrement(); 74 String pkg = null; 75 ClassGenerator ccp = null, ccm = null; 76 try { 77 // 建立 ClassGenerator 對象 78 ccp = ClassGenerator.newInstance(cl); 79 80 Set<String> worked = new HashSet<String>(); 81 List<Method> methods = new ArrayList<Method>(); 82 83 for (int i = 0; i < ics.length; i++) { 84 // 檢測接口訪問級別是否爲 protected 或 privete 85 if (!Modifier.isPublic(ics[i].getModifiers())) { 86 // 獲取接口包名 87 String npkg = ics[i].getPackage().getName(); 88 if (pkg == null) { 89 pkg = npkg; 90 } else { 91 if (!pkg.equals(npkg)) 92 // 非 public 級別的接口必須在同一個包下,否者拋出異常 93 throw new IllegalArgumentException("non-public interfaces from different packages"); 94 } 95 } 96 97 // 添加接口到 ClassGenerator 中 98 ccp.addInterface(ics[i]); 99 100 // 遍歷接口方法 101 for (Method method : ics[i].getMethods()) { 102 // 獲取方法描述,可理解爲方法簽名 103 String desc = ReflectUtils.getDesc(method); 104 // 若是方法描述字符串已在 worked 中,則忽略。考慮這種狀況, 105 // A 接口和 B 接口中包含一個徹底相同的方法 106 if (worked.contains(desc)) 107 continue; 108 worked.add(desc); 109 110 int ix = methods.size(); 111 // 獲取方法返回值類型 112 Class<?> rt = method.getReturnType(); 113 // 獲取參數列表 114 Class<?>[] pts = method.getParameterTypes(); 115 116 // 生成 Object[] args = new Object[1...N] 117 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); 118 for (int j = 0; j < pts.length; j++) 119 // 生成 args[1...N] = ($w)$1...N; 120 code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); 121 // 生成 InvokerHandler 接口的 invoker 方法調用語句,以下: 122 // Object ret = handler.invoke(this, methods[1...N], args); 123 code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); 124 125 // 返回值不爲 void 126 if (!Void.TYPE.equals(rt)) 127 // 生成返回語句,形如 return (java.lang.String) ret; 128 code.append(" return ").append(asArgument(rt, "ret")).append(";"); 129 130 methods.add(method); 131 // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 132 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); 133 } 134 } 135 136 if (pkg == null) 137 pkg = PACKAGE_NAME; 138 139 // 構建接口代理類名稱:pkg + ".proxy" + id,好比 org.apache.dubbo.proxy0 140 String pcn = pkg + ".proxy" + id; 141 ccp.setClassName(pcn); 142 ccp.addField("public static java.lang.reflect.Method[] methods;"); 143 // 生成 private java.lang.reflect.InvocationHandler handler; 144 ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); 145 146 // 爲接口代理類添加帶有 InvocationHandler 參數的構造方法,好比: 147 // porxy0(java.lang.reflect.InvocationHandler arg0) { 148 // handler=$1; 149 // } 150 ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); 151 // 爲接口代理類添加默認構造方法 152 ccp.addDefaultConstructor(); 153 154 // 生成接口代理類 155 Class<?> clazz = ccp.toClass(); 156 clazz.getField("methods").set(null, methods.toArray(new Method[0])); 157 158 // 構建 Proxy 子類名稱,好比 Proxy1,Proxy2 等 159 String fcn = Proxy.class.getName() + id; 160 ccm = ClassGenerator.newInstance(cl); 161 ccm.setClassName(fcn); 162 ccm.addDefaultConstructor(); 163 ccm.setSuperClass(Proxy.class); 164 // 爲 Proxy 的抽象方法 newInstance 生成實現代碼,形如: 165 // public Object newInstance(java.lang.reflect.InvocationHandler h) { 166 // return new org.apache.dubbo.proxy0($1); 167 // } 168 ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); 169 // 生成 Proxy 實現類 170 Class<?> pc = ccm.toClass(); 171 // 經過反射建立 Proxy 實例 172 proxy = (Proxy) pc.newInstance(); 173 } catch (RuntimeException e) { 174 throw e; 175 } catch (Exception e) { 176 throw new RuntimeException(e.getMessage(), e); 177 } finally { 178 if (ccp != null) 179 // 釋放資源 180 ccp.release(); 181 if (ccm != null) 182 ccm.release(); 183 synchronized (cache) { 184 if (proxy == null) 185 cache.remove(key); 186 else 187 // 寫緩存 188 cache.put(key, new WeakReference<Proxy>(proxy)); 189 // 喚醒其餘等待線程 190 cache.notifyAll(); 191 } 192 } 193 return proxy; 194 }
ccp 用於爲服務接口生成代理類,好比咱們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於爲 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 類的抽象方法。
這裏須要重點講一下,由於用到了併發控制。機制是這樣的,synchronized中首先獲取緩存中 Reference<Proxy> 實例。由於緩存是HashMap結構來存取。key是Reference<Proxy> 實例對應的接口名稱,value就是Reference<Proxy> 實例,注意的是接口列表進行拼接了。當第一個線程進入時,key對應的是實例而不是PendingGenerationMarker。因此會進入到else中,else中則設置key的對應的value爲標誌位PendingGenerationMarker。這樣其餘線程只能等待,然後對服務接口生產代理類和抽象類的子類。在最後釋放資源時,會喚醒其餘線程,而且把已經生成過的Reference實例標誌成弱引用對象,表明能夠回收了。(注:弱引用,比軟引用更弱一點,被弱引用關聯的對象只能生存到下一次垃圾收集發生以前。當垃圾收集發生時不管內存是否足夠,都會回收弱引用對象。具體能夠看JVM垃圾回收算法)
到這裏應該算是講完了Dubbo內的服務引用機制。對於Invoker後面會再單獨講。這裏還要補充一句若是是集羣的話會啓動服務降級以及負載均衡每次只選擇一個Invoker調用,一樣這個後面會再作單獨介紹。