Dubbo源碼學習總結系列七---註冊中心

        Dubbo註冊中心是框架的核心模塊,提供了服務註冊發現(包括服務提供者、消費者、路由策略、覆蓋規則)的功能,該功能集中體現了服務治理的特性。該模塊結合Cluster模塊實現了集羣服務。Dubbo管理控制檯查詢註冊的數據展示服務提供者、消費者、路由策略、覆蓋規則相關信息。監控中心從註冊中心訂閱相關信息實時監控調用鏈調用狀況。java

        那麼,Registry模塊的職責咱們總結爲:redis

        (1)註冊:包括服務提供者、路由策略、覆蓋規則信息註冊到註冊中心;緩存

        (2)訂閱:消費端從註冊中心訂閱相關信息;網絡

        (3)通知(push):註冊信息有變化,註冊中心向訂閱者主動通知變動信息;session

        (4)得到服務列表(pull):消費方主動從註冊中心目錄服務拉取服務提供者列表等信息。架構

 

        從如下幾個架構圖能夠看出Registry在Dubbo框架中所扮演的角色,架構圖摘自Dubbo開發者文檔:app

        (1)依賴關係圖中1-3的步驟咱們看到提供者和消費者對註冊中心的依賴;框架

        

        (2)調用鏈圖(一部分)顯示了組件調用Registry的場景:消費端refer時,從註冊中心目錄服務得到服務提供者列表。less

        (3)暴露服務時序圖中,服務提供者向註冊中心註冊或取消註冊服務提供者信息。dom

        (4)引用服務時序圖中消費端配置指向RegistryProtocol,實現向註冊中心訂閱提供者信息,銷燬時取消訂閱。

 

        註冊中心的配置方法以下:        

