Dubbo(三):深刻理解Dubbo源碼之如何實現服務引用

1、前言

  前面講了服務是如何導出到註冊中心的。其實Dubbo作的一件事就是將服務的URL發佈到註冊中心上。那如今咱們聊一聊消費者一方如何從註冊中心訂閱服務並進行遠程調用的。html

2、引用服務時序圖

  首先總的來用文字說一遍內部的大體機制java

  Actor:能夠當作咱們的消費者。當咱們使用@Reference註解將對應服務注入到其餘類中這時候Spring會第一時間調用getObject方法,而getObject中只有一個方法就是get()。這裏能夠理解爲消費者開始引入服務了算法

    餓漢式:在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務。apache

    懶漢式:在 ReferenceBean 對應的服務被注入到其餘類中時引用。Dubbo默認使用懶漢式。數組

  ReferenceConfig:經過get方法實際上是進入到ReferenceConfig類中執行init()方法。在這個方法裏主要作了下面幾件事情:緩存

    1,、對@Reference標註的接口查看是否合法,檢查該接口是否是存在泛型服務器

    二、在系統中拿到dubbo.resolve.file這個文件,這個文件是進行配置consumer的接口的。將配置好的consumer信息存到URL中併發

    三、將配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消費者的IP地址存到系統的上下文中app

    四、接下來開始建立代理對象進入到ReferenceConfig的createProxy這裏仍是在ReferenceConfig類中。上面的那些配置通通傳入該方法中。上面有提到resolve解析consumer爲URL,如今就根據這個URL首先判斷是否遠程調用仍是本地調用。負載均衡

      4.1如果本地調用,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例

      4.2如果遠程調用,則讀取直連配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。而後根據 urls 元素數量進行後續操做。若 urls 元素數量爲1,則直接經過 Protocol 自適應拓展類即RegistryProtocol類或者DubboProtocol構建 Invoker 實例接口這得看URL前面的是registry://開頭仍是以dubbo://若 urls 元素數量大於1,即存在多個註冊中心或服務直連 url,此時先根據 url 構建 Invoker。而後再經過 Cluster 合併即merge多個 Invoker,最後調用 ProxyFactory 生成代理類

  RegistryProtocol:在refer方法中首先爲 url 設置協議頭,而後根據 url 參數加載註冊中心實例。而後獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。doRefer 方法建立一個 RegistryDirectory 實例,而後生成服務消費者連接,經過registry.register方法向註冊中心註冊消費者的連接,而後經過directory.subscribe向註冊中心訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息。因爲一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就須要 Cluster 將多個服務節點合併爲一個,並生成一個 Invoker。一樣Invoker建立過程先不分析,後面會拿一章專門介紹。

  ProxyFactory:Invoker 建立完畢後,接下來要作的事情是爲服務接口生成代理對象。有了代理對象,便可進行遠程調用。代理對象生成的入口方法爲的getProxy。獲取須要建立的接口列表,組合成數組。然後將該接口數組傳入 Proxy 的 getProxy 方法獲取 Proxy 子類,而後建立 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。能夠理解爲AOP或攔截器。也就是在獲取該對象以前會調用到Proxy實例而不會調用到服務提供者對應的類。至於如何建立proxy實例,請看後面源碼的註釋。

