Dubbo源碼解析(六)註冊中心——redis

註冊中心——redis

目標:解釋覺得redis實現的註冊中心原理,解讀duubo-registry-redis的源碼

Redis是一個key-value存儲系統,交換數據很是快,redis之內存做爲數據存儲的介質,因此讀寫數據的效率極高,遠遠超過數據庫。redis支持豐富的數據類型,dubbo就利用了redis的value支持map的數據類型。redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除。java

dubbo利用JRedis來鏈接到Redis分佈式哈希鍵-值數據庫,由於Jedis實例不是線程安全的,因此不能夠多個線程共用一個Jedis實例,可是建立太多的實現也很差由於這意味着會創建不少sokcet鏈接。 因此dubbo又用了JedisPool,JedisPool是一個線程安全的網絡鏈接池。能夠用JedisPool建立一些可靠Jedis實例,能夠從池中獲取Jedis實例,使用完後再把Jedis實例還回JedisPool。這種方式能夠避免建立大量socket鏈接而且會實現高效的性能。git

上述稍微介紹了dubbo用redis實現註冊中心的依賴,接下來讓咱們來看看具體的實現邏輯。下圖是包的結構:github

註冊中心redis目錄

包結構很是相似。接下來咱們就來解讀一下這兩個類。redis

(一)RedisRegistry

該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,基於redis來實現。數據庫

1.屬性

// 日誌記錄
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);

// 默認的redis鏈接端口
private static final int DEFAULT_REDIS_PORT = 6379;

// 默認 Redis 根節點,涉及到的是dubbo的分組配置
private final static String DEFAULT_ROOT = "dubbo";

// 任務調度器
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

//Redis Key 過時機制執行器
private final ScheduledFuture<?> expireFuture;

// Redis 根節點
private final String root;

// JedisPool集合,map 的key爲 "ip:port"的形式
private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();

// 通知器集合,key爲 Root + Service的形式
// 例如 /dubbo/com.alibaba.dubbo.demo.DemoService
private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

// 重連時間間隔,單位:ms
private final int reconnectPeriod;

// 過時週期,單位:ms
private final int expirePeriod;

// 是否經過監控中心,用於判斷髒數據,髒數據由監控中心刪除
private volatile boolean admin = false;

// 是否複製模式
private boolean replicate;

能夠從屬性中看到基於redis的註冊中心能夠被監控中心監控,而且對過時的節點有清理的機制。segmentfault

2.構造方法

public RedisRegistry(URL url) {
    super(url);
    // 判斷地址是否爲空
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 實例化對象池
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    // 若是 testOnBorrow 被設置,pool 會在 borrowObject 返回對象以前使用 PoolableObjectFactory的 validateObject 來驗證這個對象是否有效
    // 要是對象沒經過驗證,這個對象會被丟棄,而後從新選擇一個新的對象。
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    // 若是 testOnReturn 被設置, pool 會在 returnObject 的時候經過 PoolableObjectFactory 的validateObject 方法驗證對象
    // 若是對象沒經過驗證,對象會被丟棄,不會被放到池中。
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    // 指定空閒對象是否應該使用 PoolableObjectFactory 的 validateObject 校驗,若是校驗失敗,這個對象會從對象池中被清除。
    // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0) 的時候纔會生效。
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    if (url.getParameter("max.idle", 0) > 0)
        // 控制一個pool最多有多少個狀態爲空閒的jedis實例。
        config.setMaxIdle(url.getParameter("max.idle", 0));
    if (url.getParameter("min.idle", 0) > 0)
        // 控制一個pool最少有多少個狀態爲空閒的jedis實例。
        config.setMinIdle(url.getParameter("min.idle", 0));
    if (url.getParameter("max.active", 0) > 0)
        // 控制一個pool最多有多少個jedis實例。
        config.setMaxTotal(url.getParameter("max.active", 0));
    if (url.getParameter("max.total", 0) > 0)
        config.setMaxTotal(url.getParameter("max.total", 0));
    if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0)
        //表示當引入一個jedis實例時,最大的等待時間,若是超過等待時間,則直接拋出JedisConnectionException;
        config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
    if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
        // 設置驅逐線程每次檢測對象的數量。這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。
        config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
    if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
        // 指定驅逐線程的休眠時間。若是這個值不是正數( >0),不會有驅逐線程運行。
        config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
    if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
        // 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉)。
        // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。
        config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));

    // 獲取url中的集羣配置
    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    // 設置是否爲複製模式
    replicate = "replicate".equals(cluster);

    List<String> addresses = new ArrayList<String>();
    addresses.add(url.getAddress());
    // 備用地址
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (backups != null && backups.length > 0) {
        addresses.addAll(Arrays.asList(backups));
    }

    for (String address : addresses) {
        int i = address.indexOf(':');
        String host;
        int port;
        // 分割地址和端口號
        if (i > 0) {
            host = address.substring(0, i);
            port = Integer.parseInt(address.substring(i + 1));
        } else {
            // 沒有端口的設置默認端口
            host = address;
            port = DEFAULT_REDIS_PORT;
        }
        // 建立鏈接池並加入集合
        this.jedisPools.put(address, new JedisPool(config, host, port,
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0)));
    }

    // 設置url攜帶的鏈接超時時間,若是沒有配置,則設置默認爲3s
    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    // 獲取url中的分組配置,若是沒有配置,則默認爲dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    // 設置redis 的根節點
    this.root = group;

    // 獲取過時週期配置,若是沒有,則默認爲60s
    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    // 建立過時機制執行器
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                // 延長到期時間
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