<!-- 定義註冊中心 -->
<dubbo:registry id="xxx1" address="xxx://ip:port" />
<!-- 引用註冊中心,若是沒有配置registry屬性,將在ApplicationContext中自動掃描registry配置 -->
<dubbo:service registry="xxx1" />
<!-- 引用註冊中心缺省值,當<dubbo:service>沒有配置registry屬性時,使用此配置 -->
<dubbo:provider registry="xxx1" />

 

        定義的接口有:

        RegistryFactory.java:

 1 public interface RegistryFactory {
 2     /**
 3      * 鏈接註冊中心.
 4      * 
 5      * 鏈接註冊中心需處理契約:<br>
 6      * 1. 當設置check=false時表示不檢查鏈接,不然在鏈接不上時拋出異常。<br>
 7      * 2. 支持URL上的username:password權限認證。<br>
 8      * 3. 支持backup=10.20.153.10備選註冊中心集羣地址。<br>
 9      * 4. 支持file=registry.cache本地磁盤文件緩存。<br>
10      * 5. 支持timeout=1000請求超時設置。<br>
11      * 6. 支持session=60000會話超時或過時設置。<br>
12      * 
13      * @param url 註冊中心地址,不容許爲空
14      * @return 註冊中心引用,總不返回空
15      */
16     Registry getRegistry(URL url); 
17 }

 

   RegistryService.java:

 1 public interface RegistryService { // Registry extends RegistryService 
 2     /**
 3      * 註冊服務.
 4      * 
 5      * 註冊需處理契約:<br>
 6      * 1. 當URL設置了check=false時,註冊失敗後不報錯,在後臺定時重試,不然拋出異常。<br>
 7      * 2. 當URL設置了dynamic=false參數,則需持久存儲,不然,當註冊者出現斷電等狀況異常退出時,需自動刪除。<br>
 8      * 3. 當URL設置了category=overrides時,表示分類存儲,缺省類別爲providers,可按分類部分通知數據。<br>
 9      * 4. 當註冊中心重啓,網絡抖動,不能丟失數據,包括斷線自動刪除數據。<br>
10      * 5. 容許URI相同但參數不一樣的URL並存,不能覆蓋。<br>
11      * 
12      * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
13      */
14     void register(URL url);
15 
16     /**
17      * 取消註冊服務.
18      * 
19      * 取消註冊需處理契約:<br>
20      * 1. 若是是dynamic=false的持久存儲數據,找不到註冊數據,則拋IllegalStateException,不然忽略。<br>
21      * 2. 按全URL匹配取消註冊。<br>
22      * 
23      * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
24      */
25     void unregister(URL url);
26 
27     /**
28      * 訂閱服務.
29      * 
30      * 訂閱需處理契約:<br>
31      * 1. 當URL設置了check=false時,訂閱失敗後不報錯,在後臺定時重試。<br>
32      * 2. 當URL設置了category=overrides,只通知指定分類的數據,多個分類用逗號分隔,並容許星號通配,表示訂閱全部分類數據。<br>
33      * 3. 容許以interface,group,version,classifier做爲條件查詢,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
34      * 4. 而且查詢條件容許星號通配,訂閱全部接口的全部分組的全部版本,或:interface=*&group=*&version=*&classifier=*<br>
35      * 5. 當註冊中心重啓,網絡抖動,需自動恢復訂閱請求。<br>
36      * 6. 容許URI相同但參數不一樣的URL並存,不能覆蓋。<br>
37      * 7. 必須阻塞訂閱過程,等第一次通知完後再返回。<br>
38      * 
39      * @param url 訂閱條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
40      * @param listener 變動事件監聽器,不容許爲空
41      */
42     void subscribe(URL url, NotifyListener listener);
43 
44     /**
45      * 取消訂閱服務.
46      * 
47      * 取消訂閱需處理契約:<br>
48      * 1. 若是沒有訂閱,直接忽略。<br>
49      * 2. 按全URL匹配取消訂閱。<br>
50      * 
51      * @param url 訂閱條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
52      * @param listener 變動事件監聽器,不容許爲空
53      */
54     void unsubscribe(URL url, NotifyListener listener);
55 
56     /**
57      * 查詢註冊列表,與訂閱的推模式相對應,這裏爲拉模式,只返回一次結果。
58      * 
59      * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
60      * @param url 查詢條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
61      * @return 已註冊信息列表,可能爲空,含義同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的參數。
62      */
63     List<URL> lookup(URL url);
64 
65 }

         NotifyListener.java:

 1 public interface NotifyListener { 
 2     /**
 3      * 當收到服務變動通知時觸發。
 4      * 
 5      * 通知需處理契約:<br>
 6      * 1. 老是以服務接口和數據類型爲維度全量通知,即不會通知一個服務的同類型的部分數據,用戶不須要對比上一次通知結果。<br>
 7      * 2. 訂閱時的第一次通知,必須是一個服務的全部類型數據的全量通知。<br>
 8      * 3. 中途變動時,容許不一樣類型的數據分開通知,好比:providers, consumers, routes, overrides,容許只通知其中一種類型,但該類型的數據必須是全量的,不是增量的。<br>
 9      * 4. 若是一種類型的數據爲空,需通知一個empty協議並帶category參數的標識性URL數據。<br>
10      * 5. 通知者(即註冊中心實現)需保證通知的順序,好比:單線程推送,隊列串行化,帶版本對比。<br>
11      * 
12      * @param urls 已註冊信息列表,總不爲空,含義同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
13      */
14     void notify(List<URL> urls);
15 
16 }

          Registry.java,集成了Node和RegistryService接口,定義了一個針對特定URL,有生命週期的註冊中心接口:

