RegistryService源碼地址: http://dubbo.apache.org/zh-cn...
void register(URL url);
取消註冊,該方法也很簡單,就是取消註冊,也就是商品生產者不在銷售該商品, 須要把東西從自動售賣機上取下來,欄目也要取出,這裏強調按全URL匹配取消註冊。api
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
List<URL> lookup(URL url);
public interface Registry extends Node, RegistryService { }
public interface Node { //得到節點地址 URL getUrl(); //判斷節點是否可用 boolean isAvailable(); //銷燬節點 void destroy(); }
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url); }
地址: http://dubbo.apache.org/zh-cn...
public interface NotifyListener { /** * 當收到服務變動通知時觸發。 * <p> * 通知需處理契約:<br> * 1. 老是以服務接口和數據類型爲維度全量通知,即不會通知一個服務的同類型的部分數據,用戶不須要對比上一次通知結果。<br> * 2. 訂閱時的第一次通知,必須是一個服務的全部類型數據的全量通知。<br> * 3. 中途變動時,容許不一樣類型的數據分開通知,好比:providers, consumers, routers, overrides,容許只通知其中一種類型,但該類型的數據必須是全量的,不是增量的。<br> * 4. 若是一種類型的數據爲空,需通知一個empty協議並帶category參數的標識性URL數據。<br> * 5. 通知者(即註冊中心實現)需保證通知的順序,好比:單線程推送,隊列串行化,帶版本對比。<br> * * @param urls 已註冊信息列表,總不爲空,含義同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。 */ void notify(List<URL> urls); }
源碼註釋地址: https://github.com/CrazyHZM/i...
// URL的地址分隔符,在緩存文件中使用,服務提供者的URL分隔 private static final char URL_SEPARATOR = ' '; // URL地址分隔正則表達式,用於解析文件緩存中服務提供者URL列表 private static final String URL_SPLIT = "\\s+"; // 日誌輸出 protected final Logger logger = LoggerFactory.getLogger(getClass()); // 本地磁盤緩存,有一個特殊的key值爲registies,記錄的是註冊中心列表,其餘記錄的都是服務提供者列表 private final Properties properties = new Properties(); // 緩存寫入執行器 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); // 是否同步保存文件標誌 private final boolean syncSaveFile; //數據版本號 private final AtomicLong lastCacheChanged = new AtomicLong(); // 已註冊 URL 集合 // 註冊的 URL 不只僅能夠是服務提供者的,也能夠是服務消費者的 private final Set<URL> registered = new ConcurrentHashSet<URL>(); // 訂閱URL的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 某個消費者被通知的某一類型的 URL 集合 // 第一個key是消費者的URL,對應的就是哪一個消費者。 // value是一個map集合,該map集合的key是分類的意思,例如providers、routes等,value就是被通知的URL集合 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>(); // 註冊中心 URL private URL registryUrl; // 本地磁盤緩存文件,緩存註冊中心的數據 private File file;
public AbstractRegistry(URL url) { // 把url放到registryUrl中 setUrl(url); // Start file save timer // 從url中讀取是否同步保存文件的配置,若是沒有值默認用異步保存文件 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); // 得到file路徑 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache"); File file = null; if (ConfigUtils.isNotEmpty(filename)) { //建立文件 file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; // 把文件裏面的數據寫入properties loadProperties(); // 通知監聽器,URL 變化結果 notify(url.getBackupUrls()); }
protected static List<URL> filterEmpty(URL url, List<URL> urls) { if (urls == null || urls.isEmpty()) { List<URL> result = new ArrayList<URL>(1); result.add(url.setProtocol(Constants.EMPTY_PROTOCOL)); return result; } return urls; }
private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); // 把數據寫入到內存緩存中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry store file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry store file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
public List<URL> getCacheUrls(URL url) { for (Map.Entry<Object, Object> entry : properties.entrySet()) { // key爲某個分類,例如服務提供者分類 String key = (String) entry.getKey(); // value爲某個分類的列表,例如服務提供者列表 String value = (String) entry.getValue(); if (key != null && key.length() > 0 && key.equals(url.getServiceKey()) && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') && value != null && value.length() > 0) { //分割出列表的每一個值 String[] arr = value.trim().split(URL_SPLIT); List<URL> urls = new ArrayList<URL>(); for (String u : arr) { urls.add(URL.valueOf(u)); } return urls; } } return null; }
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<URL>(); // 得到該消費者url訂閱的 全部被通知的 服務URL集合 Map<String, List<URL>> notifiedUrls = getNotified().get(url); // 判斷該消費者是否訂閱服務 if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List<URL> urls : notifiedUrls.values()) { for (URL u : urls) { // 判斷協議是否爲空 if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { // 添加 該消費者訂閱的服務URL result.add(u); } } } } else { // 原子類 避免在獲取註冊在註冊中心的服務url時可以保證是最新的url集合 final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>(); // 通知監聽器。當收到服務變動通知時觸發 NotifyListener listener = new NotifyListener() { @Override public void notify(List<URL> urls) { reference.set(urls); } }; // 訂閱服務,就是消費者url訂閱已經 註冊在註冊中心的服務(也就是添加該服務的監聽器) subscribe(url, listener); // Subscribe logic guarantees the first notify to return List<URL> urls = reference.get(); if (urls != null && !urls.isEmpty()) { for (URL u : urls) { if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { result.add(u); } } } } return result; }
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } // 得到該消費者url 已經訂閱的服務 的監聽器集合 Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } // 添加某個服務的監聽器 listeners.add(listener); }
protected void recover() throws Exception { // register //把內存緩存中的registered取出來遍歷進行註冊 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { register(url); } } // subscribe //把內存緩存中的subscribed取出來遍歷進行訂閱 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { subscribe(url, listener); } } } }
protected void notify(List<URL> urls) { if (urls == null || urls.isEmpty()) return; // 遍歷訂閱URL的監聽器集合,通知他們 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); // 匹配 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } // 遍歷監聽器集合,通知他們 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 將urls進行分類 for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 按照url中key爲category對應的值進行分類,若是沒有該值,就找key爲providers的值進行分類 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); // 分類結果放入result result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } // 得到某一個消費者被通知的url集合(通知的 URL 變化結果) Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { // 添加該消費者對應的url notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } // 處理通知監聽器URL 變化結果 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); // 把分類標實和分類後的列表放入notified的value中 // 覆蓋到 `notified` // 當某個分類的數據爲空時,會依然有 urls 。其中 `urls[0].protocol = empty` ,經過這樣的方式,處理全部服務提供者爲空的狀況。 categoryNotified.put(category, categoryList); // 保存到文件 saveProperties(url); //通知監聽器 listener.notify(categoryList); } }
private void saveProperties(URL url) { if (file == null) { return; } try { // 拼接url StringBuilder buf = new StringBuilder(); Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified != null) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } // 設置到properties中 properties.setProperty(url.getServiceKey(), buf.toString()); // 增長版本號 long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { // 將集合中的數據存儲到文件中 doSaveProperties(version); } else { //異步開啓保存到文件 registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
源碼註釋地址: https://github.com/CrazyHZM/i...
// Scheduled executor service // 定時任務執行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry // 失敗重試定時器,定時去檢查是否有請求失敗的,若有,無限次重試。 private final ScheduledFuture<?> retryFuture; // 註冊失敗的URL集合 private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); // 取消註冊失敗的URL集合 private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); // 訂閱失敗的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 取消訂閱失敗的監聽器集合 private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 通知失敗的URL集合 private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
public FailbackRegistry(URL url) { super(url); // 從url中讀取重試頻率,若是爲空,則默認5000ms this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 建立失敗重試定時器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // Check and connect to the registry try { //重試 retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
public void register(URL url) { super.register(url); //首先從失敗的緩存中刪除該url failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side // 向註冊中心發送一個註冊請求 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. // 若是開啓了啓動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly // 把這個註冊失敗的url放入緩存,而且定時重試。 failedRegistered.add(url); } }
@Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } try { // 通知 url 數據變化 doNotify(url, listener, urls); } catch (Exception t) { // Record a failed registration request to a failed list, retry regularly // 放入失敗的緩存中,重試 Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); if (listeners == null) { failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); listeners = failedNotified.get(url); } listeners.put(listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { super.notify(url, listener, urls); }
@Override protected void recover() throws Exception { // register // register 恢復註冊,添加到 `failedRegistered` ,定時重試 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { failedRegistered.add(url); } } // subscribe // subscribe 恢復訂閱,添加到 `failedSubscribed` ,定時重試 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } } }
// Log output // 日誌記錄 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); // The lock for the acquisition process of the registry // 鎖,對REGISTRIES訪問對競爭控制 private static final ReentrantLock LOCK = new ReentrantLock(); // Registry Collection Map<RegistryAddress, Registry> // Registry 集合 private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process // 得到鎖 LOCK.lock(); try { for (Registry registry : getRegistries()) { try { // 銷燬 registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } // 清空緩存 REGISTRIES.clear(); } finally { // Release the lock // 釋放鎖 LOCK.unlock(); } }
@Override public Registry getRegistry(URL url) { // 修改url url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); // 計算key值 String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry // 得到鎖 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 建立Registry對象 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 添加到緩存。 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock // 釋放鎖 LOCK.unlock(); } }
// Invoker 對象 private Invoker<T> invoker; // 原始url private URL originUrl; // 註冊中心url private URL registryUrl; // 消費者url private URL consumerUrl; // 註冊中心 Directory private RegistryDirectory registryDirectory;
// Invoker對象 private Invoker<T> invoker; // 原始url private URL originUrl; // 註冊中心url private URL registryUrl; // 服務提供者url private URL providerUrl; // 是否註冊 private volatile boolean isReg;
這兩個類都被運用在Dubbo QOS中,須要瞭解Dubbo QOS的能夠到官方文檔裏面查看
QOS網址: http://dubbo.apache.org/zh-cn...
// 服務提供者Invoker集合,key 爲服務提供者的url 計算的key,就是url.toServiceString()方法獲得的 public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>(); // 服務消費者的Invoker集合,key 爲服務消費者的url 計算的key,url.toServiceString()方法獲得的 public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
@Activate public class RegistryStatusChecker implements StatusChecker { @Override public Status check() { // 得到全部的註冊中心對象 Collection<Registry> registries = AbstractRegistryFactory.getRegistries(); if (registries.isEmpty()) { return new Status(Status.Level.UNKNOWN); } Status.Level level = Status.Level.OK; StringBuilder buf = new StringBuilder(); // 拼接註冊中心url中的地址 for (Registry registry : registries) { if (buf.length() > 0) { buf.append(","); } buf.append(registry.getUrl().getAddress()); // 若是註冊中心的節點不可用,則拼接disconnected,而且狀態設置爲error if (!registry.isAvailable()) { level = Status.Level.ERROR; buf.append("(disconnected)"); } else { buf.append("(connected)"); } } // 返回狀態檢查結果 return new Status(level, buf.toString()); } }
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...