3、Dubbo源碼

  服務引用入口源碼ReferenceBean的getObject方法:

 1 public Object getObject() throws Exception {
 2     return get();
 3 }
 4 
 5 public synchronized T get() {
 6     if (destroyed) {
 7         throw new IllegalStateException("Already destroyed!");
 8     }
 9     // 檢測 ref 是否爲空,爲空則經過 init 方法建立
10     if (ref == null) {
11         // init 方法主要用於處理配置,以及調用 createProxy 生成代理類
12         init();
13     }
14     return ref;
15 }
View Code

  ReferenceConfig 的 init 進行消費者一方的配置:

    對源碼進行了分割,方便理清邏輯

  1 private void init() {
  2     // 避免重複初始化
  3     if (initialized) {
  4         return;
  5     }
  6     initialized = true;
  7     // 檢測接口名合法性
  8     if (interfaceName == null || interfaceName.length() == 0) {
  9         throw new IllegalStateException("interface not allow null!");
 10     }
 11 
 12     // 檢測 consumer 變量是否爲空,爲空則建立
 13     checkDefault();
 14     appendProperties(this);
 15     if (getGeneric() == null && getConsumer() != null) {
 16         // 設置 generic
 17         setGeneric(getConsumer().getGeneric());
 18     }
 19 
 20     // 檢測是否爲泛化接口
 21     if (ProtocolUtils.isGeneric(getGeneric())) {
 22         interfaceClass = GenericService.class;
 23     } else {
 24         try {
 25             // 加載類
 26             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
 27                     .getContextClassLoader());
 28         } catch (ClassNotFoundException e) {
 29             throw new IllegalStateException(e.getMessage(), e);
 30         }
 31         checkInterfaceAndMethods(interfaceClass, methods);
 32     }
 33     
 34     // -------------------------------分割線1------------------------------
 35 
 36     // 從系統變量中獲取與接口名對應的屬性值
 37     String resolve = System.getProperty(interfaceName);
 38     String resolveFile = null;
 39     if (resolve == null || resolve.length() == 0) {
 40         // 從系統屬性中獲取解析文件路徑
 41         resolveFile = System.getProperty("dubbo.resolve.file");
 42         if (resolveFile == null || resolveFile.length() == 0) {
 43             // 從指定位置加載配置文件
 44             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
 45             if (userResolveFile.exists()) {
 46                 // 獲取文件絕對路徑
 47                 resolveFile = userResolveFile.getAbsolutePath();
 48             }
 49         }
 50         if (resolveFile != null && resolveFile.length() > 0) {
 51             Properties properties = new Properties();
 52             FileInputStream fis = null;
 53             try {
 54                 fis = new FileInputStream(new File(resolveFile));
 55                 // 從文件中加載配置
 56                 properties.load(fis);
 57             } catch (IOException e) {
 58                 throw new IllegalStateException("Unload ..., cause:...");
 59             } finally {
 60                 try {
 61                     if (null != fis) fis.close();
 62                 } catch (IOException e) {
 63                     logger.warn(e.getMessage(), e);
 64                 }
 65             }
 66             // 獲取與接口名對應的配置
 67             resolve = properties.getProperty(interfaceName);
 68         }
 69     }
 70     if (resolve != null && resolve.length() > 0) {
 71         // 將 resolve 賦值給 url
 72         url = resolve;
 73     }
 74     
 75     // -------------------------------分割線2------------------------------
 76     if (consumer != null) {
 77         if (application == null) {
 78             // 從 consumer 中獲取 Application 實例,下同
 79             application = consumer.getApplication();
 80         }
 81         if (module == null) {
 82             module = consumer.getModule();
 83         }
 84         if (registries == null) {
 85             registries = consumer.getRegistries();
 86         }
 87         if (monitor == null) {
 88             monitor = consumer.getMonitor();
 89         }
 90     }
 91     if (module != null) {
 92         if (registries == null) {
 93             registries = module.getRegistries();
 94         }
 95         if (monitor == null) {
 96             monitor = module.getMonitor();
 97         }
 98     }
 99     if (application != null) {
100         if (registries == null) {
101             registries = application.getRegistries();
102         }
103         if (monitor == null) {
104             monitor = application.getMonitor();
105         }
106     }
107     
108     // 檢測 Application 合法性
109     checkApplication();
110     // 檢測本地存根配置合法性
111     checkStubAndMock(interfaceClass);
112     
113     // -------------------------------分割線3------------------------------
114     
115     Map<String, String> map = new HashMap<String, String>();
116     Map<Object, Object> attributes = new HashMap<Object, Object>();
117 
118     // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中
119     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
120     map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
121     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
122     if (ConfigUtils.getPid() > 0) {
123         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
124     }
125 
126     // 非泛化服務
127     if (!isGeneric()) {
128         // 獲取版本
129         String revision = Version.getVersion(interfaceClass, version);
130         if (revision != null && revision.length() > 0) {
131             map.put("revision", revision);
132         }
133 
134         // 獲取接口方法列表,並添加到 map 中
135         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
136         if (methods.length == 0) {
137             map.put("methods", Constants.ANY_VALUE);
138         } else {
139             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
140         }
141     }
142     map.put(Constants.INTERFACE_KEY, interfaceName);
143     // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中
144     appendParameters(map, application);
145     appendParameters(map, module);
146     appendParameters(map, consumer, Constants.DEFAULT_KEY);
147     appendParameters(map, this);
148     
149     // -------------------------------分割線4------------------------------
150     
151     String prefix = StringUtils.getServiceKey(map);
152     if (methods != null && !methods.isEmpty()) {
153         // 遍歷 MethodConfig 列表
154         for (MethodConfig method : methods) {
155             appendParameters(map, method, method.getName());
156             String retryKey = method.getName() + ".retry";
157             // 檢測 map 是否包含 methodName.retry
158             if (map.containsKey(retryKey)) {
159                 String retryValue = map.remove(retryKey);
160                 if ("false".equals(retryValue)) {
161                     // 添加劇試次數配置 methodName.retries
162                     map.put(method.getName() + ".retries", "0");
163                 }
164             }
165  
166             // 添加 MethodConfig 中的「屬性」字段到 attributes
167             // 好比 onreturn、onthrow、oninvoke 等
168             appendAttributes(attributes, method, prefix + "." + method.getName());
169             checkAndConvertImplicitConfig(method, map, attributes);
170         }
171     }
172     
173     // -------------------------------✨ 分割線5 ✨------------------------------
174 
175     // 獲取服務消費者 ip 地址
176     String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
177     if (hostToRegistry == null || hostToRegistry.length() == 0) {
178         hostToRegistry = NetUtils.getLocalHost();
179     } else if (isInvalidLocalHost(hostToRegistry)) {
180         throw new IllegalArgumentException("Specified invalid registry ip from property..." );
181     }
182     map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
183 
184     // 存儲 attributes 到系統上下文中
185     StaticContext.getSystemContext().putAll(attributes);
186 
187     // 建立代理類
188     ref = createProxy(map);
189 
190     // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel,
191     // 並將 ConsumerModel 存入到 ApplicationModel 中
192     ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
193     ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
194 }
View Code

  ReferenceConfig 的 createProxy 建立代理對象:

    可是不是在這個方法內建立proxy實例,而是對URL進行解析後分三種建立Invoker線路,包括InjvmProtocol中的refer、DubboProtocol的refer與RegistryProtocol中的refer,最後再調用ProxyFactory來對proxy實例進行建立:

  1 private T createProxy(Map<String, String> map) {
  2     URL tmpUrl = new URL("temp", "localhost", 0, map);
  3     final boolean isJvmRefer;
  4     if (isInjvm() == null) {
  5         // url 配置被指定,則不作本地引用
  6         if (url != null && url.length() > 0) {
  7             isJvmRefer = false;
  8         // 根據 url 的協議、scope 以及 injvm 等參數檢測是否須要本地引用
  9         // 好比若是用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true
 10         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
 11             isJvmRefer = true;
 12         } else {
 13             isJvmRefer = false;
 14         }
 15     } else {
 16         // 獲取 injvm 配置值
 17         isJvmRefer = isInjvm().booleanValue();
 18     }
 19 
 20     // 本地引用
 21     if (isJvmRefer) {
 22         // 生成本地引用 URL,協議爲 injvm
 23         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
 24         // 調用 refer 方法構建 InjvmInvoker 實例
 25         invoker = refprotocol.refer(interfaceClass, url);
 26         
 27     // 遠程引用
 28     } else {
 29         // url 不爲空,代表用戶可能想進行點對點調用
 30         if (url != null && url.length() > 0) {
 31             // 當須要配置多個 url 時,可用分號進行分割,這裏會進行切分
 32             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
 33             if (us != null && us.length > 0) {
 34                 for (String u : us) {
 35                     URL url = URL.valueOf(u);
 36                     if (url.getPath() == null || url.getPath().length() == 0) {
 37                         // 設置接口全限定名爲 url 路徑
 38                         url = url.setPath(interfaceName);
 39                     }
 40                     
 41                     // 檢測 url 協議是否爲 registry,如果,代表用戶想使用指定的註冊中心
 42                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
 43                         // 將 map 轉換爲查詢字符串,並做爲 refer 參數的值添加到 url 中
 44                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
 45                     } else {
 46                         // 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性),
 47                         // 好比線程池相關配置。並保留服務提供者的部分配置,好比版本,group,時間戳等
 48                         // 最後將合併後的配置設置爲 url 查詢字符串中。
 49                         urls.add(ClusterUtils.mergeUrl(url, map));
 50                     }
 51                 }
 52             }
 53         } else {
 54             // 加載註冊中心 url
 55             List<URL> us = loadRegistries(false);
 56             if (us != null && !us.isEmpty()) {
 57                 for (URL u : us) {
 58                     URL monitorUrl = loadMonitor(u);
 59                     if (monitorUrl != null) {
 60                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
 61                     }
 62                     // 添加 refer 參數到 url 中,並將 url 添加到 urls 中
 63                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
 64                 }
 65             }
 66 
 67             // 未配置註冊中心,拋出異常
 68             if (urls.isEmpty()) {
 69                 throw new IllegalStateException("No such any registry to reference...");
 70             }
 71         }
 72 
 73         // 單個註冊中心或服務提供者(服務直連,下同)
 74         if (urls.size() == 1) {
 75             // 調用 RegistryProtocol 的 refer 構建 Invoker 實例
 76             invoker = refprotocol.refer(interfaceClass, urls.get(0));
 77             
 78         // 多個註冊中心或多個服務提供者,或者二者混合
 79         } else {
 80             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
 81             URL registryURL = null;
 82 
 83             // 獲取全部的 Invoker
 84             for (URL url : urls) {
 85                 // 經過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
 86                 // 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法
 87                 invokers.add(refprotocol.refer(interfaceClass, url));
 88                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
 89                     registryURL = url;
 90                 }
 91             }
 92             if (registryURL != null) {
 93                 // 若是註冊中心連接不爲空,則將使用 AvailableCluster
 94                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
 95                 // 建立 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併
 96                 invoker = cluster.join(new StaticDirectory(u, invokers));
 97             } else {
 98                 invoker = cluster.join(new StaticDirectory(invokers));
 99             }
100         }
101     }
102 
103     Boolean c = check;
104     if (c == null && consumer != null) {
105         c = consumer.isCheck();
106     }
107     if (c == null) {
108         c = true;
109     }
110     
111     // invoker 可用性檢查
112     if (c && !invoker.isAvailable()) {
113         throw new IllegalStateException("No provider available for the service...");
114     }
115 
116     // 生成代理類
117     return (T) proxyFactory.getProxy(invoker);
118 }
View Code

    一樣Invoker的建立後面會專門拿一篇來說。暫時先把Invoker建立當作一個黑盒,只要咱們調用便可。

  RegistryProtocol中的refer:

 1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
 2     // 取 registry 參數值,並將其設置爲協議頭
 3     url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
 4     // 獲取註冊中心實例
 5     Registry registry = registryFactory.getRegistry(url);
 6     if (RegistryService.class.equals(type)) {
 7         return proxyFactory.getInvoker((T) registry, type, url);
 8     }
 9 
