Dubbo(七):redis註冊中心的應用

  上篇咱們講了Dubbo中有一個很是本質和重要的功能,那就是服務的自動註冊與發現,而這個功能是經過註冊中心來實現的。上篇中使用zookeeper實現了註冊中心的功能,同時了提了dubbo中有其餘許多的註冊中心的實現。git

  今天咱們就來看看另外一個註冊中心的實現吧: redis 。redis

 

1. dubbo在 Redis 中的服務分佈

  dubbo在zk中的服務體現是一個個的文件路徑形式,如 /dubbo/xxx.xx.XxxService/providers/xxx 。 而在redis中,則體現是一個個的緩存key-value。具體分佈以下:數據庫

    /dubbo/xxx.xx.XxxService/providers: 以hash類型存放全部提供者列表, 每一個hash的字段爲 url -> expireTime
    /dubbo/xxx.xx.XxxService/consumers: 以hash類型存放全部消費者列表, 每一個hash的字段爲 url -> expireTime
    /dubbo/xxx.xx.XxxService/configurators: 存放配置信息
    /dubbo/xxx.xx.XxxService/routers: 存放路由配置信息apache

  如上,一樣,redis也是以service爲粒度進行存儲劃分的。緩存

 

2. Redis 組件的接入

  你可能須要先引入redis註冊依賴包:安全

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-registry-redis</artifactId>
        </dependency>

  在配置dubbo服務時,須要將註冊中心換爲 redis, 以下選合適的一個便可:服務器

    <dubbo:registry address="redis://127.0.0.1:6379" cluster="failover" />
    <dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="127.0.0.1:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />

  cluster 設置 redis 集羣策略,缺省爲 failover:(這個配置不會和集羣容錯配置有誤會麼,尷尬)網絡

    failover: 失效轉移策略。只寫入和讀取任意一臺,失敗時重試另外一臺,須要服務器端自行配置數據同步;session

    replicate: 複製模式策略。在客戶端同時寫入全部服務器,只讀取單臺,服務器端不須要同步,註冊中心集羣增大,性能壓力也會更大;併發

  redis做爲註冊中心與zk做爲註冊的前置操做都是同樣的。都是一是做爲服務提供者時會在 ServiceConfig#doExportUrlsFor1Protocol 中,進行遠程服務暴露時會拉起。二是在消費者在進行遠程調用時會 ReferenceConfig#createProxy 時拉取以便獲取提供者列表。

  只是在依賴注入 RegistryFactory 時,根據是 zookeeper/redis, 選擇了不同的 RegistryFactory, 因此建立了不一樣的註冊中心實例。

  redis 中根據SPI的配置建立, RedisRegistryFactory 工廠, 配置文件 META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory 的內容以下:

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        // RegistryFactory 又是經過 SPI 機制生成的    
        // 會根據具體的註冊中心的類型建立調用具體實例,如此處爲: redis, 因此會調用 RedisRegistryFactory.getRegistry()
        return registryFactory.getRegistry(registryUrl);
    }
    // 全部 RegistryFactory 都會被包裝成 RegistryFactoryWrapper, 以便修飾
    // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        // 對於zk, 會調用 RedisRegistryFactory
        return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                        .getActivateExtension(url, "registry.listeners")));
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 調用子類方法建立 registry 實例,此處爲 RedisRegistryFactory.createRegistry
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistryFactory#createRegistry
    @Override
    protected Registry createRegistry(URL url) {
        // 最終將redis組件接入到應用中了,後續就可使用redis提供的相應功能了
        return new RedisRegistry(url);
    }

  至此,redis被接入了。咱們先來看下 redis 註冊中心構造方法實現:

    // org.apache.dubbo.registry.redis.RedisRegistry#RedisRegistry
    public RedisRegistry(URL url) {
        // RedisRegistry 與zk同樣,一樣繼承了 FailbackRegistry
        // 因此,一樣會建立retryTimer, 一樣會建立緩存文件
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 使用redis鏈接池處理事務
        // 設置各配置項
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
        config.setTestOnReturn(url.getParameter("test.on.return", false));
        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
        if (url.getParameter("max.idle", 0) > 0) {
            config.setMaxIdle(url.getParameter("max.idle", 0));
        }
        if (url.getParameter("min.idle", 0) > 0) {
            config.setMinIdle(url.getParameter("min.idle", 0));
        }
        if (url.getParameter("max.active", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.active", 0));
        }
        if (url.getParameter("max.total", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.total", 0));
        }
        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
            config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
        }
        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
        }
        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
        }
        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
        }
        // redis 複用了cluster配置項?
        String cluster = url.getParameter("cluster", "failover");
        if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
            throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
        }
        replicate = "replicate".equals(cluster);

        List<String> addresses = new ArrayList<>();
        addresses.add(url.getAddress());
        String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
        if (ArrayUtils.isNotEmpty(backups)) {
            addresses.addAll(Arrays.asList(backups));
        }
        //得到Redis主節點名稱
        String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
        if (StringUtils.isEmpty(masterName)) {
            //單機版redis
            for (String address : addresses) {
                int i = address.indexOf(':');
                String host;
                int port;
                if (i > 0) {
                    host = address.substring(0, i);
                    port = Integer.parseInt(address.substring(i + 1));
                } else {
                    host = address;
                    port = DEFAULT_REDIS_PORT;
                }
                this.jedisPools.put(address, new JedisPool(config, host, port,
                        url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                        url.getParameter("db.index", 0)));
            }
        } else {
            //哨兵版redis
            Set<String> sentinelSet = new HashSet<>(addresses);
            int index = url.getParameter("db.index", 0);
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword();
            JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index);
            this.jedisPools.put(masterName, pool);
        }

        this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        if (!group.endsWith(PATH_SEPARATOR)) {
            group = group + PATH_SEPARATOR;
        }
        this.root = group;
        // session=60000, 默認1分鐘過時
        this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
        // 使用定時任務刷新存活狀態,至關於心跳維護線程,定時任務頻率爲 session有效其的1/2
        this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    }

  RedisRegistry構造方法中,主要完成redis配置信息的轉換接入,建立鏈接池,默認使用0號數據庫。另外,每一個客戶端都是單例的RedisRegistry, 因此也就是說會開啓一個過時掃描定時任務(能夠稱之爲心跳任務)。

 