構造方法首先是調用了父類的構造函數,而後是對對象池的一些配置進行了初始化,具體的我已經在註釋中寫明。在構造方法中還作了鏈接池的建立、過時機制執行器的建立,其中過時會進行延長到期時間的操做具體是在deferExpired方法中實現。還有一個關注點事該執行器的時間是取週期的一半。安全

3.deferExpired

private void deferExpired() {
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            // 得到鏈接池中的Jedis實例
            Jedis jedis = jedisPool.getResource();
            try {
                // 遍歷已經註冊的服務url集合
                for (URL url : new HashSet<URL>(getRegistered())) {
                    // 若是是非動態管理模式
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                        // 得到分類路徑
                        String key = toCategoryPath(url);
                        // 以hash 散列表的形式存儲
                        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                            // 發佈 Redis 註冊事件
                            jedis.publish(key, Constants.REGISTER);
                        }
                    }
                }
                // 若是經過監控中心
                if (admin) {
                    // 刪除過期的髒數據
                    clean(jedis);
                }
                // 若是服務器端已同步數據,只需寫入單臺機器
                if (!replicate) {
                    break;//  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
}

該方法實現了延長到期時間的邏輯,遍歷了已經註冊的服務url,這裏會有一個是否爲非動態管理模式的判斷,也就是判斷該節點是否爲動態節點,只有動態節點是須要延長過時時間,由於動態節點須要人工刪除節點。延長過時時間就是從新註冊一次。而其餘的節點則會被監控中心清除,也就是調用了clean方法。clean方法下面會講到。服務器

4.clean

// The monitoring center is responsible for deleting outdated dirty data
private void clean(Jedis jedis) {
    // 得到全部的服務
    Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
    if (keys != null && !keys.isEmpty()) {
        // 遍歷全部的服務
        for (String key : keys) {
            // 返回hash表key對應的全部域和值
            // redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除
            Map<String, String> values = jedis.hgetAll(key);
            if (values != null && values.size() > 0) {
                boolean delete = false;
                long now = System.currentTimeMillis();
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL url = URL.valueOf(entry.getKey());
                    // 是否爲動態節點
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                        long expire = Long.parseLong(entry.getValue());
                        // 判斷是否過時
                        if (expire < now) {
                            // 刪除記錄
                            jedis.hdel(key, entry.getKey());
                            delete = true;
                            if (logger.isWarnEnabled()) {
                                logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                            }
                        }
                    }
                }
                // 取消註冊
                if (delete) {
                    jedis.publish(key, Constants.UNREGISTER);
                }
            }
        }
    }
}

