本章主要介紹下AbstractRegistry、FailbackRegistry的做用和源碼。html
首先,直接引出這個類的做用,該類主要把服務提供者信息緩存本地文件上,文件目錄是:當前用戶目錄下的/.dubbo/dubbo-registry-${application}-${hos}-${port}.cache。
在解讀源碼前,先閱讀下AbstractRegistry類的成員變量,從成員變量中能夠看到這個類是怎麼完成數據的本地化存儲的。正則表達式
// URL 地址分隔符 private static final char URL_SEPARATOR = ' '; //URL地址正則表達式,任何空白符 private static final String URL_SPLIT = "\\s+"; // 參數保存到本地文件的最大重試次數 private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; // 須要保存的參數 private final Properties properties = new Properties(); // 保存線程,能夠看出是否異步保存 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); // 是否同步保存 private boolean syncSaveFile; // 上一次保存的版本,每次保存更新+1 private final AtomicLong lastCacheChanged = new AtomicLong(); // 保存重試的次數 private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); // 服務註冊的URL保存在這裏 private final Set<URL> registered = new ConcurrentHashSet<>(); // 訂閱的URL,key:消費端訂閱者URL,values: 通知監聽器 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>(); // 訂閱的URL,key:消費端訂閱者URL,values: Map ,key:服務提供者的名字(默認爲providers,configurations,routers),和服務提供者URL private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>(); //當前註冊URL,於指定的註冊中心鏈接的URL private URL registryUrl; //本地文件 private File file;
public AbstractRegistry(URL url) { // 保存與註冊中心鏈接的url. setUrl(url); //判斷是否須要緩存本地文件,默認須要,文件地址 if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) { // Start file save timer // 是否同步保存,默認是異步 syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false); //文件名和路徑通常在,當前用戶目錄下的/.dubbo/dubbo-registry-${application}-${hos}-${port}.cache //例如dubbo-registry-dubbo-demo-annotation-provider-106.52.187.48-2181.cache String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache"; String filename = url.getParameter(FILE_KEY, defaultFilename); File file = null; //建立文件 if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; //在啓動訂閱中心時,咱們須要讀取本地緩存文件,以便未來進行註冊表容錯處理。 其實就是把本地文件file的內容 放入參數properties裏 loadProperties(); // 進行通知url.getBackupUrls(),第一個參數就是url 本身自己 notify(url.getBackupUrls()); } }
上面的註釋已經很是的清晰了,這裏就不在描述,須要關注的是notify()這個函數,因此當每一個服務註冊和訂閱時,首次建立註冊中心都會進行notify操做。具體來看下notify方法。算法
protected void notify(List<URL> urls) { // 這裏是註冊中心連接的url,裏面包括了服務提供方的信息(key:interface等) if (CollectionUtils.isEmpty(urls)) { return; } // 這裏循環全部的訂閱URL for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); // 查看訂閱的url 是不是訂閱當前的註冊服務。不是的話,輪訓下一個 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } // 這裏訂閱的URL的通知監聽器 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { // 而後進行依次遍歷通知 for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } }
接下來看下具體的notify(URL url, NotifyListener listener, List
protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } //result,key: providers,configurators,routers ,values:urls. Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 這裏再一次判斷,訂閱URL 和服務提供者URL 是否匹配 String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //監聽通知 listener.notify(categoryList); // 咱們將在每次通知後更新緩存文件。 // 當咱們的註冊表因爲網絡抖動而出現訂閱失敗時,咱們至少能夠返回現有的緩存URL。 saveProperties(url); } }
接着看下saveProperties,網絡
private void saveProperties(URL url) { if (file == null) { return; } try { StringBuilder buf = new StringBuilder(); // 獲得該訂閱URL 的全部服務提供者URLS,並放入buf中 Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified != null) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } // key 服務接口,value :提供者URL. properties.setProperty(url.getServiceKey(), buf.toString()); long version = lastCacheChanged.incrementAndGet(); // 新增一個版本 if (syncSaveFile) { //同步保存 doSaveProperties(version); } else { // 異步保存 registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
從上面能夠知道,把消費端的訂閱的服務信息存入了file文件中,doSaveProperties就是文件操做,不進行分析。再一次強調下,消費端訂閱時,會訂閱某個具體服務下3個節點(providers,configurations,routers)。
app
接着,FailbackRegistry繼承自AbstractRegistry。
其構造函數以下,能夠得知除了調用AbstractRegistry構造方法外,而且建立一個HashedWheelTimer類型的定時器。異步
public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); // since the retry task will not be very much. 128 ticks is enough. //集運時間輪轉的重試線程器 retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); }
而且FailbackRegistry 成員記錄一組註冊失敗和訂閱失敗的集合,而後經過retryTimer定式掃描這些失敗集合,從新發起訂閱和註冊。以後會單獨拿一章節來說解這個時間輪算法。
下面是失敗集合:ide
// 這裏是註冊失敗的urls private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>(); // 這裏是取消註冊失敗的urls private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>(); // 這裏是訂閱失敗的urls private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>(); // 這裏是取消訂閱失敗的urls private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>(); // 這裏是通知notify()方法失敗異常時的url集合,會進行從新通知 private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
本章的內容比較簡單,主要是接上一章節Dubbo系列之 (二)Registry註冊中心-註冊(1)的內容,使其完整。目前咱們已經完成大部分dubbo是如何與註冊中心交互的,接下來的章節咱們講繼續分享dubbo服務的導出和訂閱等內容。函數