/** * Node. (API/SPI, Prototype, ThreadSafe) */ public interface Node { /** * get url. * * @return url. */ URL getUrl(); /** * is available. * * @return available. */ boolean isAvailable(); /** * destroy. */ void destroy(); }
/** * RegistryService. (SPI, Prototype, ThreadSafe) * * @see org.apache.dubbo.registry.Registry * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL) */ public interface RegistryService { /** * 註冊 */ void register(URL url); /** * 反註冊 */ void unregister(URL url); /** * 訂閱 */ void subscribe(URL url, NotifyListener listener); /** * 取消訂閱 */ void unsubscribe(URL url, NotifyListener listener); /** * 查找 */ List<URL> lookup(URL url); }
private final Set<URL> registered = new ConcurrentHashSet<>();
@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); }
@Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
public interface NotifyListener { /** * 給一組URL發通知 */ void notify(List<URL> urls); }
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } //若是訂閱對象中沒有該url的鍵,則建立該鍵對應的集合,集合添加監聽器listener Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); listeners.add(listener); }
@Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } //找出訂閱到全部監聽器,在這些監聽器集合中刪除對應多監聽器 Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<>(); //獲取幾組已通知的生產者url,根據String類型的不一樣,可能會有幾組url Map<String, List<URL>> notifiedUrls = getNotified().get(url); //若是存在這麼幾組已通知的生產者url if (notifiedUrls != null && notifiedUrls.size() > 0) { //遍歷全部的組,獲取每一組的url集合 for (List<URL> urls : notifiedUrls.values()) { //遍歷集合每個url for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //將全部協議不爲空的url添加到結果集合中 result.add(u); } } } } else { //若是不存在任何已通知的url final AtomicReference<List<URL>> reference = new AtomicReference<>(); //這段代碼的意思等同與new NotifyListener() { // void notify(List<URL> urls) { // reference.set(urls); // } //} //這裏就是實現了一個匿名類訂閱監聽器對象 NotifyListener listener = reference::set; //將該匿名類訂閱監聽器對象放入訂閱對象中,這裏是訂閱了,有沒有通知不知道,看多線程的 subscribe(url, listener); // Subscribe logic guarantees the first notify to return //若是有通知的url就取出 List<URL> urls = reference.get(); if (CollectionUtils.isNotEmpty(urls)) { for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //遍歷全部有通知的url,且協議不爲空協議的,添加到結果集 result.add(u); } } } } return result; }
public Map<URL, Map<String, List<URL>>> getNotified() { //返回一個不可變的映射,該映射不可修改 return Collections.unmodifiableMap(notified); }
private File file;
private final Properties properties = new Properties();
private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); //從硬盤中讀取文件的內容,保存到properties中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry cache file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry cache file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
private final AtomicLong lastCacheChanged = new AtomicLong(); //版本號,保證數據是最新的
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //保存文件的重試次數
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; //重試的最大次數
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //用於異步保存文件的線程池,它只有1個線程
public void doSaveProperties(long version) { //當前版本號必須大於等於已保存版本號,絕對不能小於 if (version < lastCacheChanged.get()) { return; } if (file == null) { return; } // Save try { File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); FileChannel channel = raf.getChannel()) { FileLock lock = channel.tryLock(); //當多線程操做文件時,咱們須要得到一個文件獨佔鎖 if (lock == null) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); } //將properties的內容寫入到文件中 try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { properties.store(outputFile, "Dubbo Registry Cache"); } } finally { lock.release(); } } } catch (Throwable e) { //當其餘線程拿不到文件鎖到時候,將重試次數加1,此處爲原子競爭,不一樣的線程不能同時加 savePropertiesRetryTimes.incrementAndGet(); //若是重試次數達到最大,將其從新置0 if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) { logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e); savePropertiesRetryTimes.set(0); return; } //若是沒有達到重試次數上限,繼續判斷版本號是否大於已保存版本號 if (version < lastCacheChanged.get()) { savePropertiesRetryTimes.set(0); return; } else { //若是當前版本號大於已保存版本號,調用其餘線程來從新執行一次doSaveProperties方法,原子競爭到版本號加1的線程能夠執行 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e); } }
private class SaveProperties implements Runnable { private long version; private SaveProperties(long version) { this.version = version; } @Override public void run() { doSaveProperties(version); } }