Dubbo的Provider,Consumer在啓動時都會建立一個註冊中心,註冊中心能夠選擇Zookeeper,Redis。經常使用的是Zookeeperjava
Dubbo裏默認使用zkclient來操做zookeeper服務器,其對zookeeper原始客戶單作了必定的封裝,操做zookeeper時能便捷一些,好比不須要手動處理session超時,不須要重複註冊watcher等等。node
Dubbo在Zookeeper上註冊的節點目錄:數組
假設接口名稱是:com.bob.dubbo.service.CityDubboService緩存
Dubbo啓動時,Consumer和Provider都會把自身的URL格式化爲字符串,而後註冊到zookeeper相應節點下,做爲一個臨時節點,當連斷開時,節點被刪除。服務器
Consumer在啓動時,不單單會註冊自身到 …/consumers/目錄下,同時還會訂閱…/providers目錄,實時獲取其上Provider的URL字符串信息。session
下面咱們就看相關的代碼實現:ide
public class ZookeeperRegistry extends FailbackRegistry { ...... /** * 默認端口 */ private final static int DEFAULT_ZOOKEEPER_PORT = 2181; /** * 默認 Zookeeper 根節點 */ private final static String DEFAULT_ROOT = "dubbo"; /** * Zookeeper 根節點 */ private final String root; /** * Service 接口全名集合 */ private final Set<String> anyServices = new ConcurrentHashSet<String>(); /** * 監聽器集合 */ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); /** * Zookeeper 客戶端 */ private final ZookeeperClient zkClient; public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); // 調用父類FailbackRegistry的構造函數 if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 得到 Zookeeper 根節點, 未指定 "group" 參數時爲 dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 參數值 if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; // root = "/dubbo" // 建立 Zookeeper Client zkClient = zookeeperTransporter.connect(url); // 添加 StateListener 對象。該監聽器,在重連時,調用恢復方法。 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } }
public abstract class FailbackRegistry extends AbstractRegistry { /** * 發起註冊失敗的 URL 集合 */ private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); /** * 取消註冊失敗的 URL 集合 */ private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); /** * 發起訂閱失敗的監聽器集合 */ private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); /** * 取消訂閱失敗的監聽器集合 */ private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); /** * 通知通知的 URL 集合 */ private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); public FailbackRegistry(URL url) { super(url); // 重試頻率,單位:毫秒 ,默認 5*1000 int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 建立失敗重試定時器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // Check and connect to the registry try { retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } /** * 重試 */ // Retry the failed actions protected void retry() { // 重試執行註冊 if (!failedRegistered.isEmpty()) { ...... for (URL url : failed) { try { // 執行註冊 doRegister(url); // 移除出 `failedRegistered` failedRegistered.remove(url); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } // 重試執行取消註冊 if (!failedUnregistered.isEmpty()) { ...... for (URL url : failed) { try { // 執行取消註冊 doUnregister(url); // 移除出 `failedUnregistered` failedUnregistered.remove(url); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } // 重試執行訂閱 if (!failedSubscribed.isEmpty()) { ...... for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { URL url = entry.getKey(); Set<NotifyListener> listeners = entry.getValue(); for (NotifyListener listener : listeners) { try { // 執行訂閱 doSubscribe(url, listener); // 移除監聽器 listeners.remove(listener); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } } // 重試執行取消訂閱 if (!failedUnsubscribed.isEmpty()) { ...... for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { URL url = entry.getKey(); Set<NotifyListener> listeners = entry.getValue(); for (NotifyListener listener : listeners) { try { // 執行取消訂閱 doUnsubscribe(url, listener); // 移除監聽器 listeners.remove(listener); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } } } }
ZookeeperRegistry 在實例化時,調用父類構造函數。在父類構造函數中,會建立一個定時任務,每隔5S執行retry( ) 方法。函數
在retry( ) 方法中,重試那些失敗的動做。重試的動做包括:this
Provider向zookeeper註冊自身的url,生成一個臨時的znode
Provider從Dubbo容器中退出,中止提供RPC調用。也就是移除zookeeper內自身url對應的znode
Consumer訂閱 " /dubbo/…Service/providers" 目錄的子節點,生成ChildListener
Consumer從Dubbo容器中退出,移除以前建立的ChildListenerurl
爲何如此設置? 主要是和zookeeper的通訊機制有關的。當zookeeper的Client和Server鏈接斷開,或者心跳超時,那麼Server會將相應Client註冊的臨時節點刪除,固然註冊的Listener也相應刪除。
而Provider和Consumer註冊的URL就屬於臨時節點,當鏈接斷開時,Dubbo註冊了zookeeper的StateListener,也就是狀態監聽器,當Dubbo裏的zookeeper Client和Server從新鏈接上時,將以前註冊的的URL添加入這幾個失敗集合中,而後從新註冊和訂閱。
看ZookeeperRegistry 的構造函數,其添加了一個StateListener:
public class ZookeeperRegistry extends FailbackRegistry { public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { ...... // 添加 StateListener 對象。該監聽器,在重連時,調用恢復方法。 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } }
public abstract class FailbackRegistry extends AbstractRegistry { protected void recover() throws Exception { // register 恢復註冊,添加到 `failedRegistered` ,定時重試 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { failedRegistered.add(url); } } // subscribe 恢復訂閱,添加到 `failedSubscribed` ,定時重試 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } } } }
ZookeeperRegistry 構造函數中爲zookeeper的操做客戶端添加了一個狀態監聽器 StateListener,當從新鏈接時( 從新鏈接意味着以前鏈接斷開了 ),將已經註冊和訂閱的URL添加到失敗集合中,定時重試,也就是從新註冊和訂閱。
zookeeper Client與Server斷開鏈接後,會定時的不斷嘗試從新鏈接,當鏈接成功後就會觸發一個Event,Dubbo註冊了CONNECTED狀態的監聽器,當鏈接成功後從新註冊和訂閱。
zookeeper Server宕機了,Dubbo裏的Client並無對此事件作什麼響應,固然其內部的zkClient會不停地嘗試鏈接Server。當Zookeeper Server宕機了不影響Dubbo裏已註冊的組件的RPC調用,由於已經經過URL生成了Invoker對象,這些對象還在Dubbo容器內。固然由於註冊中心宕機了,確定不能感知到新的Provider。同時由於在以前訂閱得到的Provider信息已經持久化到本地文件,當Dubbo應用重啓時,若是zookeeper註冊中心不可用,會加載緩存在文件內的Provider信息,仍是能保證服務的高可用。
Consumer會一直維持着對Provider的ChildListener,監聽Provider的實時數據信息。當Providers節點的子節點發生變化時,實時通知Dubbo,更新URL,同時更新Dubbo容器內的Consumer Invoker對象,只要是訂閱成功均會實時同步Provider,更新Invoker對象,不管是第一次訂閱仍是斷線重連後的訂閱:
public class ZookeeperRegistry extends FailbackRegistry { protected void doSubscribe(final URL url, final NotifyListener listener) { try { // 處理全部 Service 層的發起訂閱,例如監控中心的訂閱 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { ...... // 處理指定 Service 層的發起訂閱,例如服務消費者的訂閱 } else { // 子節點數據數組 List<URL> urls = new ArrayList<URL>(); // 循環分類數組 , router, configurator, provider for (String path : toCategoriesPath(url)) { // 得到 url 對應的監聽器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { // 不存在,進行建立 zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 得到 ChildListener 對象 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { // 不存在子目錄的監聽器,進行建立 ChildListener 對象 // 訂閱父級目錄, 當有子節點發生變化時,觸發此回調函數 listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 變動時,調用 `#notify(...)` 方法,回調 NotifyListener ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } // 建立 Type 節點。該節點爲持久節點。 zkClient.create(path, false); // 向 Zookeeper ,PATH 節點,發起訂閱,返回此節點下的全部子元素 path : /根節點/接口全名/providers, 好比 : /dubbo/com.bob.service.CityService/providers List<String> children = zkClient.addChildListener(path, zkListener); // 添加到 `urls` 中 if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 首次全量數據獲取完成時,調用 `#notify(...)` 方法,回調 NotifyListener, 在這一步從鏈接Provider,實例化Invoker notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
訂閱獲取Providers的最新URL字符串,調用notify(…)方法,通知監聽器,最終會執行以下代碼:
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { private volatile List<Configurator> configurators; private volatile Map<String, Invoker<T>> urlInvokerMap; private volatile Map<String, List<Invoker<T>>> methodInvokerMap; private volatile Set<URL> cachedInvokerUrls; private void refreshInvoker(List<URL> invokerUrls) { // 從zookeeper獲取到的url已經沒有合適的了,在訂閱返回爲空時,會手動生成一個 EMPTY_PROTOCOL 的 url if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map // state change // If the calculation is wrong, it is not processed. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException( "urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } }
更新Dubbo內的Invoker相關數據,保證Consumer能實時感知到Provider的信息,保證PRC調用不會出錯。
以上就是Dubbo內Zookeeper註冊中心的實現過程。
Provider和Consumer向Zookeeper註冊臨時節點,當鏈接斷開時刪除相應的註冊節點。 Consumer訂閱Providers節點的子節點,實時感知Provider的變化狀況,實時同步自身的Invoker對象,保證RPC的可用性。