10     // 將 url 查詢字符串轉爲 Map
11     Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
12     // 獲取 group 配置
13     String group = qs.get(Constants.GROUP_KEY);
14     if (group != null && group.length() > 0) {
15         if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
16                 || "*".equals(group)) {
17             // 經過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯
18             return doRefer(getMergeableCluster(), registry, type, url);
19         }
20     }
21     
22     // 調用 doRefer 繼續執行服務引用邏輯
23     return doRefer(cluster, registry, type, url);
24 }
25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
26     // 建立 RegistryDirectory 實例
27     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
28     // 設置註冊中心和協議
29     directory.setRegistry(registry);
30     directory.setProtocol(protocol);
31     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
32     // 生成服務消費者連接
33     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
34 
35     // 註冊服務消費者,在 consumers 目錄下新節點
36     if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
37             && url.getParameter(Constants.REGISTER_KEY, true)) {
38         registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
39                 Constants.CHECK_KEY, String.valueOf(false)));
40     }
41 
42     // 訂閱 providers、configurators、routers 等節點數據
43     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
44             Constants.PROVIDERS_CATEGORY
45                     + "," + Constants.CONFIGURATORS_CATEGORY
46                     + "," + Constants.ROUTERS_CATEGORY));
47 
48     // 一個註冊中心可能有多個服務提供者,所以這裏須要將多個服務提供者合併爲一個
49     Invoker invoker = cluster.join(directory);
50     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
51     return invoker;
52 }
View Code

    在Invoker建立完後會返回到ReferenceConfig中,而後進入ProxyFactory中的getProxy方法。

  ProxyFactory中的getProxy方法:

 1 public <T> T getProxy(Invoker<T> invoker) throws RpcException {
 2     // 調用重載方法
 3     return getProxy(invoker, false);
 4 }
 5 
 6 public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
 7     Class<?>[] interfaces = null;
 8     // 獲取接口列表
 9     String config = invoker.getUrl().getParameter("interfaces");
