Dubbo註冊中心

1.註冊中心的做用

利用註冊中心,服務提供者能夠動態添加刪除服務,服務消費者在收到更新通知後,能夠拉取最新的服務從而實現同步。能夠在註冊中心實現統一配置,參數的動態調整能夠自動通知到全部服務節點。redis

clipboard.png

2.Dubbo四種註冊中心實現

Dubbo註冊中心的實如今dubbo-registry模塊。apache

2.1 ZooKeeper

基於Zookeeper。ZooKeeper學習segmentfault

2.1.1 Zookeeper註冊中心數據結構

Zookeeper註冊中心數據結構

2.1.2 Zookeeper註冊中心實現原理

Zookeeper註冊中心採用"事件通知" + 「客戶端拉取」的實現方式:緩存

  • 客戶端在第一次鏈接註冊中心的時候,會獲取對應目錄下的全量數據。
  • 客戶端會在訂閱的節點上註冊一個watch,客戶端與註冊中心之間保持TCP長鏈接。
  • 當節點發生事務操做,節點的版本號發生改變,就會觸發watch事件,推送數據給訂閱方,訂閱方收到通知以後,就會拉取對應目錄下的數據。
  • 服務治理中心會訂閱全部service層的數據,service被設置成"*",表示訂閱所有。
2.1.3 Zookeeper註冊和取消註冊
//註冊
protected void doRegister(URL url) {
    try {
        //建立目錄
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

//取消註冊
protected void doUnregister(URL url) {
    try {
        //刪除目錄
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

private String toUrlPath(URL url) {
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

//返回"/dubbo/接口名稱/providers"
private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

//返回"/dubbo/接口名稱"
private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}

//返回"/"或者"/dubbo/"
private String toRootDir() {
    if (root.equals(Constants.PATH_SEPARATOR)) {
        return root;
    }
    return root + Constants.PATH_SEPARATOR;
}

private String toRootPath() {
    return root;
}

private final static String DEFAULT_ROOT = "dubbo";
private final ZookeeperClient zkClient;

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //省略n行代碼
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    //添加前綴"/"
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    //省略n行代碼
}

ZookeeperClient和ZookeeperTransporter都是接口,在Dubbo中有兩種實現,一種是基於curator客戶端的CuratorZookeeperClient和CuratorZookeeperTransporter;另外一種是基於zkclient客戶端的ZkclientZookeeperClient和ZkclientZookeeperTransporter。默認實現是curator。網絡

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

在不能根據Constants.CLIENT_KEY和Constants.TRANSPORTER_KEY找到匹配的URL的時候,就會使用默認的Curator實現。數據結構

2.1.4 Zookeeper訂閱的實現

先看看ConcurrentMap的put和putIfAbsent的區別異步

public static final String ANY_VALUE = "*";
public static final String INTERFACE_KEY = "interface";
public static final String CHECK_KEY = "check";

protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            //當接口名稱是*的時候表示訂閱所有Service
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                //默認值是"/dubbo"
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    //listeners爲空,說明緩存中沒有
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    //zkListener爲空,說明是第一次調用
                    listeners.putIfAbsent(listener, new ChildListener() {
                        //這個內部方法不會當即執行,會在觸發變動通知時執行
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            //遍歷全部子節點
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                //若是有子節點沒有訂閱,就進行訂閱。
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //建立持久節點
                zkClient.create(root, false);
                //遍歷訂閱持久節點的直接子節點
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
private String[] toCategoriesPath(URL url) {
    String[] categories;
    //若是類別是"*",訂閱四種類型的路徑(providers,consumers,routers,configurations)
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        //默認是providers
        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
    }
    return paths;
}

2.2 Redis

基於Redis。Redis學習ide

2.2.1 Redis註冊中心數據結構

Redis註冊中心數據結構

2.2.2 Redis續期Key和清理Key

org.apache.dubbo.registry.redis.RedisRegistry.deferExpired()定義了Redis刷新key的過時時間和清除過時的key。服務提供者發佈一個服務,會先在Redis中建立一個Key,而後發佈register事件,在RedisRegistry的構造方法中啓動expireExecutor線程池週期性調用deferExpired方法刷新服務的過時時間,函數

private volatile boolean admin = false;

//獲取本地緩存全部已經註冊的key
for (URL url : new HashSet<URL>(getRegistered())) {
    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
        String key = toCategoryPath(url);
        //刷新過時時間
        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
            //廣播,從新發布
            jedis.publish(key, Constants.REGISTER);
        }
    }
}
//若是是服務治理中心,須要清除過時的Key
if (admin) {
    clean(jedis);
}

org.apache.dubbo.common.Constants類定義常量。學習

public static final String DYNAMIC_KEY = "dynamic";

org.apache.dubbo.registry.support.AbstractRegistry.getRegistered()獲取緩存的key。

private final Set<URL> registered = new ConcurrentHashSet<URL>();

public Set<URL> getRegistered() {
    return registered;
}

public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}

org.apache.dubbo.registry.redis.RedisRegistry.clean()服務治理中心刪除過時的key。

private void clean(Jedis jedis) {
    Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
    if (keys != null && !keys.isEmpty()) {
        for (String key : keys) {
            //獲取jedis鏈接全部的key
            Map<String, String> values = jedis.hgetAll(key);
            if (values != null && values.size() > 0) {
                boolean delete = false;
                long now = System.currentTimeMillis();
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL url = URL.valueOf(entry.getKey());
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                        long expire = Long.parseLong(entry.getValue());
                        //若是key的過時時間早於當前時間,說明過時了。
                        if (expire < now) {
                            jedis.hdel(key, entry.getKey());
                            delete = true;
                            if (logger.isWarnEnabled()) {
                                logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                            }
                        }
                    }
                }
                if (delete) {  
                    //廣播通知取消註冊哪一個Key。
                    jedis.publish(key, Constants.UNREGISTER);
                }
            }
        }
    }
}
2.2.3 Redis註冊