3. Redis 服務提供者註冊

  與ZK過程相似,服務註冊主要就分兩步:1. 獲取registry實例(經過SPI機制); 2. 將服務的信息註冊到註冊中心。只是zk是路徑,redis是kv.

    // org.apache.dubbo.registry.redis.RedisRegistry#doRegister
    @Override
    public void doRegister(URL url) {
        // 與zk一致,按服務組裝key前綴
        String key = toCategoryPath(url);
        // 全服務路徑做爲value
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用hash存儲提供者/消費者 標識,帶過時時間(該時間需後續主動斷定,redis並不維護該狀態)
                    // 註冊好自向標識後,pub一條消息,以便其餘客戶端能夠sub感知到該服務
                    jedis.hset(key, value, expire);
                    jedis.publish(key, REGISTER);
                    success = true;
                    // 若是不是複製模式的redis 服務(即爲failover模式),只需往一個redis寫數據便可,
                    // 剩餘redis自行同步實際上這裏應該是存在數據一致性問題的
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        // 只要有一個成功,即算成功
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  以hash類型存放全部提供者列表, key爲服務粒度的前綴信息: /dubbo/xxx.xx.XxxService/providers, hash中每一個field->value表示,服務全路徑信息->過時時間。

  經過redis的 pub/sub 機制,通知其餘客戶端變化。註冊時發佈一條消息到提供者路徑, publish <key> register 。 

 

4. redis 消費者服務訂閱

  服務註冊的目的,主要是讓註冊中心及其餘應用端能夠發現本身。而服務訂閱則爲了讓本身能夠發現別的系統的變化。如查找全部提供者列表,接收應用上下線通知,開啓監聽等等。

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 基於service開啓訂閱線程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            // 主動開啓一個 notifier 線程,進行subscribe處理
            // 若是service不少,那就意味着有不少的此類線程,這並非件好事
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        Set<String> keys = jedis.keys(service);
                        if (CollectionUtils.isNotEmpty(keys)) {
                            Map<String, Set<String>> serviceKeys = new HashMap<>();
                            for (String key : keys) {
                                String serviceKey = toServicePath(key);
                                Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                                sk.add(key);
                            }
                            for (Set<String> sk : serviceKeys.values()) {
                                doNotify(jedis, sk, url, Collections.singletonList(listener));
                            }
                        }
                    } else {
                        // 首次訂閱,使用 keys xx/* 將全部服務信息存儲到本地
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  與zk的直接調用zkClient.addChildListener()實現訂閱不一樣,redis中使用了多個獨立的訂閱線程,使用pub/sub機制進行處理。(因redis的pub/sub是基於channel進行的長鏈接通訊,因此每一個service只能使用單獨的線程,有點傷!)。 使用 doNotify() 將redis中的數據接入應用中。在作訂閱的同時,也拉取了提供者服務列表達到初始化的做用。

 

5. Redis 服務下線處理

  當應用要關閉,或者註冊失敗時,須要進行服務下線。固然,若是應用沒有及時作下線處理,zk會經過其自身的臨時節點過時機制,也會將該服務作下線處理。從而避免消費者或管理臺看到無效的服務存在。

  應用服務的主動下線操做是由 ShutdownHookCallbacks 和在判斷服務不可用時進行的 invoker.destroy() 來實現優雅下線。

    // org.apache.dubbo.registry.integration.RegistryDirectory#destroy
    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }

        // unregister.
        try {
            if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unregister(getRegisteredConsumerUrl());
            }
        } catch (Throwable t) {
            logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        // unsubscribe.
        try {
            if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unsubscribe(getConsumerUrl(), this);
            }
            ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
                    .removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER);
        } catch (Throwable t) {
            logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        super.destroy(); // must be executed after unsubscribing
        try {
            destroyAllInvokers();
        } catch (Throwable t) {
            logger.warn("Failed to destroy service " + serviceKey, t);
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#unregister
    @Override
    public void unregister(URL url) {
        super.unregister(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a cancellation request to the server side
            doUnregister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedUnregistered(url);
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#doUnregister
    @Override
    public void doUnregister(URL url) {
        String key = toCategoryPath(url);
        String value = url.toFullString();
        RpcException exception = null;
        boolean success = false;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 直接刪除當前服務對應的 key-field 信息
                    // 而後發佈一條 UNREGISTER 消息,通知其餘客戶端
                    jedis.hdel(key, value);
                    jedis.publish(key, UNREGISTER);
                    success = true;
                    // 若是redis 是複製模型,須要在每一個redis上都作一次刪除
                    // 此時各應用端將會重複收到消息,重複處理,看起來並非件好事
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  總結: 下線處理兩步驟: 1. 刪除對應的hash key-field; 2. publish 一個下線消息通知其餘應用; 3. 針對redis的集羣配置決定是刪除1次或n次,且反覆通知操做;

 

6. redis 服務解除事件訂閱

  事實上,redis的 doUnsubscribe, 已再也不處理任何事件。

    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
    }

  那麼,前面註冊的多個 Notifier 監聽線程就無論了嗎?那確定是不行的,它會在 destroy() 被調用時進行收尾處理。實際上,它是 unregister() 的後續工做。

    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
    /**
     * Close all created registries
     */
    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#destroy
    @Override
    public void destroy() {
        // 該方法甚至能夠去調用 unregister(), unsubscribe() 方法
        super.destroy();
        try {
            expireFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            // 遍歷全部 notifiers, 依次調用 shutdown, 即中止訂閱工做
            for (Notifier notifier : notifiers.values()) {
                notifier.shutdown();
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                jedisPool.destroy();
            } catch (Throwable t) {
                logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
        // 最後優雅關閉過時掃描定時任務線程池,即 shutdown()..awaitTermination()的應用。
        ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
    }
        // 中止notifier
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#shutdown
        public void shutdown() {
            try {
                // step1. 設置中止標識
                // step2. 斷開redis鏈接,這不僅是一斷開的操做,它會中止psubscribe的調用,從而間接停止訂閱線程工做
                running = false;
                jedis.disconnect();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    // 以下方法,便是其父類的 destroy(), 裏面涵蓋了未關閉的 地址信息,則會觸發 unregister, unsubscribe
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        // step1. unregister 未下線的服務
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step2. unsubscribe 未取消訂閱的服務
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step3. 從已註冊列表中刪除當前實例
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#removeDestroyedRegistry
    public static void removeDestroyedRegistry(Registry toRm) {
        LOCK.lock();
        try {
            REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm));
        } finally {
            LOCK.unlock();
        }
    }

  總結:此處講了更多unregister,unsubscribe的前置操做。而 notifier.shutdown(); 纔是關閉redis訂閱相關工做的關鍵。它是經過設置中止循環標識,以及關閉redis鏈接實現的。事實上,這各取消訂閱方式並無很優雅。

 

7. 服務心跳的維護處理

  redis自己只是一個緩存存儲系統,心跳邏輯須要自行實現。實際上,咱們也能夠依賴於redis的自動過時機制,進行心跳續期。那麼,redis註冊中心是否也是這樣實現的呢?好像並非!

    // 在 RedisRegistry 的構造方法中,初始化了一個定時任務的調度
     this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    // org.apache.dubbo.registry.redis.RedisRegistry#deferExpired
    private void deferExpired() {
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 取出全部註冊了的服務,進行心跳更新
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 增長過時時間+expirePeriod, url -> expireAt
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                // 若是是第一次新增該值,或者從新新增該值(可能因爲原來的地址過時被刪除),則觸發一次regiter的消息發佈,自會有相應訂閱者處理該變動
                                jedis.publish(key, REGISTER);
                            }
                        }
                    }
                    // 若是是管理類配置,interface=*, 則會開啓清理服務功能,注意此類操做會很重,將會消耗很大
                    // 該值會在subscribe()的時候置爲 true
                    // 按文檔說明該操做會在 監控中心執行,而非存在於應用端
                    if (admin) {
                        clean(jedis);
                    }
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    // The monitoring center is responsible for deleting outdated dirty data
    private void clean(Jedis jedis) {
        // redis: keys * , 列舉全部相關的key, 根據服務數量來定該值多少
        Set<String> keys = jedis.keys(root + ANY_VALUE);
        if (CollectionUtils.isNotEmpty(keys)) {
            for (String key : keys) {
                // redis: hgetall <key>
                Map<String, String> values = jedis.hgetAll(key);
                if (CollectionUtils.isNotEmptyMap(values)) {
                    boolean delete = false;
                    long now = System.currentTimeMillis();
                    for (Map.Entry<String, String> entry : values.entrySet()) {
                        URL url = URL.valueOf(entry.getKey());
                        // 根據hash中value 指定的時間,斷定是否過時,若是過時則作刪除操做
                        // redis: hdel <key> <field>
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            long expire = Long.parseLong(entry.getValue());
                            if (expire < now) {
                                jedis.hdel(key, entry.getKey());
                                delete = true;
                                if (logger.isWarnEnabled()) {
                                    logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                                }
                            }
                        }
                    }
                    // 只要有一個服務被斷定爲過時,則訂閱了該服務的客戶端都應該被通知到
                    // 多個服務下線只會被通知一次
                    if (delete) {
                        jedis.publish(key, UNREGISTER);
                    }
                }
            }
        }
    }

  deferExpired() 的做用,就是維護本實例的全部服務的有效性,作續期做用。兩個重量級操做: 1. 依次延期某service下的全部url的過時時間;2. 作全量清理過時服務url;keys xx* 的操做,也對redis提出了一些要求,由於有些redis出於安全限制可能會禁用keys命令。

 

8. 服務信息變動通知處理notify

  redis註冊中心其實不會主動發現服務變動,只有應用本身發佈regiter或unregister消息後,其餘應用才能感知到變化。前面在 doRegister() 時,我看到,應用是經過hash添加字段註冊本身,並同時發佈 REGISTER 消息通知全部訂閱者。在 doSubscribe() 時開啓另外一個服務線程處理subscribe();

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 訂閱是基於服務處理的,每一個服務一個訂閱處理線程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            // 此處應爲防止併發所作的努力
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用 /dubbo/* 表明是管理服務,其須要作清理過時key的做用
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        ...
                    } else {
                        // 使用 keys xxx/* 命令,列舉出該服務下全部緩存key, 實際上就是 providers, consumers, configurators, routers
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }
    // 根據列如上獲得redis-key信息,作服務信息變動
    private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!ANY_VALUE.equals(consumerService)) {
                // 截取出 service
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            // consumers應用只會處理, providers,routers,configurators 的服務, 從而忽略 consumers 下的數據
            if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // 獲取全部hash值
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    // 判斷服務是否過時,過時且存在的服務將不會被利用,但不會作更多處理
                    if (!u.getParameter(DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            urls.add(u);
                        }
                    }
                }
            }
            // 若是沒有找到合適的可用服務,則添加一個 empty:// 的地址
            if (urls.isEmpty()) {
                urls.add(URLBuilder.from(url)
                        .setProtocol(EMPTY_PROTOCOL)
                        .setAddress(ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(CATEGORY_KEY, category)
                        .build());
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
        // 調用父類 FailbackRegistry.notify 方法,與zk調用一致了
        // 刷新提供者列表,路由,配置等本地緩存信息
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }
    private String toServiceName(String categoryPath) {
        // 截取root+interfaceName
        // 截取 interfaceName
        String servicePath = toServicePath(categoryPath);
        return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
    }
    private String toServicePath(String categoryPath) {
        int i;
        // 排除root路徑,找到第一個'/', 取出servicePath
        if (categoryPath.startsWith(root)) {
            i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
        } else {
            i = categoryPath.indexOf(PATH_SEPARATOR);
        }
        return i > 0 ? categoryPath.substring(0, i) : categoryPath;
    }
    // 另外,對於某個服務發生變動時,須要遍歷全部consumer, 確認是否須要刷新
    // 額,意義嘛,暫是沒太明白
    private void doNotify(Jedis jedis, String key) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
            doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
        }
    }

  總結: 

    1. redis 作初次subscribe時,notify會經過redis-keys 命令獲取全部須要的key, 而後依次將其提供者、路由、配置等信息都緩存起來。
    2. 針對每一個服務,都會開啓相關的訂閱線程Notifier處理訂閱工做。
    3. 最終的listener處理默認會由 RegistryDirectory 處理。

  接下來,咱們來看 Notifier 是如何處理訂閱的?

        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#run
        @Override
        public void run() {
            // 每一個訂閱線程,死循環處理只是爲了不網絡等其餘異常狀況出現,以便從新嘗試鏈接redis 訂閱channel
            while (running) {
                try {
                    // 額,這是個優化,我不懂的
                    if (!isSkip()) {
                        try {
                            for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
                                Pool<Jedis> jedisPool = entry.getValue();
                                try {
                                    if (jedisPool.isClosed()) {
                                        continue;
                                    }
                                    jedis = jedisPool.getResource();
                                    if (!jedis.isConnected()) {
                                        continue;
                                    }
                                    try {
                                        if (service.endsWith(ANY_VALUE)) {
                                            if (first) {
                                                first = false;
                                                Set<String> keys = jedis.keys(service);
                                                if (CollectionUtils.isNotEmpty(keys)) {
                                                    for (String s : keys) {
                                                        doNotify(jedis, s);
                                                    }
                                                }
                                                resetSkip();
                                            }
                                            jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                        } else {
                                            if (first) {
                                                // 首次處理,通知RegistryDirectory 按service刷新緩存
                                                first = false;
                                                doNotify(jedis, service);
                                                resetSkip();
                                            }
                                            // 使用 psubscribe channel 命令,阻塞監聽channel信息
                                            // 當消息返回時,使用 NotifySub 進行業務處理,實際就是調用 doNotify() 的過程
                                            // 訂閱的channel 爲: /dubbo/xxx.xx.XxxService/*
                                            jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
                                        }
                                        break;
                                    } finally {
                                        jedis.close();
                                    }
                                } catch (Throwable t) { // Retry another server
                                    logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                    // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                    sleep(reconnectPeriod);
                                }
                            }
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                            // 異常發生後,sleep片刻再重試
                            sleep(reconnectPeriod);
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // org.apache.dubbo.registry.redis.RedisRegistry.NotifySub#onMessage
        @Override
        public void onMessage(String key, String msg) {
            if (logger.isInfoEnabled()) {
                logger.info("redis event: " + key + " = " + msg);
            }
            // 只關注 REGISTER / UNREGISTER, 兩個消息
            if (msg.equals(REGISTER)
                    || msg.equals(UNREGISTER)) {
                try {
                    Jedis jedis = jedisPool.getResource();
                    try {
                        // 複用 doNotify
                        doNotify(jedis, key);
                    } finally {
                        jedis.close();
                    }
                } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // 最後仍是來看下 isSkip() 的小優化吧
        // 雖然不懂爲何,可是感受很厲害的樣子
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#isSkip
        private boolean isSkip() {
            // connectSkip: 已經跳過鏈接的總次數, connectSkipped: 當前週期內已跳過鏈接的次數
            // step1. 在connectSkip < 10 狀況下,直接用 connectSkipped 與其比較,connectSkipped<connectSkip, 則繼續跳過本次,不然不跳過,進入鏈接邏輯connectSkipped, connectSkip次數增長
            // step2. connectSkip >= 10, 不可再用其做爲斷定跳過次數, 使用一個10-20間的隨機值,做爲跳過鏈接次數斷定
            // step3. 若是本次斷定爲不跳過,則重置 connectSkipped已鏈接次數自增
            int skip = connectSkip.get(); // Growth of skipping times
            if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
                if (connectRandom == 0) {
                    connectRandom = ThreadLocalRandom.current().nextInt(10);
                }
                skip = 10 + connectRandom;
            }
            if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
                return true;
            }
            connectSkip.incrementAndGet();
            connectSkipped.set(0);
            connectRandom = 0;
            return false;
        }

  監聽服務就作好一件事就行,調用 psubscribe命令訂閱channel, 發生變化時調用 doNotify() 回調listener處理刷新。爲避免異常狀況下訂閱功能仍然成立,使用外部的while循環包裹訂閱邏輯重試。

  注意其訂閱的redis channel 爲 /dubbo/xxx.xx.XxxService/*, 因此至關於其自身的變動也被包含在內了。而是否要處理該事件,則依賴於url中的categorys配置,如消費爲:category=providers,configurators,routers, 即它會處理這三種類型的key變動。

 

9. 一點感想

  dubbo用redis作註冊中心,能夠看做是一個簡單的擴展實現。其核心是基於redis的 pub/sub 能力。

  但和zk比起來,redis功能實現會相對困難些,甚至看起來有些蹩腳(如其redis集羣策略須要自行從外部保證同步,這恐怕不會是件容易的事,現有的主從,集羣方案都徹底沒法cover其場景。既要保證任意寫,又要保證全同步(數據一致性),呵呵)。由於它須要單獨去維護一些心跳、過時類的事務。過多的服務會致使這類工做更加繁重。

  但這也許不能成爲你們拒絕應用的理由,畢竟,按官方說明阿里內部是基於數據庫實現的註冊中心,天然有其道理。

相關文章
相關標籤/搜索