public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }
下面來重點看看init方法的源碼,追蹤一下服務引用是如何建立的,方法比較長,有較長的篇幅都在進行參數的校驗、補全,須要點耐心看完。java
private void init() { //檢查初始化標識,防止重複初始化 if (initialized) { return; } initialized = true; //檢查接口名是否合法 if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:reference interface=\"\"... not allow null!"); } //檢查ConsumerConfig變量是否爲空(ConsumerConfig爲ReferenceConfig提供了某些屬性的默認值): //(1)若是ConsumerConfig爲null,則new一個; //(2)調用appendProperties(AbstractConfig config)方法完善ConsumerConfig的配置; checkDefault(); //調用appendProperties(AbstractConfig config)方法完善ReferenceConfig的配置,該方法邏輯以下: //(1)檢查AbstractConfig中每個setXXX(原始類型)或isXXX(原始類型)的方法,對XXX屬性進行配置; //(2)按優先級從高到低的順序,依次從System.getProperty、配置中心、AbstractConfig對應getXXX返回值、 //dubbo本地配置文件中進行查找XXX的屬性值並進行設置; appendProperties(this); //設置成員變量「泛化引用標識」,若是爲空則從成員變量ConsumerConfig中獲取該標識的值 if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } //判斷泛化標識的值是否爲真,作這個判斷的緣由是由於泛化標識爲字符串類型 if (ProtocolUtils.isGeneric(getGeneric())) { //若是爲真,則將interfaceClass設置爲GenericService interfaceClass = GenericService.class; } else { //若是爲假,則經過當前的類加載器加載interfaceName,獲取interfaceClass try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } //(1)校驗interfaceClass是否爲null、是否爲接口類型; //(2)若是配置了List<MethodConfig>,須要校驗interfaceClass是否有相應的方法 checkInterfaceAndMethods(interfaceClass, methods); } /****************************** begin ******************************/ //下面代碼塊的做用是嘗試從系統屬性或配置文件中獲取interfaceName的配置, //該配置值賦給成員變量String url,用於服務消費方點對點調用服務提供方。 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { url = resolve; //省略了日誌打印的代碼 } /****************************** end ******************************/ /****************************** begin ******************************/ //下面代碼塊的做用是檢測ApplicationConfig、ModuleConfig、RegistryConfig、MonitorConfig //這幾個核心配置是否爲空。若是爲空,則嘗試從其餘配置中獲取。 if (consumer != null) { if (application == null) { application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } //相似於checkDefault方法檢查ConsumerConfig,該方法檢查ApplicationConfig是否爲空並完善其各字段值 checkApplication(); //檢查ReferenceConfig的local、stub、mock配置項是否正確 checkStubAndMock(interfaceClass); /****************************** end ******************************/ /****************************** start ******************************/ //下面代碼塊的做用是收集配置,並將配置存儲在一個map中 Map<String, String> map = new HashMap<String, String>(); Map<Object, Object> attributes = new HashMap<Object, Object>(); map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); //side=consumer map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); //dubbo=2.6.2 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestamp=時間戳 if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid=進程pid } //判斷是不是泛化引用 if (!isGeneric()) { //非泛化引用,設置revision=interfaceClass的jar版本號 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } //設置接口方法,methods=xxx1,xxx2,... String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); //interface=interfaceName //將ApplicationConfig、ModuleConfig、ConsumerConfig、ReferenceConfig的值設置到map中, //(1)獲取方法名爲getXXX或者isXXX、public、無方法參數、返回值爲原始值的非getClass方法; //(2)獲取(1)方法上的@Paramter註解,根據該註解的excluded、escaped、append屬性判斷該屬性值 //是否須要被忽略、是否須要URLEncoded、是否須要以追加的形式設置入map(","做爲追加值分隔符); //(3)獲取方法名爲getParameters、public、無方法參數、返回值爲Map的方法; //(4)將(3)中的方法返回值Map的key-value鍵值對作key處理以後(添加前綴、"-"變"."),設置入map; appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); /****************************** end ******************************/ /****************************** start ******************************/ //下面代碼塊的做用是處理MethodConfig 實例。該實例包含了事件通知配置如onreturn、onthrow、oninvoke等。 //因爲通常不會使用到MethodConfig配置,咱們先暫時忽略這個配置的代碼 String prefix = StringUtils.getServiceKey(map); if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } appendAttributes(attributes, method, prefix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } /****************************** end ******************************/ /****************************** start ******************************/ //下面代碼塊的做用是設置服務消費者的IP並存儲到map中 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); if (hostToRegistry == null || hostToRegistry.length() == 0) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property ... value: ..."); } map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //register.ip=實際的IP地址 /****************************** end ******************************/ //將attributes存入靜態上下文 StaticContext.getSystemContext().putAll(attributes); //根據map建立服務應用代理,下一小節將對該方法進行詳細說明 ref = createProxy(map); //將服務接口名、ReferenceConfig、服務引用實例、服務接口方法包裝成ConsumerModel並存入ApplicationModel ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); }
private T createProxy(Map<String, String> map) { //構建臨時的URL,構造方法參數依次爲protocol、host、port、parameter URL tmpUrl = new URL("temp", "localhost", 0, map); //判斷是不是JVM內的引用 final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //點對點直連參數"url"有值,則不爲JVM內引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //調用InjvmProtocol的isInjvmRefer方法,判斷是不是JVM內引用 isJvmRefer = true; } else { //默認不是JVM內引用 isJvmRefer = false; } } else { //獲取injvm配置值 isJvmRefer = isInjvm().booleanValue(); } /******************************* injvm調用 *******************************/ if (isJvmRefer) { //若是是JVM內的引用,則建立JVM調用的上下文URL,protocol=injvm,host=127.0.0.1, //port=0,path=interfaceClass.getName(),而且將參數map設置入URL的parameters中 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); //refprotocol爲SPI接口Protocol的自適應擴展類,且refer方法有@Adaptive註解, //運用SPI機制源碼分析中的知識,Protocol接口的自適應擴展類的refer代碼, //會經過調用URL類型參數的getProtocol方法獲得實際應該獲取到的擴展類name,即injvm。 //在源碼的dubbo-rpc-injvm模塊下,找到protocol的配置文件, //其中配置了injvm的擴展類爲org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol。 //那麼這裏獲取到的invoker即爲InjvmProtocol.refer的返回結果,即InjvmInvoker invoker = refprotocol.refer(interfaceClass, url); } else { /******************************* 點對點直連調用 *******************************/ //若是成員變量url不爲空,表示要作直連調用。url是一個String,維護服務提供者地址 if (url != null && url.length() > 0) { //使用";"切分url字符串,表示若是想傳入多個地址,使用";"分割便可 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); //進行遍歷 if (us != null && us.length > 0) { for (String u : us) { //解析每一個地址字符串,將其轉換爲URL對象。地址字符串支持協議頭、驗證用戶密碼、IP、PORT、 //等其餘調用參數(如protocol) URL url = URL.valueOf(u); //若是地址字符串中未包含服務路徑,則進行補全,即接口的全限定名 if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } //檢測url協議是否爲registry,如果,代表用戶想使用指定的註冊中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { //將map轉換爲查詢字符串,並做爲refer參數的值添加到url的map字段中, //注意:這裏經過字符串生成的url還沒維護該方法的傳入參數map。 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { //合併用戶經過直連地址字符串配置的調用參數與其餘途徑配置的調用參數: //(1)移除服務提供者的一些配置(這些配置來源於用戶配置的url屬性),如線程池相關配置; //(2)保留服務提供者的部分配置,好比版本,group,時間戳等; //(3)最後將合併後的配置設置爲url查詢字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { /******************************* 經過註冊中心調用 *******************************/ //加載註冊中心配置List<Registry>,轉換爲List<URL> List<URL> us = loadRegistries(false); //遍歷註冊中心URL if (us != null && !us.isEmpty()) { for (URL u : us) { //獲取監控URL URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } //(1)將本方法的請求參數map轉換爲查詢字符串形式"key1=value1&key2=value2..."; //(2)以refer做爲key,(1)的結果encoded做爲value,存入註冊中心URL的paramters參數中; //(3)將註冊中心URL存入ReferenceConfig的全局變量List<URL> urls中; urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference ..."); } } //若是成員變量List<URL> urls大小爲1,則直接經過Protocol自適應拓展類構建Invoker實例接口。 //該自適應擴展類由字節碼技術生成,其refer方法具備@Adaptive註解,根據SPI機制的源碼知識, //refer方法按照參數URL中getProtocol的值查找實際的擴展類實例,若是getProtocol沒有值, //則取Protocol接口的@SPI註解value值"dubbo"做爲name查找擴展類實例。通常來講,若是經過註冊 //中心進行調用,則getProtocol獲取到的值爲registry,對應RegistryProtocol這個擴展類;而若是 //直連調用,getProtocol爲空或者是指定的協議(通常爲dubbo協議),對應擴展類DubboProtocol。 if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); } } } //校驗標識 Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } //檢查invoker是否可用,最終調用的是Curator客戶端的getZookeeperClient().isConnected()方法 if (c && !invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service ..."); } //...省略日誌打印代碼 //建立代理對象,proxyFactory是SPI接口ProxyFactory的自適應擴展類,經過ProxyFactory的定義可知, //在未設置URL的proxy屬性時,獲取到默認的擴展類JavassistProxyFactory,可是ProxyFactory接口擁有 //一個包裝擴展類StubProxyFactoryWrapper,所以實際獲取到的是StubProxyFactoryWrapper實例,並調用 //它的getProxy方法 return (T) proxyFactory.getProxy(invoker); }
/** * 獲取服務引用的I */ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //從url的parameters字段中獲取key=registry即註冊中心的協議頭,如zookeeper, //在將其設置爲protocol,以後移除paramters字段中registry這個key url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); //在獲取到RegistryProtocol這個擴展類實例時,dubbo SPI機制會自動裝配它的RegistryFactory字段, //RegistryFactory是一個SPI接口,則實際裝配的是RegistryFactory的自適應擴展類。 //另外,RegistryFactory的getRegistry方法被@Adaptive註解,且註解的value值爲"protocol", //所以RegistryFactory自定義擴展類會調用方法參數URL的getProtocol,以其返回值做爲實際擴展類的name。 //通常咱們使用的註冊中心爲zookeeper,那麼最終會調用到ZookeeperRegistryFactory的getRegistry方法。 Registry registry = registryFactory.getRegistry(url); //若是要獲取的服務引用爲RegistryService,直接調用proxyFactory的getInvoker方法獲取Invoker if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } //將url中paremeters的key=refer的value查詢字符串從新轉爲Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); //獲取group配置 String group = qs.get(Constants.GROUP_KEY); //若group不爲空,而且有多個分組 if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { //經過SPI加載MergeableCluster實例,並調用doRefer繼續執行獲取服務引用邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } //單group或無group,使用RegistryProtocol默認裝配的Cluster自適應擴展類調用doRefer方法 return doRefer(cluster, registry, type, url); } /** * 建立Invoker對象 */ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //建立RegistryDirectory服務目錄,服務目錄相關內容參考下一節,注意每次走到doRefer都會new。 //維度是某個服務的(type)的在某個註冊中心(URL)的服務目錄 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); //添加註冊中心屬性 directory.setProtocol(protocol); //添加協議自適應擴展類 //生成服務消費者URL,protocol=consumer,ip=register.ip的值,port=0,path=type的全限定名,格式: //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService? //application=monitor-app&check=false&dubbo=2.6.2&generic=true& //interface=com.bestpay.monitor.app.AlarmTestService&pid=6332& //protocol=dubbo&retries=-1&side=consumer&timeout=10000×tamp=1561427241838 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); //使用服務消費者URL進行註冊 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { //調用Registry的register方法,通常爲ZookeeperRegistry,它調用FailRegistry的register方法(見下) registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false))); } //向subscribeUrl的parameter參數添加鍵值對"category" -> "providers,configurators,routers" //訂閱服務提供方的zkNode下的providers、configurators、routers等節點數據 //(1)將RegistryDirectory中的consumerUrl設置爲subscribeUrl; //(2)調用Registry的subscribe方法,該方法: //(2.1)調用父類AbstractRegistry方法,向成員變量subscribed設置值; //(2.2)移除failedSubscribed、failedUnsubscribed、failedNotified該subscribeUrl相關數據 //(3)若是訂閱失敗,則嘗試從ZookeeperRegistry初始化時從緩存文件讀取到的數據中獲取到URLs, //且若是URLs不爲空,則向它們發送訂閱失敗的通知;若是爲空,且check=true,則直接拋出異常; //不然將url加入failedSubscribed directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //調用SPI接口Cluster的自適應擴展類的join,根據Cluster定義能夠知道自適應擴展類 //應該獲取一個FailoverCluster實例,可是MockClusterWrapper是Cluster擴展類中的包裝類, //所以FailoverCluster會被包裝起來返回,最終自適應擴展類獲取到MockClusterWrapper實例。 //調用擴展類MockClusterWrapper的join方法,該方法建立了一個MockClusterInvoker實例, //並維護了directory以及一個FailoverClusterInvoker。 Invoker invoker = cluster.join(directory); //將服務引用invoker、registryUrl、consumerUrl、服務目錄directory包裝成ConsumerInvokerWrapper, //而後以serviceUniqueName = consumerUrl.getServiceKey()作爲key,存入 //在ProviderConsumerRegTable中的static變量ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
//dubbo的SPI機制決定了Dubbo運行過程當中同一個擴展類實例只有一個,ZookeeperRegistryFactory中具備一個註冊中心的緩 //存,key爲"zookeeper://172.17.45.14:2181/com.alibaba.dubbo.registry.RegistryService",即以下格式 //"protocol://username:password@ip:port/com.alibaba.dubbo.registry.RegistryService"。更多格式參考 //URL類的toServiceString方法 private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>(); //實例化擴展類ZookeeperRegistryFactory時,會經過SPI的注入ZookeeperTransporter的自適應擴展類 private ZookeeperTransporter zookeeperTransporter; /** * 獲取一個註冊中心封裝對象 */ public Registry getRegistry(URL url) { //克隆url,並將Path設置爲RegistryService的全限定名; //在URL的parameters參數中添加interface=RegistryService的全限定名鍵值對; //在URL的parameters參數中移除key=export、key=refer; url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); //獲取註冊中心Registry的緩存key String key = url.toServiceString(); LOCK.lock(); try { //從緩存中嘗試獲取 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //若是沒有獲取,則調用建立方法建立緩存 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } //存入緩存 REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } /** * 建立緩存,直接new一個ZookeeperRegistry對象 */ public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
/** * ZookeeperRegistry的構造方法 */ public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { //調用父類FailbackRegistry的構造方法,方法見下 super(url); //判斷url中的host字段是否爲"0.0.0.0"或者爲表示anyhost的true if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } //獲取url中key爲group的參數值,若是未獲取到,則給默認值"dubbo" String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); //判斷group是否以"/"開頭,若是不是,則爲其添加"/"前綴 if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; //調用SPI接口ZookeeperTransporter的自適應擴展類,該擴展類爲dubbo經過字節碼技術生成。 //connect方法被@Adaptive註解修飾,而且要求從key爲client以及transporter中取值,若是 //這兩個參數沒有值,則擴展類name取@SPI註解的value值"curator",通常來講都是這個值。 //那麼自適應擴展類最終會調用CuratorZookeeperTransporter類的connect方法獲取Zk客戶端。 zkClient = zookeeperTransporter.connect(url); //向獲取到的zkClient設置監聽器 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { //若是是RECONNECTED狀態,則進行恢復 recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
FailbackRegistry爲ZookeeperRegistry的父類,定義了註冊中心的重試邏輯:apache
//定時任務線程池,用於週期性的執行重試任務 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); //定時任務線程執行結果 private final ScheduledFuture<?> retryFuture; //須要重試的註冊失敗的URL private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); //須要重試的註銷失敗的URL private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); //須要重試的訂閱失敗的URL private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); //須要重試的解除訂閱失敗的URL private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); //須要重試的通知失敗的URL private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); /** * FailbackRegistry構造函數 */ public FailbackRegistry(URL url) { //調用父類AbstractRegistry的構造方法,方法見下 super(url); //獲取重試周期,默認5000毫秒 int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); //建立一個定時線程池,以retryPeriod爲週期定時調用retry方法 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { //嘗試註冊失敗、解註冊失敗、訂閱失敗、解訂閱失敗、通知失敗的列表 retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
AbstractRegistry爲FailbackRegistry的父類,其定義了從緩存文件加載配置、緩存配置到文件、註冊、解註冊、訂閱、解除訂閱等Registry的主要功能。api
//註冊URL列表 private final Set<URL> registered = new ConcurrentHashSet<URL>(); //訂閱URL列表 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); //通知URL列表 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>(); /** * AbstractRegistry構造函數 */ public AbstractRegistry(URL url) { //設置成員變量registryUrl=url setUrl(url); //獲取文件的同步保存標識 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //獲取緩存文件名,格式爲"C:\Users\chenjunyi/.dubbo/dubbo-registry-monitor-app-172.17.45.14:2181.cache" String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache"); File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file ..."); } } } this.file = file; //讀取緩存的配置文件,將讀取結果存到Properties properties成員屬性中 loadProperties(); //拆分註冊中心URL中的address,將其backup的地址與主要地址拆成List<URL>,而後通知其監聽器 notify(url.getBackupUrls()); }
/** * 繼承自父類FailbackRegistry的register方法,用於註冊服務引用(消費者)的URL */ public void register(URL url) { //調用父類AbstractRegistry的register方法 super.register(url); failedRegistered.remove(url); //從註冊失敗列表中移除該url failedUnregistered.remove(url); //從註銷失敗列表中移除該url try { //勾起實際的註冊方法 doRegister(url); } catch (Exception e) { //若是拋出異常 Throwable t = e; //判斷check標識,從ZookeeperRegistry的registerUrl,即註冊中心URL的paramters獲取key=check的值, //從消費者URL,即url的parameters獲取key=check的值,以及獲取url的protocol。 //計算flag check的布爾值 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); //判斷異常類型是否爲SkipFailbackWrapperException boolean skipFailback = t instanceof SkipFailbackWrapperException; //若是都不該該跳過異常,則拋出異常,不然僅僅是打印異常 if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } //對於註冊失敗的消費者URL,添加到註冊失敗列表中Set<URL> failedRegistered failedRegistered.add(url); } } /** * 由ZookeeperRegistry實現的doRegister方法 */ protected void doRegister(URL url) { try { //調用CuratorZookeeperClient建立節點,以泛化調用爲例,傳入的參數URL格式爲 //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService? //application=monitor-app&category=consumers&check=false&dubbo=2.6.2& //generic=true&interface=com.bestpay.monitor.app.AlarmTestService&pid=19616& //protocol=dubbo&retries=-1&side=consumer&timeout=10000×tamp=1561429026207 //能夠看出,它的path路徑GenericService不必定等於interface //(1)toUrlPath(url)獲取須要建立節點路徑,以消費者爲例,其格式爲 //"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers/consumer%3A%2F%2F192.168.54.1%2F //com.alibaba.dubbo.rpc.service.GenericService%3Fapplication%3Dmonitor-app%26..." //能夠看出,真正的建立節點路徑是interface接口做爲path的路徑 //(2)url.getParameter(Constants.DYNAMIC_KEY, true)決定是否建立臨時節點,true-臨時節點。 //而CuratorZookeeperClient內部的create邏輯爲: //(1)截取示例中的"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers"做爲 //zkNode的父路徑並一級級建立父節點consumers,父節點都爲永久節點; //(2)根據第二個參數的值決定建立的每一個消費者節點(即截取父路徑後遺留的字符串)是否爲臨時節點, //true-臨時節點;false-永久節點; zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
/** * RegistryDirectory的subscribe方法 */ public void subscribe(URL url) { setConsumerUrl(url); //簡單的設置RegistryDirectory成員變量consumerUrl //勾起成員變量Registry的subscribe方法,並將自身做爲NotifyListener傳入 registry.subscribe(url, this); }
調起繼承自FailbackRegistry的subscribe方法:緩存
/** * 繼承自FailbackRegistry的subscribe方法 */ public void subscribe(URL url, NotifyListener listener) { //調用父類的訂閱方法, super.subscribe(url, listener); //從failedSubscribed、failedUnsubscribed、failedNotified列表中移除該url對應的listener removeFailedSubscribed(url, listener); try { //調用由ZookeeperRegistry實現的doSubscribe方法 doSubscribe(url, listener); } catch (Exception e) { //若是doSubscribe執行拋出異常 Throwable t = e; //從加載的文件緩存Properties中獲取url.getServiceKey對應的緩存 List<URL> urls = getCacheUrls(url); if (urls != null && !urls.isEmpty()) { //若是讀取到的緩存不爲空,則對該url進行通知,通知的內容是文件緩存的內容 notify(url, listener, urls); logger.error("Failed to subscribe ... Using cached list: ..."); } else { //獲取check標識,getUrl方法獲取的是ZookeeperRegistry維護的registryUrl, //而url指的是服務消費者的URL,從它們的parameters字段獲取check這個key的value; boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); //判斷異常類型 boolean skipFailback = t instanceof SkipFailbackWrapperException; //決定訂閱失敗的處理是繼續拋出異常仍是打印錯誤日誌 if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe ... cause: "); } else { logger.error("Failed to subscribe url ... waiting for retry ..."); } } //向成員變量 ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed添加訂閱失敗的數據 addFailedSubscribed(url, listener); } } /** * 由ZookeeperRegistry實現的doSubscribe方法 */ protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { //若是方法參數URL的paramters中key=interface爲*,暫時先不討論 String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { //方法參數URL的paramters中key=interface不爲* List<URL> urls = new ArrayList<URL>(); //經過toCategoriesPath方法生成要訂閱的ZK節點路徑,以interface=AlarmTestService爲例: //(1)/dubbo/com.bestpay.monitor.app.AlarmTestService/providers; //(2)/dubbo/com.bestpay.monitor.app.AlarmTestService/configurators; //(3)/dubbo/com.bestpay.monitor.app.AlarmTestService/routers; //而後遍歷這3個路徑 for (String path : toCategoriesPath(url)) { //獲取這個ZookeeperRegistry中的服務消費者URL對應的監聽器Map ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } //獲取NotifyListener,即RegistryDirectory對應的ChildListener。 //通常來講RegistryDirectory與註冊中心Registry和服務引用接口(如GenericService)綁定。 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { //若是沒有ChildListener,則建立一個並設置到listeners這個map中 listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } //建立"XXXX/XXX/providers、configurators、providers"永久節點 zkClient.create(path, false); //向Dubbo封裝的ZkClient添加ChildListener List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
相似於register方法,FallbackRegistry首先調用父類AbstractRegistry的subscribe,在通過一系列的校驗以後,向成員變量ConcurrentMap<URL, Set<NotifyListener>> subscribed添加服務引用(即消費者)的URL和監聽器。併發
/** * AbstractRegistry的subscribe方法 */ public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } listeners.add(listener); }
/** * 用於根據Curator客戶端推送的鏈接狀態,RECONNECTED進行恢復 */ protected void recover() throws Exception { //獲取恢復的註冊中心的地址URL列表Set<URL>,getRegistered()獲取成員變量Set<URL> Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } //將getRegistered()添加到成員變量Set<URL> failedRegistered中 for (URL url : recoverRegistered) { failedRegistered.add(url); } } //獲取恢復的訂閱列表Map<URL, Set<NotifyListener>>,getSubscribed()獲取成員變量subscribed Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>> (getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); //添加到成員變量failedSubscribed中 for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } } } /** * 對於成員變量failedRegistered、failedUnregistered、failedSubscribed、 * failedUnsubscribed、failedNotified進行重試,能夠看到,若是重試成功則將 * 其移出相應的重試列表,若是重試失敗,則忽略異常等待下次重試 */ protected void retry() { if (!failedRegistered.isEmpty()) { Set<URL> failed = new HashSet<URL>(failedRegistered); if (failed.size() > 0) { if (logger.isInfoEnabled()) { logger.info("Retry register " + failed); } try { for (URL url : failed) { try { doRegister(url); failedRegistered.remove(url); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry register ... waiting for again, cause: ..."); } } } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry register ... waiting for again, cause: ..."); } } } if (!failedUnregistered.isEmpty()) { Set<URL> failed = new HashSet<URL>(failedUnregistered); if (!failed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Retry unregister " + failed); } try { for (URL url : failed) { try { doUnregister(url); failedUnregistered.remove(url); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry unregister ... waiting for again, cause: ..."); } } } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry unregister ... waiting for again, cause: "); } } } if (!failedSubscribed.isEmpty()) { Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed); for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>> (failed).entrySet()) { if (entry.getValue() == null || entry.getValue().size() == 0) { failed.remove(entry.getKey()); } } if (failed.size() > 0) { if (logger.isInfoEnabled()) { logger.info("Retry subscribe " + failed); } try { for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { URL url = entry.getKey(); Set<NotifyListener> listeners = entry.getValue(); for (NotifyListener listener : listeners) { try { doSubscribe(url, listener); listeners.remove(listener); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry subscribe ... waiting for again, cause: ..."); } } } } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry subscribe ... waiting for again, cause: "); } } } if (!failedUnsubscribed.isEmpty()) { Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>> (failedUnsubscribed); for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>> (failed).entrySet()) { if (entry.getValue() == null || entry.getValue().isEmpty()) { failed.remove(entry.getKey()); } } if (failed.size() > 0) { if (logger.isInfoEnabled()) { logger.info("Retry unsubscribe " + failed); } try { for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { URL url = entry.getKey(); Set<NotifyListener> listeners = entry.getValue(); for (NotifyListener listener : listeners) { try { doUnsubscribe(url, listener); listeners.remove(listener); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry unsubscribe ... waiting for again, cause: .."); } } } } catch (Throwable t) { logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ..."); } } } if (!failedNotified.isEmpty()) { Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified); for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) { if (entry.getValue() == null || entry.getValue().size() == 0) { failed.remove(entry.getKey()); } } if (failed.size() > 0) { if (logger.isInfoEnabled()) { logger.info("Retry notify " + failed); } try { for (Map<NotifyListener, List<URL>> values : failed.values()) { for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) { try { NotifyListener listener = entry.getKey(); List<URL> urls = entry.getValue(); listener.notify(urls); values.remove(listener); } catch (Throwable t) { logger.warn("Failed to retry notify ... waiting for again, cause: ..."); } } } } catch (Throwable t) { logger.warn("Failed to retry notify ... waiting for again, cause: ..."); } } } }
public ZookeeperClient connect(URL url) { return new CuratorZookeeperClient(url); }
而CuratorZookeeperClient是dubbo經過Curator封裝出來的zookeeper客戶端。它的構造函數經過Curator框架建立一個client,而且向該client添加一個鏈接狀態的監聽器。當有鏈接狀態改變時,會向CuratorZookeeperClient維護的StateListener調用stateChanged方法,傳入獲取到的狀態。app
public CuratorZookeeperClient(URL url) { super(url); try { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { //回調自身維護的監聽器List<StateListener> if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
/** * StubProxyFactoryWrapper的getProxy方法 */ public <T> T getProxy(Invoker<T> invoker) throws RpcException { //它首先調用包裝的JavassistProxyFactory的getProxy方法 T proxy = proxyFactory.getProxy(invoker); if (GenericService.class != invoker.getInterface()) { //若是設置了本地代理類,則將獲取到的Proxy包裝爲代理類對象 String stub = invoker.getUrl().getParameter( Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY)); if (ConfigUtils.isNotEmpty(stub)) { Class<?> serviceType = invoker.getInterface(); if (ConfigUtils.isDefault(stub)) { if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) { stub = serviceType.getName() + "Stub"; } else { stub = serviceType.getName() + "Local"; } } try { Class<?> stubClass = ReflectUtils.forName(stub); if (!serviceType.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class ..."); } try { Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[]{proxy}); //export stub service URL url = invoker.getUrl(); if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) { url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join( Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ",")); url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); try { export(proxy, (Class) invoker.getInterface(), url); } catch (Exception e) { LOGGER.error("export a stub service error.", e); } } } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor \"public ..."); } } catch (Throwable t) { LOGGER.error("Failed to create stub implementation class ..."); } } } return proxy; } /** * JavassistProxyFactory的getProxy方法,繼承自AbstractProxyFactory */ public <T> T getProxy(Invoker<T> invoker) throws RpcException { Class<?>[] interfaces = null; //獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { //切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { //設置服務接口類和EchoService.class到interfaces中 interfaces = new Class<?>[types.length + 2]; interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } //若是接口列表爲空,則設置它爲服務接口以及回聲測試接口 if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } return getProxy(invoker, interfaces); } /** * JavassistProxyFactory實現的getProxy方法,非繼承,獲取服務接口代理類對象 */ public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { //經過字節碼技術生成類Proxy的子類(Proxy是抽象類) //並調用Proxy子類的newInstance方法建立服務接口代理類實例,該實例維護一個InvokerInvocationHandler //代理類實例的每一個方法實現都會調用InvokerInvocationHandler的invoke方法,將服務接口的方法參數以及 //調用的方法反射對象Method傳入。InvokerInvocationHandler的invoke方法在一系列檢查後最終執行以下方法: //return invoker.invoke(new RpcInvocation(method, args)).recreate(); return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
//Proxy子類實例緩存 private static final Map<ClassLoader, Map<String, Object>> ProxyCacheMap = new WeakHashMap<ClassLoader, Map<String, Object>>(); /** * 生成Proxy子類,該子類封裝了生成服務接口代理類的邏輯 */ public static Proxy getProxy(Class<?>... ics) { return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } /** * 生成Proxy子類,該子類封裝了生成服務接口代理類的邏輯 */ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { if (ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); //遍歷接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); //檢測是否爲接口類型 if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); //使用提供的ClassLoader,即Proxy自身的ClassLoader加載當前遍歷的接口 Class<?> tmp = null; try { tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } //檢測接口是否相同,這裏至關於判斷Proxy的類加載器與接口的類加載器是否爲一個 if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); //拼接接口名稱,格式"接口名1;接口名2..." sb.append(itf).append(';'); } //獲取類加載器對應的Proxy緩存,一個Map對象 Map<String, Object> cache; synchronized (ProxyCacheMap) { //獲取當前類加載器的Proxy實例緩存,key爲ClassLoader cache = ProxyCacheMap.get(cl); if (cache == null) { cache = new HashMap<String, Object>(); //緩存爲null,設置該ClassLoader的緩存 ProxyCacheMap.put(cl, cache); } } //以接口名做爲key,從cache中獲取這個key的Proxy實例緩存 String key = sb.toString(); Proxy proxy = null; //獲取value時的併發控制,使用監視器鎖 synchronized (cache) { do { //若是value就是應用類型包裝的,直接從引用中獲取實例 Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null) return proxy; } //若是value不是引用類型,則進行判斷其是否等於PendingGenerationMarker,即一個Object對象。 //這是使用了cache.wait(),讓其餘線程在cache這個對象上進行等待,緣由以下: //(1)首先一個在cache中未命中的key其value確定爲null;那麼咱們確定要建立這個value; //(2)既然value==null,則進入到else邏輯,設置一個key的標識,跳出循環,也跳出監視器鎖同步塊; //(3)當前線程代碼繼續執行去建立Proxy的實例,其餘線程進入到這個監視器鎖塊,就會進行循環獲取Proxy; //(4)不斷地循環獲取知足條件的Reference也沒錯,可是這樣不斷瘋狂的循環,對程序有影響; //(5)所以,設置PendingGenerationMarker的目的也在於此,做爲一個標識,若是發現key的value仍是它, // 就表示Proxy實例還沒有建立完成,在此進行等待;直到實例建立完成並進行notify。 //(6)固然,若使用監視器鎖將建立Proxy的代碼鎖住也能夠,可是這樣鎖住的代碼塊太大了。 if (value == PendingGenerationMarker) { try { cache.wait(); } catch (InterruptedException e) { } } else { cache.put(key, PendingGenerationMarker); break; } } while (true); } //原子計數器+1,做爲id,用於拼接生成的Proxy子類的名字 long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; //ccm用於爲Proxy生成子類,ccp爲服務接口生成代理類 ClassGenerator ccp = null, ccm = null; try { /************************* 開始建立服務接口代理類 *************************/ //建立生成服務接口代理類的ClassGenerator ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); //遍歷要代理的接口 for (int i = 0; i < ics.length; i++) { //檢測接口訪問級別是否爲public if (!Modifier.isPublic(ics[i].getModifiers())) { //不爲public級別的接口,須要確保它們必須在同一個包下 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) throw new IllegalArgumentException("non-public interfaces from diff pack..."); } } //添加接口到cpp中 ccp.addInterface(ics[i]); //遍歷服務接口的全部方法 for (Method method : ics[i].getMethods()) { //獲取方法描述,JVM格式"realTimePushList(Lcom/bestpay/messagecenter/product/ //service/api/dto/push/RealTimePushListDTO;)Lcom/bestpay/dubbo/result/Result;" String desc = ReflectUtils.getDesc(method); //若是方法描述字符串已在worked中,則忽略。考慮A接口和B接口中包含一個徹底相同的方法的狀況 if (worked.contains(desc)) continue; worked.add(desc); //服務接口代理類方法大小 TODO int ix = methods.size(); Class<?> rt = method.getReturnType(); Class<?>[] pts = method.getParameterTypes(); //拼接代碼字符串"Object[] args = new Object[N];",N是當前遍歷的method參數個數 StringBuilder code = new StringBuilder("Object[] args = new Object["). append(pts.length).append("];"); //遍歷method的參數列表 for (int j = 0; j < pts.length; j++) //拼接代碼字符串"args[j]=($w)$k;",其中k=j+1。這個是args的賦值語句 code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); //拼接handler的調用語句,"Object ret = handler.invoke(this, methods[ix], args);" //handler是java動態代理InvocationHandler的一個實現類,ix爲methods.size()。 code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); //若方法返回類型不爲void,則拼接返回類型並進行強制類型轉換"return (類型)ret;" if (!Void.TYPE.equals(rt)) code.append(" return ").append(asArgument(rt, "ret")).append(";"); //向List<Method>中添加該method methods.add(method); //添加方法名、訪問控制符、返回類型、參數列表、拋出異常類型、方法體代碼到ClassGenerator中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } //設置服務接口代理類包名就爲Proxy類所在的包名 if (pkg == null) pkg = PACKAGE_NAME; //設置服務接口代理類類名爲"pkg + ".proxy" + id",好比org.apache.dubbo.proxy0,注意是小寫 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); //添加成員屬性Method[] methods ccp.addField("public static java.lang.reflect.Method[] methods;"); //添加成員屬性InvocationHandler ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); //添加帶有InvocationHandler參數的構造方法,好比: //public proxy0(java.lang.reflect.InvocationHandler $1) { handler=$1; } ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); //添加默認無參構造器 ccp.addDefaultConstructor(); //生成服務接口代理Class Class<?> clazz = ccp.toClass(); //將服務接口代理類的static屬性methods設置爲上面收集到的methods列表 clazz.getField("methods").set(null, methods.toArray(new Method[0])); /************************* 開始建立Proxy類的子類 *************************/ String fcn = Proxy.class.getName() + id; //類名=Proxy全限定名+id,如Proxy一、Proxy2等 ccm = ClassGenerator.newInstance(cl); //建立生成Proxy子類的ClassGenerator ccm.setClassName(fcn); //設置類名 ccm.addDefaultConstructor(); //添加默認構造器 ccm.setSuperClass(Proxy.class); //設置父類 //添加方法newInstance,該方法調用構造方法,格式以下: //public Object newInstance(java.lang.reflect.InvocationHandler $1) { // return new org.apache.dubbo.proxy0($1); //} ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); Class<?> pc = ccm.toClass(); //生成Proxy子類Class proxy = (Proxy) pc.newInstance(); //生成Proxy子類的實例 } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { //釋放資源 if (ccp != null) ccp.release(); if (ccm != null) ccm.release(); //同步設置該Proxy子類的實例緩存,使用弱引用 synchronized (cache) { if (proxy == null) cache.remove(key); else cache.put(key, new WeakReference<Proxy>(proxy)); //通知全部在cache上等待的線程 cache.notifyAll(); } } return proxy; }