1 /**
2  * Registry. (SPI, Prototype, ThreadSafe)
3  *
4  * @see com.alibaba.dubbo.registry.RegistryFactory#getRegistry(URL)
5  * @see com.alibaba.dubbo.registry.support.AbstractRegistry
6  */
7 public interface Registry extends Node, RegistryService {
8 }

         register模塊在調用鏈中的位置是:消費端經過ClusterInvoker調用RegisterDirectory的list(url)方法獲得Invoker列表,即從註冊中心得到指定url的Invoker列表。服務提供者在註冊中心註冊了本身的服務信息,被消費端訂閱獲取。見下圖紅框中的紅色箭頭,即爲調用過程。

        

        RegistoryDirectory.java爲註冊中心目錄服務,繼承了AbstractDirectory,實現doList(invokation)方法,它是屬於消費端類。

        主要邏輯爲:

        (1)從invokation中解析出要請求的URL;

        (2)根據配置中的protocol和Url查找緩存(HashMap,本地內存緩存)中的invokers,若是找到則返回,不然調用屬性protocol(初始化時注入的實例,如DubboProtocol實例)的refer()方法獲得invokers並返回;

 1     public List<Invoker<T>> doList(Invocation invocation) {
 2         if (forbidden) {
 3             // 1. No service provider 2. Service providers are disabled
 4             throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
 5                 "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
 6                     + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
 7         }
 8         List<Invoker<T>> invokers = null;
//invoker本地緩存,方法名做爲key的invoker Map
9 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference 10 if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { 11 String methodName = RpcUtils.getMethodName(invocation); 12 Object[] args = RpcUtils.getArguments(invocation); 13 if (args != null && args.length > 0 && args[0] != null 14 && (args[0] instanceof String || args[0].getClass().isEnum())) { 15 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter 16 } 17 if (invokers == null) { 18 invokers = localMethodInvokerMap.get(methodName); 19 } 20 if (invokers == null) { 21 invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); 22 } 23 if (invokers == null) { 24 Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); 25 if (iterator.hasNext()) { 26 invokers = iterator.next(); 27 } 28 } 29 } 30 return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; 31 }

         如下代碼實現了根據url列表構造invoker的map:

 1 /**
 2      * Turn urls into invokers, and if url has been refer, will not re-reference.
 3      *
 4      * @param urls
 5      * @return invokers
 6      */
 7     private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
 8         Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
 9         if (urls == null || urls.size() == 0) {
10             return newUrlInvokerMap;
11         }
12         Set<String> keys = new HashSet<String>();
13         String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
14         for (URL providerUrl : urls) {
15             // If protocol is configured at the reference side, only the matching protocol is selected
16             if (queryProtocols != null && queryProtocols.length() > 0) {
17                 boolean accept = false;
18                 String[] acceptProtocols = queryProtocols.split(",");
19                 for (String acceptProtocol : acceptProtocols) {
20                     if (providerUrl.getProtocol().equals(acceptProtocol)) {
21                         accept = true;
22                         break;
23                     }
24                 }
25                 if (!accept) {
26                     continue;
27                 }
28             }
29             if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
30                 continue;
31             }
32             if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
33                 logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
34                         + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
35                 continue;
36             }
37             URL url = mergeUrl(providerUrl);
38 
39             String key = url.toFullString(); // The parameter urls are sorted
40             if (keys.contains(key)) { // Repeated url
41                 continue;
42             }
43             keys.add(key);
44             // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
45             Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
46             Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
47             if (invoker == null) { // Not in the cache, refer again
48                 try {
49                     boolean enabled = true;
50                     if (url.hasParameter(Constants.DISABLED_KEY)) {
51                         enabled = !url.getParameter(Constants.DISABLED_KEY, false);
52                     } else {
53                         enabled = url.getParameter(Constants.ENABLED_KEY, true);
54                     }
55                     if (enabled) {
56                         invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
57                     }
58                 } catch (Throwable t) {
59                     logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
60                 }
61                 if (invoker != null) { // Put new invoker in cache
62                     newUrlInvokerMap.put(key, invoker);
63                 }
64             } else {
65                 newUrlInvokerMap.put(key, invoker);
66             }
67         }
68         keys.clear();
69         return newUrlInvokerMap;
70     }

         根據URL中的method參數構造一個以method爲key的invoker的map,這是爲了分組合並多個service的調用結果功能作準備。

 1     /**
 2      * Transform the invokers list into a mapping relationship with a method
 3      *
 4      * @param invokersMap Invoker Map
 5      * @return Mapping relation between Invoker and method
 6      */
 7     private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
 8         Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
 9         // According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
