Dubbo分析Serialize層
Dubbo分析之Transport層
Dubbo分析之Exchange 層
Dubbo分析之Protocol層
Dubbo分析之Cluster層
Dubbo分析之Registry層git
緊接上文Dubbo分析之Cluster層,本文繼續分析dubbo的register層;此層封裝服務地址的註冊與發現,以服務URL爲中心,擴展接口爲RegistryFactory, Registry, RegistryService;github
接口定義以下:redis
public interface Registry extends Node, RegistryService { } public interface RegistryService { void register(URL url); void unregister(URL url); void subscribe(URL url, NotifyListener listener); void unsubscribe(URL url, NotifyListener listener); List<URL> lookup(URL url); }
主要提供了註冊(register),註銷(unregister),訂閱(subscribe),退訂(unsubscribe)等功能;dubbo提供了多種註冊方式分別是:Multicast ,Zookeeper,Redis以及Simple方式;
Multicast:Multicast註冊中心不須要啓動任何中心節點,只要廣播地址同樣,就能夠互相發現;
Zookeeper:Zookeeper是Apacahe Hadoop的子項目,是一個樹型的目錄服務,支持變動推送,適合做爲Dubbo服務的註冊中心,工業強度較高,可用於生產環境,並推薦使用;
Redis:基於Redis實現的註冊中心,使用 Redis的Publish/Subscribe事件通知數據變動;
Simple:Simple註冊中心自己就是一個普通的Dubbo服務,能夠減小第三方依賴,使總體通信方式一致;
後面重點介紹官方推薦的Zookeeper註冊方式;具體的Register是在RegistryFactory中生成的,具體看一下接口定義;session
接口定義以下:app
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url); }
RegistryFactory提供了SPI擴展,默認使用dubbo,具體有哪些擴展能夠查看META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory:負載均衡
dubbo=com.alibaba.dubbo.registry.dubbo.DubboRegistryFactory multicast=com.alibaba.dubbo.registry.multicast.MulticastRegistryFactory zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory redis=com.alibaba.dubbo.registry.redis.RedisRegistryFactory
已推薦使用的Zookeeper爲實例,查看ZookeeperRegistryFactory,提供了createRegistry方法:async
private ZookeeperTransporter zookeeperTransporter; public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
實例化ZookeeperRegistry,兩個參數分別是url和zookeeperTransporter,zookeeperTransporter是操做Zookeeper的客戶端組件包括:zkclient和curator兩種方式ide
@SPI("curator") public interface ZookeeperTransporter { @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url); }
ZookeeperTransporter一樣提供了SPI擴展,默認使用curator方式;接下來重點看一下Zookeeper註冊中心。oop
在dubbo的總體設計中,能夠大體查看Registry層的大體流程,首先經過RegistryFactory實例化Registry,Registry能夠接收RegistryProtocol傳過來的註冊(register)和訂閱(subscribe)消息,而後Registry經過ZKClient來向Zookeeper指定的目錄下寫入url信息,若是是訂閱消息Registry會經過NotifyListener來通知RegitryDirctory進行更新url,最後就是Cluster層經過路由,負載均衡選擇具體的提供方;ui
官方提供了dubbo在Zookeeper中心的流程圖:
流程說明:
服務提供者啓動時: 向/dubbo/com.foo.BarService/providers目錄下寫入本身的URL地址;
服務消費者啓動時: 訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址;並向/dubbo/com.foo.BarService/consumers目錄下寫入本身的URL地址;
監控中心啓動時: 訂閱/dubbo/com.foo.BarService 目錄下的全部提供者和消費者URL地址。
下面分別從註冊(register),註銷(unregister),訂閱(subscribe),退訂(unsubscribe)四個方面來分析
ZookeeperRegistry的父類FailbackRegistry中實現了register方法,FailbackRegistry從名字能夠看出來具備:失敗自動恢復,後臺記錄失敗請求,定時重發功能;下面具體看一下register方法:
public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly failedRegistered.add(url); } }
後臺記錄了失敗的請求,包括failedRegistered和failedUnregistered,註冊的時候將裏面存放的url刪除,而後執行doRegister方法,此方式在ZookeeperRegistry中實現,主要是在Zookeeper指定的目錄下寫入url信息,若是失敗會記錄註冊失敗的url,等待自動恢復;doRegister相關代碼以下:
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
調用zkClient的create方法在Zookeeper上建立節點,默認建立臨時節點,create方法在AbstractZookeeperClient中實現,具體源碼以下:
public void create(String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
path指定須要建立的目錄,ephemeral指定是不是建立臨時節點,而且提供了遞歸建立目錄,除了葉子目錄其餘目錄都是持久化的;能夠發現無論是建立臨時目錄仍是持久化目錄,都沒有指定目錄的Data,全部使用的是默認值,也就是本地ip地址;實例中建立的目錄以下:
/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D13252%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545297239027
dubbo是一個根節點,而後是service名稱,providers是固定的一個類型,若是是消費端這裏就是consumers,最後就是一個臨時節點;使用臨時節點的目的就是提供者出現斷電等異常停機時,註冊中心能自動刪除提供者信息;能夠經過以下方法查詢當前的目錄節點信息:
public class CuratorTest { static String path = "/dubbo"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); List<String> paths = listChildren(path); for (String path : paths) { Stat stat = new Stat(); System.err.println( "path:" + path + ",value:" + new String(client.getData().storingStatIn(stat).forPath(path))); } } private static List<String> listChildren(String path) throws Exception { List<String> pathList = new ArrayList<String>(); pathList.add(path); List<String> list = client.getChildren().forPath(path); if (list != null && list.size() > 0) { for (String cPath : list) { String temp = ""; if ("/".equals(path)) { temp = path + cPath; } else { temp = path + "/" + cPath; } pathList.addAll(listChildren(temp)); } } return pathList; } }
遞歸遍歷/dubbo目錄下的全部子目錄,同時將節點存儲的數據都查詢出來,結果以下:
path:/dubbo,value:10.13.83.7 path:/dubbo/com.dubboApi.DemoService,value:10.13.83.7 path:/dubbo/com.dubboApi.DemoService/configurators,value:10.13.83.7 path:/dubbo/com.dubboApi.DemoService/providers,value:10.13.83.7 path:/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D4712%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545358401966,value:10.13.83.7
除了最後一個節點是臨時節點,其餘都是持久化的;
一樣在父類FailbackRegistry中實現了unregister方法,代碼以下:
public void unregister(URL url) { super.unregister(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a cancellation request to the server side doUnregister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to uregister " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly failedUnregistered.add(url); } }
註銷時一樣刪除了failedRegistered和failedUnregistered存放的url,而後調用doUnregister,刪除Zookeeper中的目錄節點,失敗的狀況下會存儲在failedUnregistered中,等待重試;
protected void doUnregister(URL url) { try { zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } //CuratorZookeeperClient刪除操做 public void delete(String path) { try { client.delete().forPath(path); } catch (NoNodeException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
直接使用CuratorZookeeperClient中的delete方法刪除臨時節點;
服務消費者啓動時,會先向Zookeeper註冊消費者節點信息,而後訂閱…/providers目錄下提供者的URL地址;消費端也一樣須要註冊節點信息,是由於監控中心須要對服務端和消費端都進行監控;下面重點看一下訂閱的相關代碼,在父類FailbackRegistry中實現了subscribe方法:
public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // Sending a subscription request to the server side doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && !urls.isEmpty()) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // Record a failed registration request to a failed list, retry regularly addFailedSubscribed(url, listener); } }
相似的格式,一樣存儲了失敗了訂閱url信息,重點看ZookeeperRegistry中的doSubscribe方法:
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
在ZookeeperRegistry中定義了一個zkListeners變量,每一個URL對應了一個map;map裏面分別是NotifyListener和ChildListener的對應關係,消費端訂閱時這裏的NotifyListener其實就是RegistryDirectory,ChildListener是一個內部類,用來在監聽的節點發生變動時,通知對應的消費端,具體的監聽處理是在zkClient.addChildListener中實現的:
public List<String> addChildListener(String path, final ChildListener listener) { ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); if (listeners == null) { childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>()); listeners = childListeners.get(path); } TargetChildListener targetListener = listeners.get(listener); if (targetListener == null) { listeners.putIfAbsent(listener, createTargetChildListener(path, listener)); targetListener = listeners.get(listener); } return addTargetChildListener(path, targetListener); } public CuratorWatcher createTargetChildListener(String path, ChildListener listener) { return new CuratorWatcherImpl(listener); } public List<String> addTargetChildListener(String path, CuratorWatcher listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } private class CuratorWatcherImpl implements CuratorWatcher { private volatile ChildListener listener; public CuratorWatcherImpl(ChildListener listener) { this.listener = listener; } public void unwatch() { this.listener = null; } @Override public void process(WatchedEvent event) throws Exception { if (listener != null) { String path = event.getPath() == null ? "" : event.getPath(); listener.childChanged(path, StringUtils.isNotEmpty(path) ? client.getChildren().usingWatcher(this).forPath(path) : Collections.<String>emptyList()); } } }
CuratorWatcherImpl實現了Zookeeper的監聽接口CuratorWatcher,用來在節點有變動時通知對應的ChildListener,這樣ChildListener就能夠通知RegistryDirectory進行更新數據;
在父類FailbackRegistry中實現了unsubscribe方法
public void unsubscribe(URL url, NotifyListener listener) { super.unsubscribe(url, listener); removeFailedSubscribed(url, listener); try { // Sending a canceling subscription request to the server side doUnsubscribe(url, listener); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly Set<NotifyListener> listeners = failedUnsubscribed.get(url); if (listeners == null) { failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = failedUnsubscribed.get(url); } listeners.add(listener); } }
一樣使用failedUnsubscribed用來存儲失敗退訂的url,具體看ZookeeperRegistry中的doUnsubscribe方法
protected void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners != null) { ChildListener zkListener = listeners.get(listener); if (zkListener != null) { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); zkClient.removeChildListener(root, zkListener); } else { for (String path : toCategoriesPath(url)) { zkClient.removeChildListener(path, zkListener); } } } } }
退訂就比較簡單了,只須要移除監聽器就能夠了;
FailbackRegistry從名字能夠看出來具備:失敗自動恢復,後臺記錄失敗請求,定時重發功能;在FailbackRegistry的構造器中啓動了一個定時器:
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // Check and connect to the registry try { retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
實例化了一個間隔5秒執行一次重試的定時器,retry部分代碼以下:
protected void retry() { if (!failedRegistered.isEmpty()) { Set<URL> failed = new HashSet<URL>(failedRegistered); if (failed.size() > 0) { if (logger.isInfoEnabled()) { logger.info("Retry register " + failed); } try { for (URL url : failed) { try { doRegister(url); failedRegistered.remove(url); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); } } } ...省略... }
按期檢查是否存在失敗的註冊(register),註銷(unregister),訂閱(subscribe),退訂(unsubscribe)URL,若是存在則重試;
本文首先介紹了RegistryFactory, Registry, RegistryService幾個核心接口,而後以Zookeeper爲註冊中心重點介紹了註冊(register),註銷(unregister),訂閱(subscribe),退訂(unsubscribe)方式。
https://github.com/ksfzhaohui...
https://gitee.com/OutOfMemory...