文章有點長,親,要慢慢看!html
整個registry下的模塊 java
api是註冊中心全部的API和抽象類實現正則表達式
default是註冊中心的內存實現redis
zookeeper、redis、nacos就是基於不一樣的組件的實現apache
multicast是經過廣播實現api
這張圖相信只要是用過的都不陌生,掛在dubbo.io的官網掛了好久好久了。那麼這個流程主要是說了什麼呢?緩存
api層主要是註冊中心全部API的抽象實現類,並非實際提供服務的組件。bash
模塊關係圖 服務器
類關係圖 網絡
目錄結構
public interface RegistryService {
/** * 註冊服務. * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
void register(URL url);
/** * 取消註冊服務. * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
void unregister(URL url);
/** * 訂閱服務. * @param listener 變動事件監聽器,不容許爲空 */
void subscribe(URL url, NotifyListener listener);
/** * 取消訂閱服務. * @param url 訂閱條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 變動事件監聽器,不容許爲空 */
void unsubscribe(URL url, NotifyListener listener);
/** * 查詢註冊列表,與訂閱的推模式相對應,這裏爲拉模式,只返回一次結果。 * * @see org.apache.dubbo.registry.NotifyListener#notify(List) * @param url 查詢條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @return 已註冊信息列表,可能爲空,含義同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的參數。 */
List<URL> lookup(URL url);
}
複製代碼
public interface Node {
/**
* 獲取節點Url
*/
URL getUrl();
/**
* 是否可用
*/
boolean isAvailable();
/**
* 銷燬節點
*/
void destroy();
}
複製代碼
public interface Registry extends Node, RegistryService {
}
複製代碼
// url地址分隔符,用於文件緩存,服務提供程序url分隔
private static final char URL_SEPARATOR = ' ';
// URL地址分隔的正則表達式,用於分析文件緩存中的服務提供程序URL列表
private static final String URL_SPLIT = "\\s+";
// 日誌輸出
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 本地磁盤緩存,其中的特殊key爲registies是記錄註冊表中心列表,其餘是服務提供者的李彪
private final Properties properties = new Properties();
// 文件緩存寫入執行器 提供一個線程的線程池
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件標誌
private final boolean syncSaveFile;
// 這個是緩存的版本號
private final AtomicLong lastCacheChanged = new AtomicLong();
// 這個是已經註冊的URL集合,不只僅是服務提供者的,也能夠是服務消費者的
private final Set<URL> registered = new ConcurrentHashSet<URL>();
// 已訂閱的url 值爲url的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 消費者或服務治理服務獲取註冊信息後的緩存對象
// 內存中服務器緩存的notified對象是ConcurrentHashMap裏面嵌套了一個Map,
// 外層Map的Key是消費者的URL,
// 內層的Map的key是分類,包括provider,consumer,routes,configurators四種,
// value則對應服務列表,沒有服務提供者提供服務的URL,會以一個特別的empty://前綴開頭
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// 註冊中心的URL
private URL registryUrl;
// 本地磁盤緩存文件保存的是註冊中心的數據
private File file;
複製代碼
public AbstractRegistry(URL url) {
// 設置註冊中心的地址URL
setUrl(url);
// 從URL參數中獲取是否同步保存的狀態,URL中若是不包含,那就設置默認值爲false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 獲取文件路徑
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
// 開始讀入文件
File file = null;
// 不存在就拋出異常
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
// 把文件對象放到屬性上
this.file = file;
// 加載文件中的參數放入Properties,Properties繼承HashTable。
loadProperties();
// 通知監聽器 URL變化 見下面notify的源碼
notify(url.getBackupUrls());
}
複製代碼
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
// 把文件中的key-value讀進來
in = new FileInputStream(file);
// Properties是一個繼承HashTable的類.
// 這個地方就是按行讀入,util裏面的類,裏面調用了一個load0 方法會把key和value作分割而後放入Properties中,。
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
// 關閉流
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
複製代碼
@Override
public List<URL> lookup(URL url) {
// 查找的結果數據
List<URL> result = new ArrayList<URL>();
// 獲取註冊信息中的分類服務列表信息
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// 若是該消費者訂閱了服務
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
// 把非空的加入結果集中
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
// 若是沒有訂閱服務
// 使用原子類以保證在獲取註冊在註冊中心的服務url時可以保證是最新的url集合
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
// 通知監聽器。當收到服務變動通知時觸發
NotifyListener listener = new NotifyListener() {
@Override
public void notify(List<URL> urls) {
reference.set(urls);
}
};
// 添加這個服務的監聽器
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
// 而後把非空結果放入結果集中
if (urls != null && !urls.isEmpty()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
複製代碼
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
複製代碼
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍歷已訂閱的URL
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 通知URL對應的監聽器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 通知監聽器,看下方代碼註釋
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
複製代碼
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 將url進行分類
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 根據不一樣的category分別放到不一樣List中處理 以category的值作分類
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
// 沒有分類結果就直接return
if (result.size() == 0) {
return;
}
// 得到消費者被通知的url的Map
Map<String, List<URL>> categoryNotified = notified.get(url);
// 若是沒有 就建立一個
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
// 建立事後再獲取
categoryNotified = notified.get(url);
}
// 發送URL變化給監聽器
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分類標實和分類後的列表放入notified的value中 覆蓋到 `notified`
// 當分類的數據爲空,依然有urls 。不過其中的urls[0].protocol是empty,以此來處理全部服務提供者爲空時的狀況。
categoryNotified.put(category, categoryList);
// 保存一份到文件緩存中 中間作的 就是解析出參數而後同步或者異步保存到文件中
saveProperties(url);
// 通知監聽器
listener.notify(categoryList);
}
}
複製代碼
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 得到url已經訂閱的服務的監聽器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 而後把listener添加到上
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
// 得到url已經訂閱的服務的監聽器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
// 而後移除
listeners.remove(listener);
}
}
複製代碼
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
//調用的上面的註冊方法
register(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 調用上面的訂閱方法
subscribe(url, listener);
}
}
}
}
複製代碼
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 獲取以註冊的URL
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 取消註冊
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 獲取已訂閱的URL以及監聽器
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
// 去取消訂閱
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
複製代碼
// Scheduled executor service
// 通過固定時間後(默認是5s),調用FailbackRegistry#retry方法
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失敗重試計時器,按期檢查是否有失敗請求,若是有,則無限制重試
private final ScheduledFuture<?> retryFuture;
// 註冊失敗的集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消註冊失敗的集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// 發起訂閱失敗的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 取消訂閱失敗的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 通知失敗的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/** * The time in milliseconds the retryExecutor will wait * RetryExecutor將等待的時間(毫秒) */
private final int retryPeriod;
複製代碼
public FailbackRegistry(URL url) {
super(url);
// 獲取重試的時間 若是沒有就設置成默認的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 設置重試任務 裏面就是調用retry方法 見下方retry方法的解析
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
複製代碼
@Override
public void register(URL url) {
// 緩存等註冊操做 見AbstractRegistry
super.register(url);
// 在失敗集合中將這個移除
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 向服務器發送註冊請求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 開啓了啓動時就檢測,直接拋異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 記錄失敗的url
failedRegistered.add(url);
}
}
複製代碼
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
// 將失敗的註冊請求記錄到失敗列表,按期重試
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 注意 這個是調用父類的
super.notify(url, listener, urls);
}
複製代碼
@Override
protected void recover() throws Exception {
// register
// 把已註冊的添加到失敗重試的列表中
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// 添加到失敗重試註冊列表
failedRegistered.add(url);
}
}
// subscribe
// 把已訂閱的添加到失敗重試的列表中
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 添加到失敗重試訂閱列表
addFailedSubscribed(url, listener);
}
}
}
}
複製代碼
// Retry the failed actions
protected void retry() {
if (!failedRegistered.isEmpty()) {
// 不爲空就把他URL拿到
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
// 而後遍歷它 作對應的操做
for (URL url : failed) {
try {
// 作註冊操做
doRegister(url);
// 移除失敗集合中URL
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
......
}
複製代碼
@Override
public void destroy() {
// 調用父類的方法
super.destroy();
try {
// 取消執行任務
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
}
複製代碼
// ==== Template method ====
protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
複製代碼
@SPI("dubbo")
public interface RegistryFactory {
// 這個接口方法實際上就是獲取對註冊中心的鏈接,而後返回不一樣註冊中心的不一樣Regsitry的實現對象,
// 註解就是根據設置不一樣的protocol(協議)來選擇不一樣的實現,
// 好比Zookeeper,就會去使用Zookeeper的ZookeeperRegistryFactory,具體怎麼選擇,後續博客再寫
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
複製代碼
// 註冊中心獲取過程的鎖
private static final ReentrantLock LOCK = new ReentrantLock();
// 註冊中心Map<註冊地址,registry> 一個類的緩存。
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
複製代碼
/** * Get all registries * 獲取全部的registry對象 * @return all registries */
public static Collection<Registry> getRegistries() {
//獲得一個集合的鏡像,它的返回結果不可直接被改變,不然會報錯
return Collections.unmodifiableCollection(REGISTRIES.values());
}
複製代碼
/** * Close all created registries * 關閉全部已建立的registry對象 */
// TODO: 2017/8/30 to move somewhere else better
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
//對註冊中心關閉操做加鎖
LOCK.lock();
try {
// 遍歷全部的註冊中心的操做類,而後調用destroy來銷燬。
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 而後清除集合
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
複製代碼
@Override
public Registry getRegistry(URL url) {
// 經過URL來獲取到註冊中心的類型
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
// 鎖定註冊中心訪問進程以確保註冊表的單個實例
LOCK.lock();
try {
// 經過key來拿到對應的註冊中心的操做類
Registry registry = REGISTRIES.get(key);
// 有就直接返回
if (registry != null) {
return registry;
}
// 沒有就建立對應的註冊中心操做類
registry = createRegistry(url);
// 若是建立失敗,報錯
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 建立成功就放到結合中
REGISTRIES.put(key, registry);
// 而後再返回
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
複製代碼
protected abstract Registry createRegistry(URL url);
複製代碼
// invoker對象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 註冊中心的地址
private URL registryUrl;
// 消費者的地址
private URL consumerUrl;
// 註冊中心的Directory
private RegistryDirectory registryDirectory;
複製代碼
// invoker對象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 註冊中心的地址
private URL registryUrl;
// 提供者的地址
private URL providerUrl;
// 是否註冊
private volatile boolean isReg;
複製代碼
// 服務提供者的Invokers集合
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
// 服務消費者的Invokers集合
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
複製代碼
@Override
public Status check() {
// 獲取全部的註冊中心的對象
Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
if (registries.isEmpty()) {
return new Status(Status.Level.UNKNOWN);
}
Status.Level level = Status.Level.OK;
StringBuilder buf = new StringBuilder();
// 遍歷
for (Registry registry : registries) {
if (buf.length() > 0) {
buf.append(",");
}
// 把地址拼接到一塊兒
buf.append(registry.getUrl().getAddress());
// 若是註冊中心的某個節點不可用就把狀態設置成error
if (!registry.isAvailable()) {
level = Status.Level.ERROR;
buf.append("(disconnected)");
} else {
buf.append("(connected)");
}
}
// 而後返回價差的結果對象
return new Status(level, buf.toString());
}
複製代碼
不知道你們看到這裏有沒有忘記這張圖
模塊關係圖
全部的註冊中心實現FailbackRegistry 和 AbstractRegistryFactory來實現對應的功能。
那麼Zookeeper也是如此。Zookeeper主要就只有兩個類
在Dubbo框架啓動時會根據咱們所寫的服務相關的配置在註冊中心建立4個目錄,在providers和consumers目錄中分別存儲服務提供方、消費方元數據信息。包括:IP、端口、權重和應用名等數據。
目錄名稱 | 存儲值樣例 |
---|---|
/dubbo/service/providers | dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&... |
/dubbo/service/consumers | dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&... |
/dubbo/service/routers | condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&... |
/dubbo/service/configurators | override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&... |
<beans>
<!-- 適用於Zookeeper一個集羣有多個節點,多個IP和端口用逗號分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
<!-- 適用於Zookeeper多個集羣有多個節點,多個IP和端口用豎線分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>
複製代碼
// Zookeeper的默認端口號
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
// Dubbo在Zookeeper中註冊的默認根節點
private final static String DEFAULT_ROOT = "dubbo";
// 組的名稱 或者說是 根節點的值
private final String root;
// 服務集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();
// zk節點的監聽器
// Dubbo底層封裝了2套Zookeeper API,因此經過ChildListener抽象了監聽器,
// 可是在實際調用時會經過createTargetChildListener轉爲對應框架的監聽器實現
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
// zk的客戶端, 對節點進行一些刪改等操做
private final ZookeeperClient zkClient;
複製代碼
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 調用FailbackRegistry的構造方法
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名稱 並複製給root
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 鏈接上Zookeeper
zkClient = zookeeperTransporter.connect(url);
// 添加鏈接狀態監聽器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
// 重連恢復
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
複製代碼
// 發佈
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 取消發佈
@Override
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
複製代碼
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 訂閱全部數據
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
// 爲空則把listeners放入到緩存的Map中
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 建立子節點監聽器,對root下的子節點作監聽,一旦有子節點發生改變,
// 那麼就對這個節點進行訂閱.
if (zkListener == null) {
// zkListener爲空說明是第一次拉取,則新建一個listener
listeners.putIfAbsent(listener, new ChildListener() {
// 節點變動時,觸發通知時執行
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
// 遍歷全部節點
child = URL.decode(child);
// 若是有子節點還未被訂閱賊說明是新節點,
if (!anyServices.contains(child)) {
// 加入到集合中
anyServices.add(child);
//就訂閱之
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
// 建立持久節點root,接下來訂閱持久節點的子節點
zkClient.create(root, false);
// 添加root節點的子節點監聽器,並返回當前的services
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
// 遍歷全部的子節點進行訂閱
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 增長當前節點的訂閱,而且會返回改節點下全部子節點的列表
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 訂閱類別服務
} else {
List<URL> urls = new ArrayList<URL>();
// 將url轉變成
// /dubbo/com.demo.DemoService/providers
// /dubbo/com.demo.DemoService/configurators
// /dubbo/com.demo.DemoService/routers
// 根據url類別獲取一組要訂閱的路徑
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 若是緩存沒有,則添加到緩存中
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 一樣若是監聽器緩存中沒有 則放入緩存
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 通知節點變化
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 訂閱並返回該節點下的子路徑並緩存
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 有子節點組裝,沒有那麼就將消費者的協議變成empty做爲url。
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 回調NotifyListener,更新本地緩存信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
複製代碼
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
// 經過url把監聽器所有拿到
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 直接刪除group下全部的
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 移除監聽器
zkClient.removeChildListener(root, zkListener);
} else {
// 移除類別服務下的監聽器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
複製代碼
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
複製代碼
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
複製代碼
上面我提到過,Dubbo用Zookeeper的時候用了兩種方式實現,一個是Apache Curator,另外一個是zkClient,這個類就是作看了一個轉換。以下圖
兩個類都實現了該接口來向外提供統一的ZookeeperClient。
這個實如今remoting模塊。暫時就不講了。