該方法就是用來清理過時數據的,以前我提到過dubbo在redis存儲數據的數據結構形式,就是redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除,那麼判斷過時就是經過map中的value來判別。邏輯就是在redis中先把記錄刪除,而後在取消訂閱。微信

5.isAvailable

@Override
public boolean isAvailable() {
    // 遍歷鏈接池集合
    for (JedisPool jedisPool : jedisPools.values()) {
        try {
            // 從鏈接池中得到jedis實例
            Jedis jedis = jedisPool.getResource();
            try {
                // 判斷是否有redis服務器被鏈接着
                // 只要有一臺鏈接,則算註冊中心可用
                if (jedis.isConnected()) {
                    return true; // At least one single machine is available.
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
        }
    }
    return false;
}

該方法是判斷註冊中心是否可用,經過redis是否鏈接來判斷,只要有一臺redis可鏈接,就算註冊中心可用。網絡

6.destroy

@Override
public void destroy() {
    super.destroy();
    try {
        // 關閉過時執行器
        expireFuture.cancel(true);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 關閉通知器
        for (Notifier notifier : notifiers.values()) {
            notifier.shutdown();
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            // 銷燬鏈接池
            jedisPool.destroy();
        } catch (Throwable t) {
            logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
    // 關閉任務調度器
    ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
}

這是銷燬的方法,邏輯很清晰,gracefulShutdown方法在《dubbo源碼解析(四)註冊中心——dubbo》中已經講到。

7.doRegister

@Override
public void doRegister(URL url) {
    // 得到分類路徑
    String key = toCategoryPath(url);
    // 得到URL字符串做爲 Value
    String value = url.toFullString();
    // 計算過時時間
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 遍歷鏈接池集合
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 寫入 Redis Map 鍵
                jedis.hset(key, value, expire);
                // 發佈 Redis 註冊事件
                // 這樣訂閱該 Key 的服務消費者和監控中心,就會實時從 Redis 讀取該服務的最新數據。
                jedis.publish(key, Constants.REGISTER);
                success = true;
                // 若是服務器端已同步數據,只需寫入單臺機器
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法是實現了父類FailbackRegistry的抽象方法,主要是實現了註冊的功能,具體的邏輯是先將須要註冊的服務信息保存到redis中,而後發佈redis註冊事件。

8.doUnregister

@Override
public void doUnregister(URL url) {
    // 得到分類路徑
    String key = toCategoryPath(url);
    // 得到URL字符串做爲 Value
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 刪除redis中的記錄
                jedis.hdel(key, value);
                // 發佈redis取消註冊事件
                jedis.publish(key, Constants.UNREGISTER);
                success = true;
                // 若是服務器端已同步數據,只需寫入單臺機器
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法也是實現了父類的抽象方法,當服務消費者或者提供者關閉時,會調用該方法來取消註冊。邏輯就是跟註冊方法方法,先從redis中刪除服務相關記錄,而後發佈取消註冊的事件,從而實時通知訂閱者們。

9.doSubscribe

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    // 返回服務地址
    String service = toServicePath(url);
    // 得到通知器
    Notifier notifier = notifiers.get(service);
    // 若是沒有該服務的通知器,則建立一個
    if (notifier == null) {
        Notifier newNotifier = new Notifier(service);
        notifiers.putIfAbsent(service, newNotifier);
        notifier = notifiers.get(service);
        // 保證併發狀況下,有且只有一個通知器啓動
        if (notifier == newNotifier) {
            notifier.start();
        }
    }
    boolean success = false;
    RpcException exception = null;
    // 遍歷鏈接池集合進行訂閱,直到有一個訂閱成功,僅僅向一個redis進行訂閱
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 若是服務地址爲*結尾,也就是處理全部的服務層發起的訂閱
                if (service.endsWith(Constants.ANY_VALUE)) {
                    admin = true;
                    // 得到分類層的集合 例如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers
                    Set<String> keys = jedis.keys(service);
                    if (keys != null && !keys.isEmpty()) {
                        // 按照服務聚合url
                        Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>();
                        for (String key : keys) {
                            // 得到服務路徑,截掉多餘部分
                            String serviceKey = toServicePath(key);
                            Set<String> sk = serviceKeys.get(serviceKey);
                            if (sk == null) {
                                sk = new HashSet<String>();
                                serviceKeys.put(serviceKey, sk);
                            }
                            sk.add(key);
                        }
                        // 按照每一個服務層進行發起通知,由於服務地址爲*結尾
                        for (Set<String> sk : serviceKeys.values()) {
                            doNotify(jedis, sk, url, Arrays.asList(listener));
                        }
                    }
                } else {
                    // 處理指定的服務層發起的通知
                    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));
                }
                // 只在一個redis上進行訂閱
                success = true;
                break; // Just read one server's data
            } finally {
                jedis.close();
            }
        } catch (Throwable t) { // Try the next server
            exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        // 雖然發生異常,但結果仍然成功
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法是實現了訂閱的功能。注意如下幾個點:

  1. 服務只會向一個redis進行訂閱,只要有一個訂閱成功就結束訂閱。
  2. 根據url攜帶的服務地址來調用doNotify的兩個重載方法。其中一個只是遍歷通知了全部服務的監聽器,doNotify方法我會在後面講到。

10.doUnsubscribe

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
}

該方法原本是取消訂閱的實現,不過dubbo中並未實現該邏輯。

11.doNotify

private void doNotify(Jedis jedis, String key) {
    // 遍歷全部的通知器,調用重載方法今天通知
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) {
        doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue()));
    }
}

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
    if (keys == null || keys.isEmpty()
            || listeners == null || listeners.isEmpty()) {
        return;
    }
    long now = System.currentTimeMillis();
    List<URL> result = new ArrayList<URL>();
    // 得到分類集合
    List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
    // 經過url得到服務接口
    String consumerService = url.getServiceInterface();
    // 遍歷分類路徑,例如/dubbo/com.alibaba.dubbo.demo.DemoService/providers
    for (String key : keys) {
        // 判斷服務是否匹配
        if (!Constants.ANY_VALUE.equals(consumerService)) {
            String prvoiderService = toServiceName(key);
            if (!prvoiderService.equals(consumerService)) {
                continue;
            }
        }
        // 從分類路徑上得到分類名
        String category = toCategoryName(key);
        // 判斷訂閱的分類是否包含該分類
        if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
            continue;
        }
        List<URL> urls = new ArrayList<URL>();
        // 返回全部的URL集合
        Map<String, String> values = jedis.hgetAll(key);
        if (values != null && values.size() > 0) {
            for (Map.Entry<String, String> entry : values.entrySet()) {
                URL u = URL.valueOf(entry.getKey());
                // 判斷是否爲動態節點,由於動態節點不受過時限制。而且判斷是否過時
                if (!u.getParameter(Constants.DYNAMIC_KEY, true)
                        || Long.parseLong(entry.getValue()) >= now) {
                    // 判斷url是否合法
                    if (UrlUtils.isMatch(url, u)) {
                        urls.add(u);
                    }
                }
            }
        }
        // 若不存在匹配的url,則建立 `empty://` 的 URL返回,用於清空該服務的該分類。
        if (urls.isEmpty()) {
            urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
                    .setAddress(Constants.ANYHOST_VALUE)
                    .setPath(toServiceName(key))
                    .addParameter(Constants.CATEGORY_KEY, category));
        }
        result.addAll(urls);
        if (logger.isInfoEnabled()) {
            logger.info("redis notify: " + key + " = " + urls);
        }
    }
    if (result == null || result.isEmpty()) {
        return;
    }
    // 所有數據完成後,調用通知方法,來通知監聽器
    for (NotifyListener listener : listeners) {
        notify(url, listener, result);
    }
}

