1.註冊中心的做用
利用註冊中心,服務提供者能夠動態添加刪除服務,服務消費者在收到更新通知後,能夠拉取最新的服務從而實現同步。能夠在註冊中心實現統一配置,參數的動態調整能夠自動通知到全部服務節點。redis
2.Dubbo四種註冊中心實現
Dubbo註冊中心的實如今dubbo-registry模塊。apache
2.1 ZooKeeper
基於Zookeeper。ZooKeeper學習segmentfault
2.1.1 Zookeeper註冊中心數據結構
2.1.2 Zookeeper註冊中心實現原理
Zookeeper註冊中心採用"事件通知" + 「客戶端拉取」的實現方式:緩存
2.1.3 Zookeeper註冊和取消註冊
//註冊 protected void doRegister(URL url) { try { //建立目錄 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } //取消註冊 protected void doUnregister(URL url) { try { //刪除目錄 zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } private String toUrlPath(URL url) { return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); } //返回"/dubbo/接口名稱/providers" private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); } //返回"/dubbo/接口名稱" private String toServicePath(URL url) { String name = url.getServiceInterface(); if (Constants.ANY_VALUE.equals(name)) { return toRootPath(); } return toRootDir() + URL.encode(name); } //返回"/"或者"/dubbo/" private String toRootDir() { if (root.equals(Constants.PATH_SEPARATOR)) { return root; } return root + Constants.PATH_SEPARATOR; } private String toRootPath() { return root; } private final static String DEFAULT_ROOT = "dubbo"; private final ZookeeperClient zkClient; public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { //省略n行代碼 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); //添加前綴"/" if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); //省略n行代碼 }
ZookeeperClient和ZookeeperTransporter都是接口,在Dubbo中有兩種實現,一種是基於curator客戶端的CuratorZookeeperClient和CuratorZookeeperTransporter;另外一種是基於zkclient客戶端的ZkclientZookeeperClient和ZkclientZookeeperTransporter。默認實現是curator。網絡
@SPI("curator") public interface ZookeeperTransporter { @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url); }
在不能根據Constants.CLIENT_KEY和Constants.TRANSPORTER_KEY找到匹配的URL的時候,就會使用默認的Curator實現。數據結構
2.1.4 Zookeeper訂閱的實現
先看看ConcurrentMap的put和putIfAbsent的區別異步
public static final String ANY_VALUE = "*"; public static final String INTERFACE_KEY = "interface"; public static final String CHECK_KEY = "check"; protected void doSubscribe(final URL url, final NotifyListener listener) { try { //當接口名稱是*的時候表示訂閱所有Service if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { //默認值是"/dubbo" String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { //listeners爲空,說明緩存中沒有 zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { //zkListener爲空,說明是第一次調用 listeners.putIfAbsent(listener, new ChildListener() { //這個內部方法不會當即執行,會在觸發變動通知時執行 @Override public void childChanged(String parentPath, List<String> currentChilds) { //遍歷全部子節點 for (String child : currentChilds) { child = URL.decode(child); //若是有子節點沒有訂閱,就進行訂閱。 if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } //建立持久節點 zkClient.create(root, false); //遍歷訂閱持久節點的直接子節點 List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
private String[] toCategoriesPath(URL url) { String[] categories; //若是類別是"*",訂閱四種類型的路徑(providers,consumers,routers,configurations) if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; } else { //默認是providers categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); } String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i]; } return paths; }
2.2 Redis
基於Redis。Redis學習ide
2.2.1 Redis註冊中心數據結構
2.2.2 Redis續期Key和清理Key
org.apache.dubbo.registry.redis.RedisRegistry.deferExpired()定義了Redis刷新key的過時時間和清除過時的key。服務提供者發佈一個服務,會先在Redis中建立一個Key,而後發佈register事件,在RedisRegistry的構造方法中啓動expireExecutor線程池週期性調用deferExpired方法刷新服務的過時時間,函數
private volatile boolean admin = false; //獲取本地緩存全部已經註冊的key for (URL url : new HashSet<URL>(getRegistered())) { if (url.getParameter(Constants.DYNAMIC_KEY, true)) { String key = toCategoryPath(url); //刷新過時時間 if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { //廣播,從新發布 jedis.publish(key, Constants.REGISTER); } } } //若是是服務治理中心,須要清除過時的Key if (admin) { clean(jedis); }
org.apache.dubbo.common.Constants類定義常量。學習
public static final String DYNAMIC_KEY = "dynamic";
org.apache.dubbo.registry.support.AbstractRegistry.getRegistered()獲取緩存的key。
private final Set<URL> registered = new ConcurrentHashSet<URL>(); public Set<URL> getRegistered() { return registered; } public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); }
org.apache.dubbo.registry.redis.RedisRegistry.clean()服務治理中心刪除過時的key。
private void clean(Jedis jedis) { Set<String> keys = jedis.keys(root + Constants.ANY_VALUE); if (keys != null && !keys.isEmpty()) { for (String key : keys) { //獲取jedis鏈接全部的key Map<String, String> values = jedis.hgetAll(key); if (values != null && values.size() > 0) { boolean delete = false; long now = System.currentTimeMillis(); for (Map.Entry<String, String> entry : values.entrySet()) { URL url = URL.valueOf(entry.getKey()); if (url.getParameter(Constants.DYNAMIC_KEY, true)) { long expire = Long.parseLong(entry.getValue()); //若是key的過時時間早於當前時間,說明過時了。 if (expire < now) { jedis.hdel(key, entry.getKey()); delete = true; if (logger.isWarnEnabled()) { logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now)); } } } } if (delete) { //廣播通知取消註冊哪一個Key。 jedis.publish(key, Constants.UNREGISTER); } } } } }
2.2.3 Redis註冊
(1)org.apache.dubbo.registry.redis.RedisRegistry.doRegister(URL url)
public void doRegister(URL url) { // /dubbo/interface/providers(or consumers or configurations or routes) String key = toCategoryPath(url); //URL String value = url.toFullString(); //計算過時時間 String expire = String.valueOf(System.currentTimeMillis() + expirePeriod); boolean success = false; RpcException exception = null; //遍歷鏈接池中的全部節點 for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { Jedis jedis = jedisPool.getResource(); try { //保存鍵值 jedis.hset(key, value, expire); //發佈 jedis.publish(key, Constants.REGISTER); success = true; //非replicate模式只須要寫入一個節點,不然寫入所有節點 if (!replicate) { break; // If the server side has synchronized data, just write a single machine } } finally { jedis.close(); } } catch (Throwable t) { exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
(2)org.apache.dubbo.registry.redis.RedisRegistry.toCategoryPath(URL url)
private String toCategoryPath(URL url) { //toServicePath(url) 獲得根節點(默認獲得"/dubbo/")和接口名稱 //Constants.PATH_SEPARATOR("/") //url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY)默認獲得"providers" return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); } //拼接根節點和接口名稱 private String toServicePath(URL url) { return root + url.getServiceInterface(); }
RedisRegistry的構造函數對root進行了賦值:
private final static String DEFAULT_ROOT = "dubbo"; public RedisRegistry(URL url) { ....省略N行代碼... String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } if (!group.endsWith(Constants.PATH_SEPARATOR)) { group = group + Constants.PATH_SEPARATOR; } this.root = group; ....省略N行代碼... }
再來看看org.apache.dubbo.common.URL.getServiceInterface()
public String getServiceInterface() { return getParameter(Constants.INTERFACE_KEY, path); } public String getParameter(String key, String defaultValue) { String value = getParameter(key); if (value == null || value.length() == 0) { return defaultValue; } return value; } public String getParameter(String key) { String value = parameters.get(key); if (value == null || value.length() == 0) { value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key); } return value; }
在org.apache.dubbo.common.Constants類中定義了常量:
public final static String PATH_SEPARATOR = "/"; public static final String CATEGORY_KEY = "category"; public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY; public static final String PROVIDERS_CATEGORY = "providers"; public static final String GROUP_KEY = "group"; public static final String DEFAULT_KEY_PREFIX = "default."; public static final String INTERFACE_KEY = "interface";
2.2.4 Redis訂閱
首次訂閱時,會建立一個內部線程類Notifier,在啓動Notifier時異步訂閱通道,同時主線程一次性拉取註冊中心的全部事務信息。Notifier訂閱的通道推送事件實現後續註冊中心的信息變動。
RedisRegistry的 Notifier線程的run()方法部分代碼以下:
if (service.endsWith(Constants.ANY_VALUE)) { //若是以*結尾 if (!first) { //若是訂閱過,獲取所有key,更新本地緩存 first = false; Set<String> keys = jedis.keys(service); if (keys != null && !keys.isEmpty()) { for (String s : keys) { doNotify(jedis, s); } } resetSkip();//重置失敗計數器 } //訂閱 jedis.psubscribe(new NotifySub(jedisPool), service); } else { if (!first) { first = false; doNotify(jedis, service); resetSkip(); } jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); //訂閱指定類別 }
2.3 Simple
基於內存的默認實現。標準RPC服務,不支持集羣,可能出現單點故障。
2.4 Multicast
經過廣播地址實現互相發現。
3.緩存機制
緩存機制的目的是以空間換時間,若是每次遠程調用都要從註冊中心拉取一遍可用的遠程服務列表,會給網絡形成很大的壓力。在AbstractRegistry實現了通用的緩存機制。Dubbo在內存存儲一份,保存在properties對象中,在磁盤文件保存一份,放在file對象中。
//本地磁盤緩存 private final Properties properties = new Properties(); private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>(); //本地磁盤緩存文件 private File file; private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { //讀取磁盤文件 in = new FileInputStream(file); //加載到properties對象 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry store file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry store file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
org.apache.dubbo.registry.support.AbstractRegistry.saveProperties(URL url)
private final boolean syncSaveFile; if (syncSaveFile) { //同步保存 doSaveProperties(version); } else { //線程池異步保存 registryCacheExecutor.execute(new SaveProperties(version)); }
4.重試機制
org.apache.dubbo.registry.support.FailBackRegistry添加了retry()方法。