10         List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
11         if (invokersMap != null && invokersMap.size() > 0) {
12             for (Invoker<T> invoker : invokersMap.values()) {
13                 String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
14                 if (parameter != null && parameter.length() > 0) {
15                     String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
16                     if (methods != null && methods.length > 0) {
17                         for (String method : methods) {
18                             if (method != null && method.length() > 0
19                                     && !Constants.ANY_VALUE.equals(method)) {
20                                 List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
21                                 if (methodInvokers == null) {
22                                     methodInvokers = new ArrayList<Invoker<T>>();
23                                     newMethodInvokerMap.put(method, methodInvokers);
24                                 }
25                                 methodInvokers.add(invoker);
26                             }
27                         }
28                     }
29                 }
30                 invokersList.add(invoker);
31             }
32         }
33         List<Invoker<T>> newInvokersList = route(invokersList, null);
34         newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
35         if (serviceMethods != null && serviceMethods.length > 0) {
36             for (String method : serviceMethods) {
37                 List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
38                 if (methodInvokers == null || methodInvokers.size() == 0) {
39                     methodInvokers = newInvokersList;
40                 }
41                 newMethodInvokerMap.put(method, route(methodInvokers, method));
42             }
43         }
44         // sort and unmodifiable
45         for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
46             List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
47             Collections.sort(methodInvokers, InvokerComparator.getComparator());
48             newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
49         }
50         return Collections.unmodifiableMap(newMethodInvokerMap);
51     }

 

        以上邏輯實現了Directory接口,構造了註冊中心目錄服務的功能。RegistoryDirectory類自己還實現了NotifyListener接口,實現了notify()方法。在註冊中心發佈訂閱模式中,RegistoryDirectory可做爲訂閱者訂閱一個事件,如提供者、路由策略、配置規則的變更事件,事件發生後接受通知,並觸發notify(urls)方法,接受新的提供者、路由策略、配置規則的url,並刷新本地url緩存。

 1     public synchronized void notify(List<URL> urls) {
 2         List<URL> invokerUrls = new ArrayList<URL>();
 3         List<URL> routerUrls = new ArrayList<URL>();
 4         List<URL> configuratorUrls = new ArrayList<URL>();
 5         for (URL url : urls) {
 6             String protocol = url.getProtocol();
 7             String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
 8             if (Constants.ROUTERS_CATEGORY.equals(category)
 9                     || Constants.ROUTE_PROTOCOL.equals(protocol)) {
10                 routerUrls.add(url);
11             } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
12                     || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
13                 configuratorUrls.add(url);
14             } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
15                 invokerUrls.add(url);
16             } else {
17                 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
18             }
19         }
20         // 刷新配置規則configurators
21         if (configuratorUrls != null && configuratorUrls.size() > 0) {
22             this.configurators = toConfigurators(configuratorUrls);
23         }
24         // 刷新路由規則routers
25         if (routerUrls != null && routerUrls.size() > 0) {
26             List<Router> routers = toRouters(routerUrls);
27             if (routers != null) { // null - do nothing
28                 setRouters(routers);
29             }
30         }
31         List<Configurator> localConfigurators = this.configurators; // local reference
32         // 合併以覆蓋方式更新的參數merge override parameters
33         this.overrideDirectoryUrl = directoryUrl;
34         if (localConfigurators != null && localConfigurators.size() > 0) {
35             for (Configurator configurator : localConfigurators) {
36                 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
37             }
38         }
39         // 刷新提供者invokers providers
40         refreshInvoker(invokerUrls);
41     }

 

        

        下面咱們再討論一下RegistoryProtocol類在註冊中心所起的做用。以下圖所示,RegistoryProtocol類是消費端和提供者端共用的類,藍色虛線表示類初始化過程,即初始化時的組裝鏈。

        首先看看export()實現邏輯:

        (1)調用doLocalExport(originInvoker)方法獲得本地緩存的exporter(若是緩存找不到則用protocol.export()建立並放入緩存);

        (2)調用getRegistry(originInvoker)方法獲得當前originInvoker的註冊中心實例registry

 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //從緩存中取得export,或者經過protocol.export()方法建立exporter export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4 
 5         URL registryUrl = getRegistryUrl(originInvoker);
 6 
 7         //registry provider
 8         final Registry registry = getRegistry(originInvoker);
 9         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
