目標:解釋註冊中心在dubbo框架中做用,dubbo-registry-api源碼解讀
服務治理框架中能夠大體分爲服務通訊和服務管理兩個部分,服務管理能夠分爲服務註冊、服務發現以及服務被熱加工介入,服務提供者Provider會往註冊中心註冊服務,而消費者Consumer會從註冊中心中訂閱相關的服務,並不會訂閱所有的服務。html
官方文檔給出了Provider、Consumer以及Registry之間的依賴關係:java
從上圖看,能夠清晰的看到Registry所起到的做用,我舉個例子,Registry相似於一個自動售貨機,服務提供者相似於一個商品生產者,他會往這個自動售賣機中添加商品,也就是註冊服務,而消費者則會到註冊中心中購買本身須要的商品,也就是訂閱對應的服務。這樣解釋應該就能夠比較直觀的感覺到註冊中心所擔任的是什麼角色。git
首先咱們來看看這個包下的結構:github
能夠很清晰的看到dubbo內部支持的四種註冊中心實現方式,分別是dubbo、multicast、zookeeper、redis。他們都依賴於support包下面的類。根據上圖的依賴關係,我會從上往下講解dubbo中對於註冊中心的設計以及實現。正則表達式
該接口是註冊中心模塊的服務接口,提供了註冊、取消註冊、訂閱、取消訂閱以及查詢符合條件的已註冊數據。它的源代碼我就不貼出來了,能夠查看官方文檔中相關部分,還給出了中文註釋。redis
RegistryService源碼地址: http://dubbo.apache.org/zh-cn...
咱們能夠從註釋中看到各個方法要處理的契約都在上面寫明瞭。這個接口就是協定了註冊中心的功能,這裏統一說明一下URL,又再次提到URL了,在上篇文章中就說明了dubbo是以總線模式來時刻傳遞和保存配置信息的,也就是配置信息都被放在URL上進行傳遞,隨時能夠取得相關配置信息,而這裏提到了URL有別的做用,就是做爲相似於節點的做用,首先服務提供者(Provider)啓動時須要提供服務,就會向註冊中心寫下本身的URL地址。而後消費者啓動時須要去訂閱該服務,則會訂閱Provider註冊的地址,而且消費者也會寫下本身的URL。繼續拿我上面的例子,商品生產者生產完商品,它會在把該商品放在自動售賣機的某一個欄目內,二消費者須要買該商品的時候,就是經過該地址去購買,而且會留下本身的購買記錄。下面來說講各個方法:apache
註冊,若是看懂我上面說的url的做用,那麼就很清楚該方法的做用了,這裏強調一點,就是註釋中講到的容許URI相同但參數不一樣的URL並存,不能覆蓋,也就是說url值必須惟一的,不能有如出一轍。segmentfault
void register(URL url);
取消註冊,該方法也很簡單,就是取消註冊,也就是商品生產者不在銷售該商品, 須要把東西從自動售賣機上取下來,欄目也要取出,這裏強調按全URL匹配取消註冊。api
void unregister(URL url);
訂閱,這裏不是根據全URL匹配訂閱的,而是根據條件去訂閱,也就是說能夠訂閱多個服務。listener是用來監聽處理註冊數據變動的事件。緩存
void subscribe(URL url, NotifyListener listener);
取消訂閱,這是按照全URL匹配去取消訂閱的。
void unsubscribe(URL url, NotifyListener listener);
查詢註冊列表,經過url進行條件查詢所匹配的全部URL集合。
List<URL> lookup(URL url);
註冊中心接口,該接口很好理解,就是把節點以及註冊中心服務的方法整合在了這個接口裏面。咱們來看看源代碼:
public interface Registry extends Node, RegistryService { }
能夠看到該接口並無本身的方法,就是繼承了Node和RegistryService接口。這裏的Node是節點的接口,裏面協定了關於節點的一些操做方法,咱們能夠來看看源代碼:
public interface Node { //得到節點地址 URL getUrl(); //判斷節點是否可用 boolean isAvailable(); //銷燬節點 void destroy(); }
這個接口是註冊中心的工廠接口,用來返回註冊中心的對象。來看看它的源碼:
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url); }
原本方法上有一些英文註釋,寫的是關於鏈接註冊中心需處理的契約,具體的能夠直接看官方文檔,仍是中文的。
地址: http://dubbo.apache.org/zh-cn...
該接口是一個可擴展接口,能夠看到該接口上有個@SPI註解,而且默認值爲dubbo,也就是默認擴展的是DubboRegistryFactory,而且能夠在getRegistry方法上能夠看到有@Adaptive註解,那麼該接口會動態生成一個適配器RegistryFactory$Adaptive,而且會去首先擴展url.protocol的值對應的實現類。關於SPI擴展機制請觀看《dubbo源碼解析(二)Dubbo擴展機制SPI》。
該接口只有一個notify方法,通知監聽器。當收到服務變動通知時觸發。來看看它的源碼:
public interface NotifyListener { /** * 當收到服務變動通知時觸發。 * <p> * 通知需處理契約:<br> * 1. 老是以服務接口和數據類型爲維度全量通知,即不會通知一個服務的同類型的部分數據,用戶不須要對比上一次通知結果。<br> * 2. 訂閱時的第一次通知,必須是一個服務的全部類型數據的全量通知。<br> * 3. 中途變動時,容許不一樣類型的數據分開通知,好比:providers, consumers, routers, overrides,容許只通知其中一種類型,但該類型的數據必須是全量的,不是增量的。<br> * 4. 若是一種類型的數據爲空,需通知一個empty協議並帶category參數的標識性URL數據。<br> * 5. 通知者(即註冊中心實現)需保證通知的順序,好比:單線程推送,隊列串行化,帶版本對比。<br> * * @param urls 已註冊信息列表,總不爲空,含義同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。 */ void notify(List<URL> urls); }
AbstractRegistry實現的是Registry接口,是Registry的抽象類。爲了減輕註冊中心的壓力,在該類中實現了把本地URL緩存到property文件中的機制,而且實現了註冊中心的註冊、訂閱等方法。
源碼註釋地址: https://github.com/CrazyHZM/i...
// URL的地址分隔符,在緩存文件中使用,服務提供者的URL分隔 private static final char URL_SEPARATOR = ' '; // URL地址分隔正則表達式,用於解析文件緩存中服務提供者URL列表 private static final String URL_SPLIT = "\\s+"; // 日誌輸出 protected final Logger logger = LoggerFactory.getLogger(getClass()); // 本地磁盤緩存,有一個特殊的key值爲registies,記錄的是註冊中心列表,其餘記錄的都是服務提供者列表 private final Properties properties = new Properties(); // 緩存寫入執行器 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); // 是否同步保存文件標誌 private final boolean syncSaveFile; //數據版本號 private final AtomicLong lastCacheChanged = new AtomicLong(); // 已註冊 URL 集合 // 註冊的 URL 不只僅能夠是服務提供者的,也能夠是服務消費者的 private final Set<URL> registered = new ConcurrentHashSet<URL>(); // 訂閱URL的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 某個消費者被通知的某一類型的 URL 集合 // 第一個key是消費者的URL,對應的就是哪一個消費者。 // value是一個map集合,該map集合的key是分類的意思,例如providers、routes等,value就是被通知的URL集合 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>(); // 註冊中心 URL private URL registryUrl; // 本地磁盤緩存文件,緩存註冊中心的數據 private File file;
理解屬性的含義對於後面去解讀方法頗有幫助,從上面能夠看到除了註冊中心相關的一些屬性外,能夠看到好幾個是個屬性跟磁盤緩存文件和讀寫文件有關的,這就是上面提到的把URL緩存到本地property的相關屬性這裏有幾個須要關注的點:
先來看看源碼:
public AbstractRegistry(URL url) { // 把url放到registryUrl中 setUrl(url); // Start file save timer // 從url中讀取是否同步保存文件的配置,若是沒有值默認用異步保存文件 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); // 得到file路徑 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache"); File file = null; if (ConfigUtils.isNotEmpty(filename)) { //建立文件 file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; // 把文件裏面的數據寫入properties loadProperties(); // 通知監聽器,URL 變化結果 notify(url.getBackupUrls()); }
須要關注的幾個點:
protected static List<URL> filterEmpty(URL url, List<URL> urls) { if (urls == null || urls.isEmpty()) { List<URL> result = new ArrayList<URL>(1); result.add(url.setProtocol(Constants.EMPTY_PROTOCOL)); return result; } return urls; }
這個方法的源碼都不須要解釋了,很簡單,就是判斷url集合是否爲空,若是爲空,則把url中key爲empty的值加入到集合。該方法只有在notify方法中用到,爲了防止通知的URL變化結果爲空。
該方法比較長,我這裏不貼源碼了,須要的就查看github上的分析,該方法主要是將內存緩存properties中的數據存儲到文件中,而且在裏面作了版本號的控制,防止老的版本數據覆蓋了新版本數據。數據流向是跟loadProperties方法相反。
private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); // 把數據寫入到內存緩存中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry store file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry store file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
該方法就是加載本地磁盤緩存文件到內存緩存,也就是把文件裏面的數據寫入properties,能夠對比doSaveProperties方法,其中關鍵的實現就是properties.load和properties.store的區別,邏輯並不難。跟doSaveProperties的數據流向相反。
public List<URL> getCacheUrls(URL url) { for (Map.Entry<Object, Object> entry : properties.entrySet()) { // key爲某個分類,例如服務提供者分類 String key = (String) entry.getKey(); // value爲某個分類的列表,例如服務提供者列表 String value = (String) entry.getValue(); if (key != null && key.length() > 0 && key.equals(url.getServiceKey()) && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') && value != null && value.length() > 0) { //分割出列表的每一個值 String[] arr = value.trim().split(URL_SPLIT); List<URL> urls = new ArrayList<URL>(); for (String u : arr) { urls.add(URL.valueOf(u)); } return urls; } } return null; }
該方法是得到內存緩存properties中相關value,而且返回爲一個集合,從該方法中能夠很清楚的看出properties中是存儲的什麼數據格式。
來看看源碼:
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<URL>(); // 得到該消費者url訂閱的 全部被通知的 服務URL集合 Map<String, List<URL>> notifiedUrls = getNotified().get(url); // 判斷該消費者是否訂閱服務 if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List<URL> urls : notifiedUrls.values()) { for (URL u : urls) { // 判斷協議是否爲空 if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { // 添加 該消費者訂閱的服務URL result.add(u); } } } } else { // 原子類 避免在獲取註冊在註冊中心的服務url時可以保證是最新的url集合 final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>(); // 通知監聽器。當收到服務變動通知時觸發 NotifyListener listener = new NotifyListener() { @Override public void notify(List<URL> urls) { reference.set(urls); } }; // 訂閱服務,就是消費者url訂閱已經 註冊在註冊中心的服務(也就是添加該服務的監聽器) subscribe(url, listener); // Subscribe logic guarantees the first notify to return List<URL> urls = reference.get(); if (urls != null && !urls.isEmpty()) { for (URL u : urls) { if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { result.add(u); } } } } return result; }
該方法是實現了RegistryService接口的方法,做用是得到消費者url訂閱的服務URL列表。該方法有幾個地方有些繞我在這裏重點講解一下:
這兩個方法實現了RegistryService接口的方法,裏面的邏輯很簡單,全部我就不貼代碼了,以避免影響篇幅,若是真想看,能夠進到我github查看,下面我會貼出這部分註釋github的地址。其中註冊的邏輯就是把url加入到屬性registered,而取消註冊的邏輯就是把url從該屬性中移除,該屬性在上面有介紹。真正的實現是在FailbackRegistry類中,FailbackRegistry類我會在下面介紹。
這兩個方法實現了RegistryService接口的方法,分別是訂閱和取消訂閱,我就貼一個訂閱的代碼:
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } // 得到該消費者url 已經訂閱的服務 的監聽器集合 Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } // 添加某個服務的監聽器 listeners.add(listener); }
從源代碼能夠看到,其實訂閱也就是把服務通知監聽器加入到subscribed中,具體的實現也是在FailbackRegistry類中。
恢復方法,在註冊中心斷開,重連成功的時候,會恢復註冊和訂閱。
protected void recover() throws Exception { // register //把內存緩存中的registered取出來遍歷進行註冊 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { register(url); } } // subscribe //把內存緩存中的subscribed取出來遍歷進行訂閱 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { subscribe(url, listener); } } } }
protected void notify(List<URL> urls) { if (urls == null || urls.isEmpty()) return; // 遍歷訂閱URL的監聽器集合,通知他們 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); // 匹配 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } // 遍歷監聽器集合,通知他們 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 將urls進行分類 for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 按照url中key爲category對應的值進行分類,若是沒有該值,就找key爲providers的值進行分類 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); // 分類結果放入result result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } // 得到某一個消費者被通知的url集合(通知的 URL 變化結果) Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { // 添加該消費者對應的url notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } // 處理通知監聽器URL 變化結果 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); // 把分類標實和分類後的列表放入notified的value中 // 覆蓋到 `notified` // 當某個分類的數據爲空時,會依然有 urls 。其中 `urls[0].protocol = empty` ,經過這樣的方式,處理全部服務提供者爲空的狀況。 categoryNotified.put(category, categoryList); // 保存到文件 saveProperties(url); //通知監聽器 listener.notify(categoryList); } }
notify方法是通知監聽器,url的變化結果,不過變化的是全量數據,全量數據意思就是是以服務接口和數據類型爲維度全量通知,即不會通知一個服務的同類型的部分數據,用戶不須要對比上一次通知結果。這裏要注意幾個重點:
先來看看源碼:
private void saveProperties(URL url) { if (file == null) { return; } try { // 拼接url StringBuilder buf = new StringBuilder(); Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified != null) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } // 設置到properties中 properties.setProperty(url.getServiceKey(), buf.toString()); // 增長版本號 long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { // 將集合中的數據存儲到文件中 doSaveProperties(version); } else { //異步開啓保存到文件 registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
該方法是單個消費者url對應在notified中的數據,保存在到文件,而保存到文件的操做是調用了doSaveProperties方法,該方法跟doSaveProperties的區別是doSaveProperties方法將properties數據所有覆蓋性的保存到文件,而saveProperties只是保存單個消費者url的數據。
該方法在JVM關閉時調用,進行取消註冊和訂閱的操做。具體邏輯就是調用了unregister和unsubscribe方法,有須要看源碼的能夠進入github查看。
我在上面講AbstractRegistry類的時候已經提到了FailbackRegistry,FailbackRegistry繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些內存緩存的變化,而真正的註冊訂閱的實現邏輯在FailbackRegistry實現,而且FailbackRegistry提供了失敗重試的機制。
源碼註釋地址: https://github.com/CrazyHZM/i...
// Scheduled executor service // 定時任務執行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry // 失敗重試定時器,定時去檢查是否有請求失敗的,若有,無限次重試。 private final ScheduledFuture<?> retryFuture; // 註冊失敗的URL集合 private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); // 取消註冊失敗的URL集合 private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); // 訂閱失敗的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 取消訂閱失敗的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 通知失敗的URL集合 private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
該類的屬性比較好理解,也能夠很明顯看出這些屬性都是跟失敗重試機制相關。
public FailbackRegistry(URL url) { super(url); // 從url中讀取重試頻率,若是爲空,則默認5000ms this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 建立失敗重試定時器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // Check and connect to the registry try { //重試 retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
構造函數主要是建立了失敗重試的定時器,重試頻率從URL取,若是沒有設置,則默認爲5000ms。
這四個方法就是註冊、取消註冊、訂閱、取消訂閱的具體實現,由於代碼邏輯極其類似,因此爲放在一塊兒,下面爲只貼出註冊的源碼:
public void register(URL url) { super.register(url); //首先從失敗的緩存中刪除該url failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side // 向註冊中心發送一個註冊請求 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly // 把這個註冊失敗的url放入緩存,而且定時重試。 failedRegistered.add(url); } }
能夠看到,邏輯很清晰,就是作了一個doRegister的操做,若是失敗拋出異常,則加入到失敗的緩存中進行重試。爲這裏要解釋的是doRegister,與之對應的還有doUnregister、doSubscribe、doUnsubscribe三個方法,是FailbackRegistry抽象出來的方法,意圖在於每種實現註冊中心的方法不同,相對應的註冊、訂閱等操做也會有所區別,而把這四個方法抽象出現,爲了讓子類只去關注這四個的實現,好比說redis實現的註冊中心跟zookeeper實現的註冊中心方式確定不同,那麼對應的註冊訂閱等操做也有所不一樣,那麼各自只要去實現該抽象方法便可。
其餘的三個方法有須要的能夠查看github上的我寫的註釋。
@Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } try { // 通知 url 數據變化 doNotify(url, listener, urls); } catch (Exception t) { // Record a failed registration request to a failed list, retry regularly // 放入失敗的緩存中,重試 Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); if (listeners == null) { failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); listeners = failedNotified.get(url); } listeners.put(listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { super.notify(url, listener, urls); }
能夠看到notify不同,他仍是又回去調用了父類AbstractRegistry的notify,與上述四個方法不同。
@Override protected void recover() throws Exception { // register // register 恢復註冊,添加到 `failedRegistered` ,定時重試 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { failedRegistered.add(url); } } // subscribe // subscribe 恢復訂閱,添加到 `failedSubscribed` ,定時重試 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } } }
重寫了父類的recover,將註冊和訂閱放入到對應的失敗緩存中,而後定時重試。
該方法中實現了重試的邏輯,分別對註冊失敗failedRegistered、取消註冊失敗failedUnregistered、訂閱失敗failedSubscribed、取消訂閱失敗failedUnsubscribed、通知監聽器失敗failedNotified這五個緩存中的元素進行重試,重試的邏輯就是調用了相關的方法,而後從緩存中刪除,例如重試註冊,先進行doRegister,而後把該url從failedRegistered移除。具體的註釋請到GitHub查看。
該類實現了RegistryFactory接口,抽象了createRegistry方法,它實現了Registry的容器管理。
// Log output // 日誌記錄 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); // The lock for the acquisition process of the registry // 鎖,對REGISTRIES訪問對競爭控制 private static final ReentrantLock LOCK = new ReentrantLock(); // Registry Collection Map<RegistryAddress, Registry> // Registry 集合 private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process // 得到鎖 LOCK.lock(); try { for (Registry registry : getRegistries()) { try { // 銷燬 registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } // 清空緩存 REGISTRIES.clear(); } finally { // Release the lock // 釋放鎖 LOCK.unlock(); } }
該方法做用是銷燬全部的Registry對象,而且清除內存緩存,邏輯比較簡單,關鍵就是對REGISTRIES進行同步的操做。
@Override public Registry getRegistry(URL url) { // 修改url url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); // 計算key值 String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry // 得到鎖 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 建立Registry對象 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 添加到緩存。 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock // 釋放鎖 LOCK.unlock(); } }
該方法是實現了RegistryFactory接口中的方法,關於key值的計算我會在後續講解URL的文章中講到,這裏最要注意的是createRegistry,由於AbstractRegistryFactory類把這個方法抽象出來,爲了讓子類只要關注該方法,好比說redis實現的註冊中心和zookeeper實現的註冊中心建立方式確定不一樣,而他們相同的一些操做都已經在AbstractRegistryFactory中實現。因此只要關注而且實現該抽象方法便可。
這兩個類實現了Invoker接口,分別是服務消費者和服務提供者的Invoker的包裝器,其中就包裝了一些屬性,咱們來看看源碼:
// Invoker 對象 private Invoker<T> invoker; // 原始url private URL originUrl; // 註冊中心url private URL registryUrl; // 消費者url private URL consumerUrl; // 註冊中心 Directory private RegistryDirectory registryDirectory;
// Invoker對象 private Invoker<T> invoker; // 原始url private URL originUrl; // 註冊中心url private URL registryUrl; // 服務提供者url private URL providerUrl; // 是否註冊 private volatile boolean isReg;
這兩個類都被運用在Dubbo QOS中,須要瞭解Dubbo QOS的能夠到官方文檔裏面查看
QOS網址: http://dubbo.apache.org/zh-cn...
服務提供者和消費者註冊表,存儲JVM進程中服務提供者和消費者的Invoker,該類也是被運用在QOS中,包括上面的兩個類,都跟QOS中的Offline下線服務命令和ls列出消費者和提供者邏輯實現有關係。咱們能夠看看它的屬性:
// 服務提供者Invoker集合,key 爲服務提供者的url 計算的key,就是url.toServiceString()方法獲得的 public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>(); // 服務消費者的Invoker集合,key 爲服務消費者的url 計算的key,url.toServiceString()方法獲得的 public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
能夠看到,其實記錄的服務提供者、消費者、註冊中心中間的調用鏈,爲了從一方出發可以很直觀的找到跟它相關聯的全部調用鏈。
該類中的其餘方法請自行查看,這部分跟運維命令的實現相關,因此爲不在這裏講解。
該類是一個dubbo單首創建的異常,在FailbackRegistry中被使用到,自定義的是一個跳過失敗重試的異常。
該類實現了StatusChecker,StatusChecker是一個狀態校驗的接口,RegistryStatusChecker是它的擴展類,作了一些跟註冊中心有關的狀態檢查和設置。咱們來看看源碼:
@Activate public class RegistryStatusChecker implements StatusChecker { @Override public Status check() { // 得到全部的註冊中心對象 Collection<Registry> registries = AbstractRegistryFactory.getRegistries(); if (registries.isEmpty()) { return new Status(Status.Level.UNKNOWN); } Status.Level level = Status.Level.OK; StringBuilder buf = new StringBuilder(); // 拼接註冊中心url中的地址 for (Registry registry : registries) { if (buf.length() > 0) { buf.append(","); } buf.append(registry.getUrl().getAddress()); // 若是註冊中心的節點不可用,則拼接disconnected,而且狀態設置爲error if (!registry.isAvailable()) { level = Status.Level.ERROR; buf.append("(disconnected)"); } else { buf.append("(connected)"); } } // 返回狀態檢查結果 return new Status(level, buf.toString()); } }
第一個關注點就是@Activate註解,也就是RegistryStatusChecker類會自動激活加載。該類就實現了接口的check方法,做用就是給註冊中心進行狀態檢查,而且返回檢查結果。
下面講的是integration下面的兩個類RegistryProtocol和RegistryDirectory,這兩個類與註冊中心核心的邏輯關係沒有那麼強。RegistryProtocol是對dubbo-rpc-api的依賴集成,RegistryDirectory是對dubbo-cluster的依賴集成。若是看了下面的解析有點糊塗,能夠先跳過這部分,等我出了rpc和cluster相關的文章後再回來看就會比較清晰。
這兩個類等我講解完rpc和cluster模塊以後再進行補充源碼解析。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了dubbo的註冊中心關於服務註冊、訂閱、服務變動通知等內部邏輯實現,接下來四篇文章我將會講解dubbo、multicast、zookeeper、redis四種實現註冊中心策略的邏輯實現。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。