該方法實現了通知的邏輯,有兩個重載方法,第二個比第一個多了幾個參數,其實惟一的區別就是第一個重載方法是通知了全部的監聽器,內部邏輯中調用了getSubscribed方法獲取全部的監聽器,該方法的解釋能夠查看《dubbo源碼解析(三)註冊中心——開篇》中關於subscribed屬性的解釋。而第二個重載方法就是對一個指定的監聽器進行通知。

具體的邏輯在第二個重載的方法中,其中有如下幾個須要注意的點:

  1. 通知的事件要和監聽器匹配。
  2. 不一樣的角色會關注不一樣的分類,服務消費者會關注providers、configurations、routes這幾個分類,而服務提供者會關注consumers分類,監控中心會關注全部分類。
  3. 遍歷分類路徑,分類路徑是Root + Service + Type。

12.toServiceName

private String toServiceName(String categoryPath) {
    String servicePath = toServicePath(categoryPath);
    return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
}

該方法很簡單,就是從服務路徑上得到服務名,這裏就很少作解釋了。

13.toCategoryName

private String toCategoryName(String categoryPath) {
    int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR);
    return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
}

該方法的做用是從分類路徑上得到分類名。

14.toServicePath

private String toServicePath(String categoryPath) {
    int i;
    if (categoryPath.startsWith(root)) {
        i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length());
    } else {
        i = categoryPath.indexOf(Constants.PATH_SEPARATOR);
    }
    return i > 0 ? categoryPath.substring(0, i) : categoryPath;
}