10 
11         //取得是否當即註冊的配置,默認爲true to judge to delay publish whether or not
12         boolean register = registedProviderUrl.getParameter("register", true);
13         //將originInvoker加入本地緩存
14         ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
15         //當即註冊
16         if (register) {
//根據registryUrl取得相應的registry實例,調用registry.register()方法註冊提供者url
17 register(registryUrl, registedProviderUrl); 18 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); 19 } 20 21 //訂閱配置策略更新事件 Subscribe the override data 22 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. 23 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 24 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 25 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 26 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 27 //Ensure that a new exporter instance is returned every time export 28 return new Exporter<T>() { 29 public Invoker<T> getInvoker() { 30 return exporter.getInvoker(); 31 } 32 33 public void unexport() { 34 try { 35 exporter.unexport(); 36 } catch (Throwable t) { 37 logger.warn(t.getMessage(), t); 38 } 39 try { 40 registry.unregister(registedProviderUrl); 41 } catch (Throwable t) { 42 logger.warn(t.getMessage(), t); 43 } 44 try { 45 overrideListeners.remove(overrideSubscribeUrl); 46 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 47 } catch (Throwable t) { 48 logger.warn(t.getMessage(), t); 49 } 50 } 51 }; 52 }

         接下來咱們分析一下refer()方法的實現邏輯。refer()方法是客戶端指向一個服務接口,用來獲取一個服務接口的invoker(調用對象)遠程請求服務方法的,顧名思義,RegisterProtocol協議類固然是從註冊中心訂閱獲取一個invoker對象,這就涉及到對註冊中心的操做。

實現邏輯大概是:

(1)經過URL獲得相應的註冊中心對象Registry(一般爲zookeeper註冊中心);

(2)若是請求的服務類別是RegistryService註冊服務(猜想多是管理控制檯如dubbo-admin或監控臺monitor用到的),則直接返回註冊中心代理對象;

(3)請求的服務類別是普通服務接口,判斷url中的group屬性,若是是group="a,b" 或 group="*"的配置,則考慮分組聚合結果;

(4)調用doRefer()實現指向邏輯;

 1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
 2         url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//經過URL獲得相應的註冊中心對象Registry(一般爲zookeeper註冊中心)
3 Registry registry = registryFactory.getRegistry(url);
//若是請求的服務類別是RegistryService註冊服務(猜想多是管理控制檯如dubbo-admin或監控臺monitor用到的),則直接返回註冊中心代理對象
4 if (RegistryService.class.equals(type)) { 5 return proxyFactory.getInvoker((T) registry, type, url); 6 } 7 8 // group="a,b" or group="*" 請求的服務類別是普通服務接口,判斷url中的group屬性,若是是group="a,b" 或 group="*"的配置,則考慮分組聚合結果 9 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 10 String group = qs.get(Constants.GROUP_KEY); 11 if (group != null && group.length() > 0) { 12 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 13 || "*".equals(group)) { 14 return doRefer(getMergeableCluster(), registry, type, url); 15 } 16 } 17 return doRefer(cluster, registry, type, url); 18 }

 

doRefer()方法實現指向普通遠程服務的邏輯:

(1)建立註冊目錄服務對象,並設置註冊中心對象和協議;

(2)向註冊中心註冊客戶端的服務指向事件,爲管理控制檯和監控臺提供指向信息;

(3)從註冊中心訂閱所指向的服務接口提供列表;

(4)根據指定的集羣策略,獲得一個集羣服務調用代理,該代理可根據指定的集羣策略調用遠程服務接口,該接口是從註冊中心得到的可用服務(所指定的某個服務接口)列表,具體原理參見集羣邏輯;

