目標:解釋覺得zookeeper實現的註冊中心原理,解讀duubo-registry-zookeeper的源碼
這篇文章是講解註冊中心的最後一篇文章。這篇文章講的是dubbo的註冊中心用zookeeper來實現。這種實現註冊中心的方法也是dubbo推薦的方法。爲了能更加理解zookeeper在dubbo中的應用,接下來我先簡單的介紹一下zookeeper。html
由於dubbo是一個分佈式的RPC開源框架,各個服務之間單獨部署,就會出現資源之間不一致的問題。而zookeeper就有保證分佈式一致性的特性。ZooKeeper是一種爲分佈式應用所設計的高可用、高性能且一致的開源協調服務。關於dubbo爲何會推薦使用zookeeper做爲它的註冊中心實現,有不少書籍以及博客講解了zookeeper的特性以及優點,這不是本章的重點,我要講的是zookeeper的數據結構,dubbo服務是如何被zookeeper的數據結構存儲管理的,由於這影響到下面源碼的解讀。zookeeper採用的是樹形結構來組織數據節點,它相似於一個標準的文件系統。先來看看下面這張圖:java
該圖是官方文檔裏面的一張圖,展現了dubbo在zookeeper中存儲的形式以及節點層級,node
zookeeper以每一個斜槓來分割每一層的znode,好比第一層根節點dubbo就是「/dubbo」,而第二層的Service層就是/com.foo.Barservice,zookeeper的每一個節點經過路徑來表示以及訪問,例如服務提供者啓動時,向/dubbo/com.foo.Barservice/providers目錄下寫入本身的URL地址。關於流程調用說明,見官方文檔:git
文檔地址: http://dubbo.apache.org/zh-cn...
瞭解了dubbo在zookeeper中的節點層級,就能夠看相關的源碼了,下圖是包的結構:github
跟前面三種實現方式同樣的目錄,也就兩個類,看起來很是的舒服,接下來就來解析這兩個類。apache
該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,基於zookeeper來實現。數組
// 日誌記錄 private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); // 默認的zookeeper端口 private final static int DEFAULT_ZOOKEEPER_PORT = 2181; // 默認zookeeper根節點 private final static String DEFAULT_ROOT = "dubbo"; // zookeeper根節點 private final String root; // 服務接口集合 private final Set<String> anyServices = new ConcurrentHashSet<String>(); // 監聽器集合 private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); // zookeeper客戶端實例 private final ZookeeperClient zkClient;
其實你會發現zookeeper雖然是最被推薦的,反而它的實現邏輯相對簡單,由於調用了zookeeper服務組件,不少的邏輯不須要在dubbo中本身去實現。上面的屬性介紹也很簡單,不須要多說,更多的是調用zookeeper客戶端。微信
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 得到url攜帶的分組配置,而且做爲zookeeper的根節點 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; // 建立zookeeper client zkClient = zookeeperTransporter.connect(url); // 添加狀態監聽器,當狀態爲重連的時候調用恢復方法 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { // 恢復 recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
這裏有如下幾個關注點:數據結構
static String appendDefaultPort(String address) { if (address != null && address.length() > 0) { int i = address.indexOf(':'); // 若是地址自己沒有端口,則使用默認端口2181 if (i < 0) { return address + ":" + DEFAULT_ZOOKEEPER_PORT; } else if (Integer.parseInt(address.substring(i + 1)) == 0) { return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT; } } return address; }
該方法是拼接使用默認的zookeeper端口,就是方地址自己沒有端口的時候才使用默認端口。app
@Override public boolean isAvailable() { return zkClient.isConnected(); } @Override public void destroy() { super.destroy(); try { zkClient.close(); } catch (Exception e) { logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e); } }
這裏兩個方法分別是檢測zookeeper是否鏈接以及銷燬鏈接,很簡單,都是調用了zookeeper客戶端封裝好的方法。
@Override protected void doRegister(URL url) { try { // 建立URL節點,也就是URL層的節點 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override protected void doUnregister(URL url) { try { // 刪除節點 zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這兩個方法分別是註冊和取消註冊,也很簡單,調用都是客戶端create和delete方法,一個是建立一個節點,另外一個是刪除節點,該操做都在URL層。
@Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { // 處理全部Service層發起的訂閱,例如監控中心的訂閱 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 得到根目錄 String root = toRootPath(); // 得到url對應的監聽器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // 不存在就建立監聽器集合 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 得到節點監聽器 ChildListener zkListener = listeners.get(listener); // 若是該節點監聽器爲空,則建立 if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 遍歷現有的節點,若是現有的服務集合中沒有該節點,則加入該節點,而後訂閱該節點 for (String child : currentChilds) { // 解碼 child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); // 從新獲取,爲了保證一致性 zkListener = listeners.get(listener); } // 建立service節點,該節點爲持久節點 zkClient.create(root, false); // 向zookeeper的service節點發起訂閱,得到Service接口全名數組 List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { // 遍歷Service接口全名數組 for (String service : services) { service = URL.decode(service); anyServices.add(service); // 發起該service層的訂閱 subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { // 處理指定 Service 層的發起訂閱,例如服務消費者的訂閱 List<URL> urls = new ArrayList<URL>(); // 遍歷分類數組 for (String path : toCategoriesPath(url)) { // 得到監聽器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // 若是沒有則建立 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 得到節點監聽器 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 通知服務變化 回調NotifyListener ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); // 從新獲取節點監聽器,保證一致性 zkListener = listeners.get(listener); } // 建立type節點,該節點爲持久節點 zkClient.create(path, false); // 向zookeeper的type節點發起訂閱 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { // 加入到自子節點數據數組 urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 通知數據變化 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這個方法是訂閱,邏輯實現比較多,能夠分兩段來看,這裏的實現把全部Service層發起的訂閱以及指定的Service層發起的訂閱分開處理。全部Service層相似於監控中心發起的訂閱。指定的Service層發起的訂閱能夠看做是服務消費者的訂閱。訂閱的大體邏輯相似,不過仍是有幾個區別:
@Override protected void doUnsubscribe(URL url, NotifyListener listener) { // 得到監聽器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners != null) { // 得到子節點的監聽器 ChildListener zkListener = listeners.get(listener); if (zkListener != null) { // 若是爲所有的服務接口,例如監控中心 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 得到根目錄 String root = toRootPath(); // 移除監聽器 zkClient.removeChildListener(root, zkListener); } else { // 遍歷分類數組進行移除監聽器 for (String path : toCategoriesPath(url)) { zkClient.removeChildListener(path, zkListener); } } } } }
該方法是取消訂閱,也是分爲兩種狀況,全部的Service發起的取消訂閱仍是指定的Service發起的取消訂閱。能夠看到全部的Service發起的取消訂閱就直接移除了根目錄下全部的監聽器,而指定的Service發起的取消訂閱是移除了該Service層下面的全部Type節點監聽器。若是不太明白再回去看看前面的那個節點層級圖。
@Override public List<URL> lookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } try { List<String> providers = new ArrayList<String>(); // 遍歷分組類別 for (String path : toCategoriesPath(url)) { // 得到子節點 List<String> children = zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } // 得到 providers 中,和 consumer 匹配的 URL 數組 return toUrlsWithoutEmpty(url, providers); } catch (Throwable e) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
該方法就是查詢符合條件的已經註冊的服務。調用了toUrlsWithoutEmpty方法,在後面會講到。
private String toServicePath(URL url) { String name = url.getServiceInterface(); // 若是是包括全部服務,則返回根節點 if (Constants.ANY_VALUE.equals(name)) { return toRootPath(); } return toRootDir() + URL.encode(name); }
該方法是得到服務路徑,拼接規則:Root + Type。
private String[] toCategoriesPath(URL url) { String[] categories; // 若是url攜帶的分類配置爲*,則建立包括全部分類的數組 if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; } else { // 返回url攜帶的分類配置 categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); } String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { // 加上服務路徑 paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i]; } return paths; } private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); }
第一個方法是得到分類數組,也就是url攜帶的服務下的全部Type節點數組。第二個是得到分類路徑,分類路徑拼接規則:Root + Service + Type
private String toUrlPath(URL url) { return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); }
該方法是得到URL路徑,拼接規則是Root + Service + Type + URL
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) { List<URL> urls = new ArrayList<URL>(); if (providers != null && !providers.isEmpty()) { // 遍歷服務提供者 for (String provider : providers) { // 解碼 provider = URL.decode(provider); if (provider.contains("://")) { // 把服務轉化成url的形式 URL url = URL.valueOf(provider); // 判斷是否匹配,若是匹配, 則加入到集合中 if (UrlUtils.isMatch(consumer, url)) { urls.add(url); } } } } return urls; } private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) { // 返回和服務消費者匹配的服務提供者url List<URL> urls = toUrlsWithoutEmpty(consumer, providers); // 若是不存在,則建立`empty://` 的 URL返回 if (urls == null || urls.isEmpty()) { int i = path.lastIndexOf('/'); String category = i < 0 ? path : path.substring(i + 1); URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category); urls.add(empty); } return urls; }
第一個toUrlsWithoutEmpty方法是得到 providers 中,和 consumer 匹配的 URL 數組,第二個toUrlsWithEmpty方法是調用了第一個方法後增長了若不存在匹配,則建立 empty://
的 URL返回。經過這樣的方式,能夠處理相似服務提供者爲空的狀況。
該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
能夠看到就是實例化了ZookeeperRegistry而已,全部這裏就不解釋了。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了dubbo利用zookeeper來實現註冊中心,其中關鍵的是須要弄明白dubbo在zookeeper中存儲的節點層級意義,也就是root層、service層、type層以及url層分別表明什麼,其餘的邏輯並不複雜大多數調用了zookeeper客戶端的能力,有興趣的同窗也能夠深刻的去了解zookeeper。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。