(1)org.apache.dubbo.registry.redis.RedisRegistry.doRegister(URL url)

public void doRegister(URL url) {
    // /dubbo/interface/providers(or consumers or configurations or routes)
    String key = toCategoryPath(url);
    //URL
    String value = url.toFullString();
    //計算過時時間
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    //遍歷鏈接池中的全部節點
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                //保存鍵值
                jedis.hset(key, value, expire);
                //發佈
                jedis.publish(key, Constants.REGISTER);
                success = true;
                //非replicate模式只須要寫入一個節點,不然寫入所有節點
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

(2)org.apache.dubbo.registry.redis.RedisRegistry.toCategoryPath(URL url)

private String toCategoryPath(URL url) {
    //toServicePath(url) 獲得根節點(默認獲得"/dubbo/")和接口名稱
    //Constants.PATH_SEPARATOR("/")
    //url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY)默認獲得"providers"
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

//拼接根節點和接口名稱
private String toServicePath(URL url) {
    return root + url.getServiceInterface();
}

RedisRegistry的構造函數對root進行了賦值:

private final static String DEFAULT_ROOT = "dubbo";

public RedisRegistry(URL url) {
    ....省略N行代碼...
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    this.root = group;
    ....省略N行代碼...
}

再來看看org.apache.dubbo.common.URL.getServiceInterface()

public String getServiceInterface() {
    return getParameter(Constants.INTERFACE_KEY, path);
}

public String getParameter(String key, String defaultValue) {
    String value = getParameter(key);
    if (value == null || value.length() == 0) {
        return defaultValue;
    }
    return value;
}

public String getParameter(String key) {
    String value = parameters.get(key);
    if (value == null || value.length() == 0) {
        value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);
    }
    return value;
}

在org.apache.dubbo.common.Constants類中定義了常量:

public final static String PATH_SEPARATOR = "/";
public static final String CATEGORY_KEY = "category";
public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY;
public static final String PROVIDERS_CATEGORY = "providers";
public static final String GROUP_KEY = "group";
public static final String DEFAULT_KEY_PREFIX = "default.";
public static final String INTERFACE_KEY = "interface";
2.2.4 Redis訂閱

首次訂閱時,會建立一個內部線程類Notifier,在啓動Notifier時異步訂閱通道,同時主線程一次性拉取註冊中心的全部事務信息。Notifier訂閱的通道推送事件實現後續註冊中心的信息變動。

RedisRegistry的 Notifier線程的run()方法部分代碼以下:

if (service.endsWith(Constants.ANY_VALUE)) {
    //若是以*結尾
    if (!first) {
        //若是訂閱過,獲取所有key,更新本地緩存
        first = false;
        Set<String> keys = jedis.keys(service);
        if (keys != null && !keys.isEmpty()) {
            for (String s : keys) {
                doNotify(jedis, s);
            }
        }
        resetSkip();//重置失敗計數器
    }
    //訂閱
    jedis.psubscribe(new NotifySub(jedisPool), service);
} else {
    if (!first) {
        first = false;
        doNotify(jedis, service);
        resetSkip();
    }
    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); //訂閱指定類別
}

2.3 Simple

基於內存的默認實現。標準RPC服務,不支持集羣,可能出現單點故障。

2.4 Multicast

經過廣播地址實現互相發現。

3.緩存機制

緩存機制的目的是以空間換時間,若是每次遠程調用都要從註冊中心拉取一遍可用的遠程服務列表,會給網絡形成很大的壓力。在AbstractRegistry實現了通用的緩存機制。Dubbo在內存存儲一份,保存在properties對象中,在磁盤文件保存一份,放在file對象中。

//本地磁盤緩存
private final Properties properties = new Properties();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
//本地磁盤緩存文件
private File file;

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        try {
            //讀取磁盤文件
            in = new FileInputStream(file);
            //加載到properties對象
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry store file " + file + ", data: " + properties);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry store file " + file, e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

org.apache.dubbo.registry.support.AbstractRegistry.saveProperties(URL url)

private final boolean syncSaveFile;
if (syncSaveFile) {
    //同步保存
    doSaveProperties(version);
} else {
    //線程池異步保存
    registryCacheExecutor.execute(new SaveProperties(version));
}

4.重試機制

org.apache.dubbo.registry.support.FailBackRegistry添加了retry()方法。

相關文章
相關標籤/搜索