(5)將Consumer包裝類對象存入本地緩存;

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//建立註冊目錄服務對象,並設置註冊中心對象和協議 RegistryDirectory
<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
//向註冊中心註冊客戶端的服務指向事件,爲管理控制檯和監控臺提供指向信息 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(
false))); }
//從註冊中心訂閱所指向的服務接口提供列表 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //根據指定的集羣策略,獲得一個集羣服務調用代理,該代理可根據指定的集羣策略調用遠程服務接口,該接口是從註冊中心得到的可用服務(所指定的某個服務接口)列表,具體原理參見集羣邏輯 Invoker invoker = cluster.join(directory);
//將Consumer包裝類對象存入本地緩存 ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker; }

 

        在介紹Zookeeper註冊中心以前,咱們討論一下ZookeeperRegister的爺爺類AbstractRegistry類和父類FailbackRegistry類的實現原理(顧名思義,它實現了失敗自動重試的註冊服務。它自己繼承了AbstractRegistry類)。

AbstractRegistry類實現了Registry接口,將註冊、訂閱的url寫入HashMap結構的內存緩存,並在訂閱接受通知時將訂閱的url信息寫入本地文件(用戶文件夾.dubbo內)做爲本地緩存。

        AbstractRegistry類的構造函數將本地文件url註冊信息緩存載入內存,並觸發通知notify()將新的url的集羣備用地址寫入文件緩存和notifys內存緩存。

 1     public AbstractRegistry(URL url) {
 2         setUrl(url);
 3         // Start file save timer
 4         syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
 5         String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
 6         File file = null;
 7         if (ConfigUtils.isNotEmpty(filename)) {
 8             file = new File(filename);
 9             if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
10                 if (!file.getParentFile().mkdirs()) {
11                     throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
12                 }
13             }
14         }
15         this.file = file;
//將本地文件url緩存載入內存
16 loadProperties();
//將url的集羣備用地址寫入本地文件和notifys內存緩存
17 notify(url.getBackupUrls()); 18 }

 

         如下是將本地properties內存緩存刷新到文件的代碼,爲了解決同步寫入問題,增長了一個.lock文件做爲鎖。同時經過版本號樂觀鎖控制讀取到最新的文件內容後再追加新的內容。

 1     public void doSaveProperties(long version) {
 2         if (version < lastCacheChanged.get()) {
 3             return;
 4         }
 5         if (file == null) {
 6             return;
 7         }
 8         // Save
 9         try {
//建立加鎖文件
10 File lockfile = new File(file.getAbsolutePath() + ".lock"); 11 if (!lockfile.exists()) { 12 lockfile.createNewFile(); 13 } 14 RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); 15 try { 16 FileChannel channel = raf.getChannel(); 17 try { 18 FileLock lock = channel.tryLock(); 19 if (lock == null) { 20 throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); 21 } 22 // Save 23 try { 24 if (!file.exists()) { 25 file.createNewFile(); 26 } 27 FileOutputStream outputFile = new FileOutputStream(file); 28 try {
//寫入內容
29 properties.store(outputFile, "Dubbo Registry Cache"); 30 } finally { 31 outputFile.close(); 32 } 33 } finally { 34 lock.release(); 35 } 36 } finally { 37 channel.close(); 38 } 39 } finally { 40 raf.close(); 41 } 42 } catch (Throwable e) { 43 if (version < lastCacheChanged.get()) { 44 return; 45 } else { 46 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); 47 } 48 logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e); 49 } 50 }

         register(url)方法將url寫入registered內存緩存,subscribe(URL url, NotifyListener listener)方法將url和listener鍵值對寫入listeners內存緩存,在zookeeperRegistry類的doSubscribe中觸發notify,觸發listener的notify方法。notify(urls)將urls列表寫入notified內存緩存中,並觸發對應url的listener.notify()方法。lookup(url)方法從notified本地內存緩存找到對應的url地址列表,若是不存在,則觸發subscribe()方法訂閱url。

         FailbackRegistry類是失敗自動嘗試恢復的註冊類,繼承了AbstractRegistry類。如下是構造方法,初始化了一個定時任務管理器,定時掃描註冊/取消註冊、訂閱/取消訂閱、通知的失敗列表,重試註冊/取消註冊、訂閱/取消訂閱、通知任務,成功後從失敗列表移除。在註冊/取消註冊、訂閱/取消訂閱、通知方法拋出異常時,自動將對應的url加入到失敗列表等待重試。

 1     public FailbackRegistry(URL url) {
 2         super(url);
 3         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
 4         this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
 5             public void run() {
 6                 // Check and connect to the registry
 7                 try {
 8                     retry();
 9                 } catch (Throwable t) { // Defensive fault tolerance
10                     logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
11                 }
12             }
13         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
14     }

 

         ZookeeperRegistry類基於Zookeeper目錄服務,將註冊的url信息寫入zk,從zk中訂閱url,從zk查找url對應的服務信息。

        該類主要實現了doRegister()、doUnRegister()、doSubscribe()、doUnSubscribe()、lookup()方法。

