分析Dubbo固然要從註冊開始,2.7的註冊加入了很是多的方式,已經不限於Zookeeper.java
基本上如今主流的註冊模式都有了。apache
這種一看就是模版方法模式,其中Registry接口繼承與兩個接口Node,RegistryService,它本身自己沒有任何方法。緩存
/** * Node. (API/SPI, Prototype, ThreadSafe) */ public interface Node { /** * get url. * * @return url. */ URL getUrl(); /** * is available. * * @return available. */ boolean isAvailable(); /** * destroy. */ void destroy(); }
/** * RegistryService. (SPI, Prototype, ThreadSafe) * * @see org.apache.dubbo.registry.Registry * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL) */ public interface RegistryService { /** * 註冊 */ void register(URL url); /** * 反註冊 */ void unregister(URL url); /** * 訂閱 */ void subscribe(URL url, NotifyListener listener); /** * 取消訂閱 */ void unsubscribe(URL url, NotifyListener listener); /** * 查找 */ List<URL> lookup(URL url); }
AbstractRegistry是一個抽象類,實現了Registry接口的註冊,訂閱,查找等方法,咱們來看一下註冊方法。安全
private final Set<URL> registered = new ConcurrentHashSet<>();
這個URL是Dubbo本身封裝的一個類,其中包含了例如協議、用戶名、密碼、主機、端口號、路徑、參數等的屬性。多線程
@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); }
這個是Apache孵化後的寫法。意思就是把提供者或者消費者各自的資源放到該集合中,此處是一個線程安全的集合。dom
取消註冊異步
@Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }
訂閱ide
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
public interface NotifyListener { /** * 給一組URL發通知 */ void notify(List<URL> urls); }
NotifyListener是一個專門用監聽通知的接口,它的做用就是專門用來發通知的,至於怎麼發通知須要到Registry最終實現裏面去具體實現。this
此處能夠看到訂閱對象是一個URL對應一組監聽器url
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } //若是訂閱對象中沒有該url的鍵,則建立該鍵對應的集合,集合添加監聽器listener Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); listeners.add(listener); }
取消訂閱
@Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } //找出訂閱到全部監聽器,在這些監聽器集合中刪除對應多監聽器 Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }
查找
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
第一個URL爲消費者的URL,Map的String爲類型,基本爲生產者,具體包含providers、consumers、routes、configurators,List<URL>爲消費者訂閱生產者的URL集合
lookup即爲查找一個消費者訂閱了哪些生產者URL
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<>(); //獲取幾組已通知的生產者url,根據String類型的不一樣,可能會有幾組url Map<String, List<URL>> notifiedUrls = getNotified().get(url); //若是存在這麼幾組已通知的生產者url if (notifiedUrls != null && notifiedUrls.size() > 0) { //遍歷全部的組,獲取每一組的url集合 for (List<URL> urls : notifiedUrls.values()) { //遍歷集合每個url for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //將全部協議不爲空的url添加到結果集合中 result.add(u); } } } } else { //若是不存在任何已通知的url final AtomicReference<List<URL>> reference = new AtomicReference<>(); //這段代碼的意思等同與new NotifyListener() { // void notify(List<URL> urls) { // reference.set(urls); // } //} //這裏就是實現了一個匿名類訂閱監聽器對象 NotifyListener listener = reference::set; //將該匿名類訂閱監聽器對象放入訂閱對象中,這裏是訂閱了,有沒有通知不知道,看多線程的 subscribe(url, listener); // Subscribe logic guarantees the first notify to return //若是有通知的url就取出 List<URL> urls = reference.get(); if (CollectionUtils.isNotEmpty(urls)) { for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //遍歷全部有通知的url,且協議不爲空協議的,添加到結果集 result.add(u); } } } } return result; }
public Map<URL, Map<String, List<URL>>> getNotified() { //返回一個不可變的映射,該映射不可修改 return Collections.unmodifiableMap(notified); }
總的來講notified是一個內存緩存,用來保存獲取的服務提供者,不用每次遠程調用都要先從註冊中心獲取一次可調用的服務列表,對於沒有服務提供者提供服務的URL,它會以特殊的empty://前綴開頭。固然它還有一個磁盤文件服務緩存以及一個Properties,Properties本質是一個HashTable,線程安全的Map.
private File file;
private final Properties properties = new Properties();
咱們來看一下從硬盤加載資源到內存中的過程
private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); //從硬盤中讀取文件的內容,保存到properties中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry cache file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry cache file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
properties保存了全部服務提供者的URL,使用URL#serviceKey()做爲key,提供者列表、路由規則列表、配置規則列表等作爲value。還有一個key.registies,保存全部的註冊中心的地址。若是應用在啓動過程當中,註冊中心沒法鏈接或宕機,Dubbo會自動經過本地緩存加載Invokers(調用程序).
有從硬盤文件導入,固然就會有保存。保存資源到文件的主入口爲doSaveProperties().
private final AtomicLong lastCacheChanged = new AtomicLong(); //版本號,保證數據是最新的
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //保存文件的重試次數
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; //重試的最大次數
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //用於異步保存文件的線程池,它只有1個線程
public void doSaveProperties(long version) { //當前版本號必須大於等於已保存版本號,絕對不能小於 if (version < lastCacheChanged.get()) { return; } if (file == null) { return; } // Save try { File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); FileChannel channel = raf.getChannel()) { FileLock lock = channel.tryLock(); //當多線程操做文件時,咱們須要得到一個文件獨佔鎖 if (lock == null) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); } //將properties的內容寫入到文件中 try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { properties.store(outputFile, "Dubbo Registry Cache"); } } finally { lock.release(); } } } catch (Throwable e) { //當其餘線程拿不到文件鎖到時候,將重試次數加1,此處爲原子競爭,不一樣的線程不能同時加 savePropertiesRetryTimes.incrementAndGet(); //若是重試次數達到最大,將其從新置0 if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) { logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e); savePropertiesRetryTimes.set(0); return; } //若是沒有達到重試次數上限,繼續判斷版本號是否大於已保存版本號 if (version < lastCacheChanged.get()) { savePropertiesRetryTimes.set(0); return; } else { //若是當前版本號大於已保存版本號,調用其餘線程來從新執行一次doSaveProperties方法,原子競爭到版本號加1的線程能夠執行 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e); } }
private class SaveProperties implements Runnable { private long version; private SaveProperties(long version) { this.version = version; } @Override public void run() { doSaveProperties(version); } }
還有一些通知方法notify()是在客戶端第一次訂閱獲取全量數據後,後續因爲訂閱獲得新數據時,都會調用該方法進行保存。