private String toServicePath(URL url) {
    return root + url.getServiceInterface();
}

這兩個方法都是得到服務地址,第一個方法主要是截掉多餘的部分,第二個方法主要是從url配置中獲取關於服務地址的值跟根節點拼接。

15.toCategoryPath

private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

該方法是得到分類路徑,格式是Root + Service + Type。

16.內部類NotifySub

private class NotifySub extends JedisPubSub {

    private final JedisPool jedisPool;

    public NotifySub(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void onMessage(String key, String msg) {
        if (logger.isInfoEnabled()) {
            logger.info("redis event: " + key + " = " + msg);
        }
        // 若是是註冊事件或者取消註冊事件
        if (msg.equals(Constants.REGISTER)
                || msg.equals(Constants.UNREGISTER)) {
            try {
                Jedis jedis = jedisPool.getResource();
                try {
                    // 通知監聽器
                    doNotify(jedis, key);
                } finally {
                    jedis.close();
                }
            } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
                logger.error(t.getMessage(), t);
            }
        }
    }

    @Override
    public void onPMessage(String pattern, String key, String msg) {
        onMessage(key, msg);
    }

    @Override
    public void onSubscribe(String key, int num) {
    }

    @Override
    public void onPSubscribe(String pattern, int num) {
    }

    @Override
    public void onUnsubscribe(String key, int num) {
    }

    @Override
    public void onPUnsubscribe(String pattern, int num) {
    }

}

NotifySub是RedisRegistry的一個內部類,繼承了JedisPubSub類,JedisPubSub類中定義了publish/subsribe的回調方法。經過繼承JedisPubSub類並從新實現這些回調方法,當publish/subsribe事件發生時,咱們能夠定製本身的處理邏輯。這裏實現了onMessage和onPMessage兩個方法,當收到註冊和取消註冊的事件的時候通知相關的監聽器數據變化,從而實現實時更新數據。

17.內部類Notifier

該類繼承 Thread 類,負責向 Redis 發起訂閱邏輯。

1.屬性

// 服務名:Root + Service
private final String service;
// 須要忽略鏈接的次數
private final AtomicInteger connectSkip = new AtomicInteger();
// 已經忽略鏈接的次數
private final AtomicInteger connectSkiped = new AtomicInteger();
// 隨機數
private final Random random = new Random();
// jedis實例
private volatile Jedis jedis;
// 是不是首次通知
private volatile boolean first = true;
// 是否運行中
private volatile boolean running = true;
// 鏈接次數隨機數
private volatile int connectRandom;

上述屬性中,部分屬性都是爲了redis的重連策略,用於在和redis斷開連接時,忽略必定的次數和redis的鏈接,避免空跑。

2.resetSkip