1     protected void doRegister(URL url) {
2         try {
//將url信息存入zk
3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }

 

 

1     protected void doUnregister(URL url) {
2         try {
//從zk刪除url信息
3 zkClient.delete(toUrlPath(url)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }

 

         doSubscribe()方法將訂閱的url存入zk,觸發notify()方法,調用listener.notify方法。      

 1     protected void doSubscribe(final URL url, final NotifyListener listener) {
 2         try {
 3             if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
 4                 String root = toRootPath();
 5                 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
 6                 if (listeners == null) {
 7                     zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
 8                     listeners = zkListeners.get(url);
 9                 }
10                 ChildListener zkListener = listeners.get(listener);
11                 if (zkListener == null) {
12                     listeners.putIfAbsent(listener, new ChildListener() {
13                         public void childChanged(String parentPath, List<String> currentChilds) {
14                             for (String child : currentChilds) {
15                                 child = URL.decode(child);
16                                 if (!anyServices.contains(child)) {
17                                     anyServices.add(child);
18                                     subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
19                                             Constants.CHECK_KEY, String.valueOf(false)), listener);
20                                 }
21                             }
22                         }
23                     });
24                     zkListener = listeners.get(listener);
25                 }
26                 zkClient.create(root, false);
27                 List<String> services = zkClient.addChildListener(root, zkListener);
28                 if (services != null && services.size() > 0) {
29                     for (String service : services) {
30                         service = URL.decode(service);
31                         anyServices.add(service);
32                         subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
33                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
34                     }
35                 }
36             } else {
37                 List<URL> urls = new ArrayList<URL>();
38                 for (String path : toCategoriesPath(url)) {
39                     ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
40                     if (listeners == null) {
41                         zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
42                         listeners = zkListeners.get(url);
43                     }
44                     ChildListener zkListener = listeners.get(listener);
45                     if (zkListener == null) {
46                         listeners.putIfAbsent(listener, new ChildListener() {
47                             public void childChanged(String parentPath, List<String> currentChilds) {
48                                 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
49                             }
50                         });
51                         zkListener = listeners.get(listener);
52                     }
//path信息加入zk
53 zkClient.create(path, false); 54 List<String> children = zkClient.addChildListener(path, zkListener); 55 if (children != null) { 56 urls.addAll(toUrlsWithEmpty(url, path, children)); 57 } 58 } 59 notify(url, listener, urls); 60 } 61 } catch (Throwable e) { 62 throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 63 } 64 }

         lookup(url)方法從zk中查詢全部相關的url服務列表:

 1     public List<URL> lookup(URL url) {
 2         if (url == null) {
 3             throw new IllegalArgumentException("lookup url == null");
 4         }
 5         try {
 6             List<String> providers = new ArrayList<String>();
 7             for (String path : toCategoriesPath(url)) {
//查詢全部相關的url列表
8 List<String> children = zkClient.getChildren(path); 9 if (children != null) { 10 providers.addAll(children); 11 } 12 } 13 return toUrlsWithoutEmpty(url, providers); 14 } catch (Throwable e) { 15 throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 16 } 17 }

 

 

         Dubbo出了實現基於zk的註冊中心,還實現了基於redis的註冊中心,multicast基於網絡廣播的註冊中心,基本原理相似,在此再也不詳述。        

相關文章
相關標籤/搜索