Apache Dubbo 2.7孵化版整理

分析Dubbo固然要從註冊開始,2.7的註冊加入了很是多的方式,已經不限於Zookeeper.java

基本上如今主流的註冊模式都有了。apache

這種一看就是模版方法模式,其中Registry接口繼承與兩個接口Node,RegistryService,它本身自己沒有任何方法。緩存

/**
 * Node. (API/SPI, Prototype, ThreadSafe)
 */
public interface Node {

    /**
     * get url.
     *
     * @return url.
     */
    URL getUrl();

    /**
     * is available.
     *
     * @return available.
     */
    boolean isAvailable();

    /**
     * destroy.
     */
    void destroy();

}
/**
 * RegistryService. (SPI, Prototype, ThreadSafe)
 *
 * @see org.apache.dubbo.registry.Registry
 * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
 */
public interface RegistryService {

    /**
     * 註冊
     */
    void register(URL url);

    /**
     * 反註冊
     */
    void unregister(URL url);

    /**
     * 訂閱
     */
    void subscribe(URL url, NotifyListener listener);

    /**
     * 取消訂閱
     */
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 查找
     */
    List<URL> lookup(URL url);

}

AbstractRegistry是一個抽象類,實現了Registry接口的註冊,訂閱,查找等方法,咱們來看一下註冊方法。安全

private final Set<URL> registered = new ConcurrentHashSet<>();

這個URL是Dubbo本身封裝的一個類,其中包含了例如協議、用戶名、密碼、主機、端口號、路徑、參數等的屬性。多線程

@Override
public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}

這個是Apache孵化後的寫法。意思就是把提供者或者消費者各自的資源放到該集合中,此處是一個線程安全的集合。dom

取消註冊異步

@Override
public void unregister(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("unregister url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unregister: " + url);
    }
    registered.remove(url);
}

訂閱ide

private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
public interface NotifyListener {

    /**
     * 給一組URL發通知
     */
    void notify(List<URL> urls);

}

NotifyListener是一個專門用監聽通知的接口,它的做用就是專門用來發通知的,至於怎麼發通知須要到Registry最終實現裏面去具體實現。this

此處能夠看到訂閱對象是一個URL對應一組監聽器url

@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    //若是訂閱對象中沒有該url的鍵,則建立該鍵對應的集合,集合添加監聽器listener
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}

取消訂閱

@Override
public void unsubscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("unsubscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("unsubscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unsubscribe: " + url);
    }
    //找出訂閱到全部監聽器,在這些監聽器集合中刪除對應多監聽器
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners != null) {
        listeners.remove(listener);
    }
}

查找

private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();

第一個URL爲消費者的URL,Map的String爲類型,基本爲生產者,具體包含providers、consumers、routes、configurators,List<URL>爲消費者訂閱生產者的URL集合

lookup即爲查找一個消費者訂閱了哪些生產者URL

@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    //獲取幾組已通知的生產者url,根據String類型的不一樣,可能會有幾組url
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    //若是存在這麼幾組已通知的生產者url
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        //遍歷全部的組,獲取每一組的url集合
        for (List<URL> urls : notifiedUrls.values()) {
            //遍歷集合每個url
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //將全部協議不爲空的url添加到結果集合中
                    result.add(u);
                }
            }
        }
    } else {
        //若是不存在任何已通知的url
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        //這段代碼的意思等同與new NotifyListener() {
        //    void notify(List<URL> urls) {
        //        reference.set(urls);
        //    }
        //}
        //這裏就是實現了一個匿名類訂閱監聽器對象
        NotifyListener listener = reference::set;
        //將該匿名類訂閱監聽器對象放入訂閱對象中,這裏是訂閱了,有沒有通知不知道,看多線程的
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        //若是有通知的url就取出
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //遍歷全部有通知的url,且協議不爲空協議的,添加到結果集
                    result.add(u);
                }
            }
        }
    }
    return result;
}
public Map<URL, Map<String, List<URL>>> getNotified() {
    //返回一個不可變的映射,該映射不可修改
    return Collections.unmodifiableMap(notified);
}

總的來講notified是一個內存緩存,用來保存獲取的服務提供者,不用每次遠程調用都要先從註冊中心獲取一次可調用的服務列表,對於沒有服務提供者提供服務的URL,它會以特殊的empty://前綴開頭。固然它還有一個磁盤文件服務緩存以及一個Properties,Properties本質是一個HashTable,線程安全的Map.

private File file;
private final Properties properties = new Properties();

咱們來看一下從硬盤加載資源到內存中的過程

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        try {
            in = new FileInputStream(file);
            //從硬盤中讀取文件的內容,保存到properties中
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry cache file " + file, e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

properties保存了全部服務提供者的URL,使用URL#serviceKey()做爲key,提供者列表、路由規則列表、配置規則列表等作爲value。還有一個key.registies,保存全部的註冊中心的地址。若是應用在啓動過程當中,註冊中心沒法鏈接或宕機,Dubbo會自動經過本地緩存加載Invokers(調用程序).

有從硬盤文件導入,固然就會有保存。保存資源到文件的主入口爲doSaveProperties().

private final AtomicLong lastCacheChanged = new AtomicLong(); //版本號,保證數據是最新的
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //保存文件的重試次數
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; //重試的最大次數
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //用於異步保存文件的線程池,它只有1個線程
public void doSaveProperties(long version) {
    //當前版本號必須大於等於已保存版本號,絕對不能小於
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();
            //當多線程操做文件時,咱們須要得到一個文件獨佔鎖
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            //將properties的內容寫入到文件中
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                lock.release();
            }
        }
    } catch (Throwable e) {
        //當其餘線程拿不到文件鎖到時候,將重試次數加1,此處爲原子競爭,不一樣的線程不能同時加
        savePropertiesRetryTimes.incrementAndGet();
        //若是重試次數達到最大,將其從新置0
        if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
            logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
            savePropertiesRetryTimes.set(0);
            return;
        }
        //若是沒有達到重試次數上限,繼續判斷版本號是否大於已保存版本號
        if (version < lastCacheChanged.get()) {
            savePropertiesRetryTimes.set(0);
            return;
        } else { //若是當前版本號大於已保存版本號調用其餘線程來從新執行一次doSaveProperties方法原子競爭到版本號加1的線程能夠執行
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
    }
}
private class SaveProperties implements Runnable {
    private long version;

    private SaveProperties(long version) {
        this.version = version;
    }

    @Override
    public void run() {
        doSaveProperties(version);
    }
}

還有一些通知方法notify()是在客戶端第一次訂閱獲取全量數據後,後續因爲訂閱獲得新數據時,都會調用該方法進行保存。

相關文章
相關標籤/搜索