10     if (config != null && config.length() > 0) {
11         // 切分接口列表
12         String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
13         if (types != null && types.length > 0) {
14             interfaces = new Class<?>[types.length + 2];
15             // 設置服務接口類和 EchoService.class 到 interfaces 中
16             interfaces[0] = invoker.getInterface();
17             interfaces[1] = EchoService.class;
18             for (int i = 0; i < types.length; i++) {
19                 // 加載接口類
20                 interfaces[i + 1] = ReflectUtils.forName(types[i]);
21             }
22         }
23     }
24     if (interfaces == null) {
25         interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
26     }
27 
28     // 爲 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
29     if (!invoker.getInterface().equals(GenericService.class) && generic) {
30         int len = interfaces.length;
31         Class<?>[] temp = interfaces;
32         // 建立新的 interfaces 數組
33         interfaces = new Class<?>[len + 1];
34         System.arraycopy(temp, 0, interfaces, 0, len);
35         // 設置 GenericService.class 到數組中
36         interfaces[len] = GenericService.class;
37     }
38 
39     // 調用重載方法
40     return getProxy(invoker, interfaces);
41 }
42 
43 public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
View Code

    在上面的代碼主要是獲取接口數組再經過抽象的getProxy進入到Proxy中的getProxy。

1 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
2     // 生成 Proxy 子類(Proxy 是抽象類)。並調用 Proxy 子類的 newInstance 方法建立 Proxy 實例
3     return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
4 }
View Code

    代碼具體分析上面有寫。接下來進入到Proxy子類的getProxy中

  1 public static Proxy getProxy(Class<?>... ics) {
  2     // 調用重載方法
  3     return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
  4 }
  5 
  6 public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
  7     if (ics.length > 65535)
  8         throw new IllegalArgumentException("interface limit exceeded");
  9 
 10     StringBuilder sb = new StringBuilder();
 11     // 遍歷接口列表
 12     for (int i = 0; i < ics.length; i++) {
 13         String itf = ics[i].getName();
 14         // 檢測類型是否爲接口
 15         if (!ics[i].isInterface())
 16             throw new RuntimeException(itf + " is not a interface.");
 17 
 18         Class<?> tmp = null;
 19         try {
 20             // 從新加載接口類
 21             tmp = Class.forName(itf, false, cl);
 22         } catch (ClassNotFoundException e) {
 23         }
 24 
 25         // 檢測接口是否相同,這裏 tmp 有可能爲空
 26         if (tmp != ics[i])
 27             throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
 28 
 29         // 拼接接口全限定名,分隔符爲 ;
 30         sb.append(itf).append(';');
 31     }
 32 
 33     // 使用拼接後的接口名做爲 key
 34     String key = sb.toString();
 35 
 36     Map<String, Object> cache;
 37     synchronized (ProxyCacheMap) {
 38         cache = ProxyCacheMap.get(cl);
 39         if (cache == null) {
 40             cache = new HashMap<String, Object>();
 41             ProxyCacheMap.put(cl, cache);
 42         }
 43     }
 44 
 45     Proxy proxy = null;
 46     synchronized (cache) {
 47         do {
 48             // 從緩存中獲取 Reference<Proxy> 實例
 49             Object value = cache.get(key);
 50             if (value instanceof Reference<?>) {
 51                 proxy = (Proxy) ((Reference<?>) value).get();
 52                 if (proxy != null) {
 53                     return proxy;
 54                 }
 55             }
 56 
 57             // 併發控制,保證只有一個線程能夠進行後續操做
 58             if (value == PendingGenerationMarker) {
 59                 try {
 60                     // 其餘線程在此處進行等待
 61                     cache.wait();
 62                 } catch (InterruptedException e) {
 63                 }
 64             } else {
 65                 // 放置標誌位到緩存中,並跳出 while 循環進行後續操做
 66                 cache.put(key, PendingGenerationMarker);
 67                 break;
 68             }
 69         }
 70         while (true);
 71     }
 72 
 73     long id = PROXY_CLASS_COUNTER.getAndIncrement();
 74     String pkg = null;
 75     ClassGenerator ccp = null, ccm = null;
 76     try {
 77         // 建立 ClassGenerator 對象
 78         ccp = ClassGenerator.newInstance(cl);
 79 
 80         Set<String> worked = new HashSet<String>();
 81         List<Method> methods = new ArrayList<Method>();
 82 
 83         for (int i = 0; i < ics.length; i++) {
 84             // 檢測接口訪問級別是否爲 protected 或 privete
 85             if (!Modifier.isPublic(ics[i].getModifiers())) {
 86                 // 獲取接口包名
 87                 String npkg = ics[i].getPackage().getName();
 88                 if (pkg == null) {
 89                     pkg = npkg;
 90                 } else {
 91                     if (!pkg.equals(npkg))
 92                         // 非 public 級別的接口必須在同一個包下,否者拋出異常
 93                         throw new IllegalArgumentException("non-public interfaces from different packages");
 94                 }
 95             }
 96             
 97             // 添加接口到 ClassGenerator 中
 98             ccp.addInterface(ics[i]);
 99 
100             // 遍歷接口方法
101             for (Method method : ics[i].getMethods()) {
102                 // 獲取方法描述,可理解爲方法簽名
103                 String desc = ReflectUtils.getDesc(method);
104                 // 若是方法描述字符串已在 worked 中,則忽略。考慮這種狀況,
105                 // A 接口和 B 接口中包含一個徹底相同的方法
106                 if (worked.contains(desc))
107                     continue;
108                 worked.add(desc);
109 
110                 int ix = methods.size();
111                 // 獲取方法返回值類型
112                 Class<?> rt = method.getReturnType();
113                 // 獲取參數列表
114                 Class<?>[] pts = method.getParameterTypes();
115 
116                 // 生成 Object[] args = new Object[1...N]
117                 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
118                 for (int j = 0; j < pts.length; j++)
119                     // 生成 args[1...N] = ($w)$1...N;
120                     code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
121                 // 生成 InvokerHandler 接口的 invoker 方法調用語句,以下:
122                 // Object ret = handler.invoke(this, methods[1...N], args);
123                 code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
124 
125                 // 返回值不爲 void
126                 if (!Void.TYPE.equals(rt))
127                     // 生成返回語句,形如 return (java.lang.String) ret;
128                     code.append(" return ").append(asArgument(rt, "ret")).append(";");
129 
130                 methods.add(method);
131                 // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 
132                 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
133             }
134         }
135 
136         if (pkg == null)
137             pkg = PACKAGE_NAME;
138 
139         // 構建接口代理類名稱:pkg + ".proxy" + id,好比 org.apache.dubbo.proxy0
140         String pcn = pkg + ".proxy" + id;
141         ccp.setClassName(pcn);
142         ccp.addField("public static java.lang.reflect.Method[] methods;");
143         // 生成 private java.lang.reflect.InvocationHandler handler;
144         ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
145 
146         // 爲接口代理類添加帶有 InvocationHandler 參數的構造方法,好比:
147         // porxy0(java.lang.reflect.InvocationHandler arg0) {
148         //     handler=$1;
149         // }
150         ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
151         // 爲接口代理類添加默認構造方法
152         ccp.addDefaultConstructor();
153         
154         // 生成接口代理類
155         Class<?> clazz = ccp.toClass();
156         clazz.getField("methods").set(null, methods.toArray(new Method[0]));
157 
158         // 構建 Proxy 子類名稱,好比 Proxy1,Proxy2 等
159         String fcn = Proxy.class.getName() + id;
160         ccm = ClassGenerator.newInstance(cl);
161         ccm.setClassName(fcn);
162         ccm.addDefaultConstructor();
163         ccm.setSuperClass(Proxy.class);
164         // 爲 Proxy 的抽象方法 newInstance 生成實現代碼,形如:
165         // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
166         //     return new org.apache.dubbo.proxy0($1);
167         // }
168         ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
169         // 生成 Proxy 實現類
170         Class<?> pc = ccm.toClass();
171         // 經過反射建立 Proxy 實例
172         proxy = (Proxy) pc.newInstance();
173     } catch (RuntimeException e) {
174         throw e;
175     } catch (Exception e) {
176         throw new RuntimeException(e.getMessage(), e);
177     } finally {
178         if (ccp != null)
179             // 釋放資源
180             ccp.release();
181         if (ccm != null)
182             ccm.release();
183         synchronized (cache) {
184             if (proxy == null)
185                 cache.remove(key);
186             else
187                 // 寫緩存
188                 cache.put(key, new WeakReference<Proxy>(proxy));
189             // 喚醒其餘等待線程
190             cache.notifyAll();
191         }
192     }
193     return proxy;
194 }
View Code

    ccp 用於爲服務接口生成代理類,好比咱們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於爲 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 類的抽象方法。

    這裏須要重點講一下,由於用到了併發控制。機制是這樣的,synchronized中首先獲取緩存中 Reference<Proxy> 實例。由於緩存是HashMap結構來存取。key是Reference<Proxy> 實例對應的接口名稱,value就是Reference<Proxy> 實例,注意的是接口列表進行拼接了。當第一個線程進入時,key對應的是實例而不是PendingGenerationMarker。因此會進入到else中,else中則設置key的對應的value爲標誌位PendingGenerationMarker。這樣其餘線程只能等待,然後對服務接口生產代理類和抽象類的子類。在最後釋放資源時,會喚醒其餘線程,而且把已經生成過的Reference實例標誌成弱引用對象,表明能夠回收了。(注:弱引用,比軟引用更弱一點,被弱引用關聯的對象只能生存到下一次垃圾收集發生以前。當垃圾收集發生時不管內存是否足夠,都會回收弱引用對象。具體能夠看JVM垃圾回收算法

4、總結:

  到這裏應該算是講完了Dubbo內的服務引用機制。對於Invoker後面會再單獨講。這裏還要補充一句若是是集羣的話會啓動服務降級以及負載均衡每次只選擇一個Invoker調用,一樣這個後面會再作單獨介紹。

相關文章
相關標籤/搜索