private void resetSkip() {
    connectSkip.set(0);
    connectSkiped.set(0);
    connectRandom = 0;
}

該方法就是重置忽略鏈接的信息。

3.isSkip

private boolean isSkip() {
    // 得到忽略次數
    int skip = connectSkip.get(); // Growth of skipping times
    // 若是忽略次數超過10次,那麼取隨機數,加上一個10之內的隨機數
    // 鏈接失敗的次數越多,每一輪加大須要忽略的總次數,而且帶有必定的隨機性。
    if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
        if (connectRandom == 0) {
            connectRandom = random.nextInt(10);
        }
        skip = 10 + connectRandom;
    }
    // 自增忽略次數。若忽略次數不夠,則繼續忽略。
    if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times
        return true;
    }
    // 增長鬚要忽略的次數
    connectSkip.incrementAndGet();
    // 重置已忽略次數和隨機數
    connectSkiped.set(0);
    connectRandom = 0;
    return false;
}

該方法是用來判斷忽略本次對redis的鏈接。首先得到須要忽略的次數,若是忽略次數不小於10次,則加上一個10之內的隨機數,而後判斷自增的忽略次數,若是次數不夠,則繼續忽略,若是次數夠了,增長鬚要忽略的次數,重置已經忽略的次數和隨機數。主要的思想是鏈接失敗的次數越多,每一輪加大須要忽略的總次數,而且帶有必定的隨機性。

4.run

@Override
public void run() {
    // 當通知器正在運行中時
    while (running) {
        try {
            // 若是不忽略鏈接
            if (!isSkip()) {
                try {
                    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
                        JedisPool jedisPool = entry.getValue();
                        try {
                            jedis = jedisPool.getResource();
                            try {
                                // 是否爲監控中心
                                if (service.endsWith(Constants.ANY_VALUE)) {
                                    // 若是不是第一次通知
                                    if (!first) {
                                        first = false;
                                        Set<String> keys = jedis.keys(service);
                                        if (keys != null && !keys.isEmpty()) {
                                            for (String s : keys) {
                                                // 通知
                                                doNotify(jedis, s);
                                            }
                                        }
                                        // 重置
                                        resetSkip();
                                    }
                                    // 批准訂閱
                                    jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                } else {
                                    // 若是不是監控中心,而且不是第一次通知
                                    if (!first) {
                                        first = false;
                                        // 單獨通知一個服務
                                        doNotify(jedis, service);
                                        // 重置
                                        resetSkip();
                                    }
                                    // 批准訂閱
                                    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
                                }
                                break;
                            } finally {
                                jedis.close();
                            }
                        } catch (Throwable t) { // Retry another server
                            logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                            // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                            // 發生異常,說明 Redis 鏈接斷開了,須要等待reconnectPeriod時間
                            //經過這樣的方式,避免執行,佔用大量的 CPU 資源。
                            sleep(reconnectPeriod);
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                    sleep(reconnectPeriod);
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

該方法是線程的run方法,應該很熟悉,其中作了相關訂閱的邏輯,其中根據redis的重連策略作了一些忽略鏈接的策略,也就是調用了上述講解的isSkip方法,訂閱就是調用了jedis.psubscribe方法,它是訂閱給定模式相匹配的全部頻道。

4.shutdown

public void shutdown() {
    try {
        // 更改狀態
        running = false;
        // jedis斷開鏈接
        jedis.disconnect();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

該方法是斷開鏈接的方法。

(二)RedisRegistryFactory

該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:

public class RedisRegistryFactory extends AbstractRegistryFactory {

    @Override
    protected Registry createRegistry(URL url) {
        return new RedisRegistry(url);
    }

}

能夠看到就是實例化了RedisRegistry而已,全部這裏就不解釋了。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了dubbo利用redis來實現註冊中心,其中關鍵的是須要弄明白dubbo在redis中存儲的數據結構,也就是key-value中key表明什麼,value表明什麼。還有就是須要了解JRedis和JedisPool,其餘的邏輯並不複雜。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。

相關文章
相關標籤/搜索