上篇咱們講了Dubbo中有一個很是本質和重要的功能,那就是服務的自動註冊與發現,而這個功能是經過註冊中心來實現的。上篇中使用zookeeper實現了註冊中心的功能,同時了提了dubbo中有其餘許多的註冊中心的實現。git
今天咱們就來看看另外一個註冊中心的實現吧: redis 。redis
dubbo在zk中的服務體現是一個個的文件路徑形式,如 /dubbo/xxx.xx.XxxService/providers/xxx 。 而在redis中,則體現是一個個的緩存key-value。具體分佈以下:數據庫
/dubbo/xxx.xx.XxxService/providers: 以hash類型存放全部提供者列表, 每一個hash的字段爲 url -> expireTime
/dubbo/xxx.xx.XxxService/consumers: 以hash類型存放全部消費者列表, 每一個hash的字段爲 url -> expireTime
/dubbo/xxx.xx.XxxService/configurators: 存放配置信息
/dubbo/xxx.xx.XxxService/routers: 存放路由配置信息apache
如上,一樣,redis也是以service爲粒度進行存儲劃分的。緩存
你可能須要先引入redis註冊依賴包:安全
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-registry-redis</artifactId> </dependency>
在配置dubbo服務時,須要將註冊中心換爲 redis, 以下選合適的一個便可:服務器
<dubbo:registry address="redis://127.0.0.1:6379" cluster="failover" /> <dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379" cluster="failover" /> <dubbo:registry protocol="redis" address="127.0.0.1:6379" cluster="failover" /> <dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />
cluster 設置 redis 集羣策略,缺省爲 failover:(這個配置不會和集羣容錯配置有誤會麼,尷尬)網絡
failover: 失效轉移策略。只寫入和讀取任意一臺,失敗時重試另外一臺,須要服務器端自行配置數據同步;session
replicate: 複製模式策略。在客戶端同時寫入全部服務器,只讀取單臺,服務器端不須要同步,註冊中心集羣增大,性能壓力也會更大;併發
redis做爲註冊中心與zk做爲註冊的前置操做都是同樣的。都是一是做爲服務提供者時會在 ServiceConfig#doExportUrlsFor1Protocol 中,進行遠程服務暴露時會拉起。二是在消費者在進行遠程調用時會 ReferenceConfig#createProxy 時拉取以便獲取提供者列表。
只是在依賴注入 RegistryFactory 時,根據是 zookeeper/redis, 選擇了不同的 RegistryFactory, 因此建立了不一樣的註冊中心實例。
redis 中根據SPI的配置建立, RedisRegistryFactory 工廠, 配置文件 META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory 的內容以下:
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
/** * Get an instance of registry based on the address of invoker * * @param originInvoker * @return */ protected Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = getRegistryUrl(originInvoker); // RegistryFactory 又是經過 SPI 機制生成的 // 會根據具體的註冊中心的類型建立調用具體實例,如此處爲: redis, 因此會調用 RedisRegistryFactory.getRegistry() return registryFactory.getRegistry(registryUrl); } // 全部 RegistryFactory 都會被包裝成 RegistryFactoryWrapper, 以便修飾 // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry @Override public Registry getRegistry(URL url) { // 對於zk, 會調用 RedisRegistryFactory return new ListenerRegistryWrapper(registryFactory.getRegistry(url), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class) .getActivateExtension(url, "registry.listeners"))); } // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL) @Override public Registry getRegistry(URL url) { if (destroyed.get()) { LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " + "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of."); return DEFAULT_NOP_REGISTRY; } url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = createRegistryCacheKey(url); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //create registry by spi/ioc // 調用子類方法建立 registry 實例,此處爲 RedisRegistryFactory.createRegistry registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } } // org.apache.dubbo.registry.redis.RedisRegistryFactory#createRegistry @Override protected Registry createRegistry(URL url) { // 最終將redis組件接入到應用中了,後續就可使用redis提供的相應功能了 return new RedisRegistry(url); }
至此,redis被接入了。咱們先來看下 redis 註冊中心構造方法實現:
// org.apache.dubbo.registry.redis.RedisRegistry#RedisRegistry public RedisRegistry(URL url) { // RedisRegistry 與zk同樣,一樣繼承了 FailbackRegistry // 因此,一樣會建立retryTimer, 一樣會建立緩存文件 super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 使用redis鏈接池處理事務 // 設置各配置項 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); config.setTestOnReturn(url.getParameter("test.on.return", false)); config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) { config.setMaxIdle(url.getParameter("max.idle", 0)); } if (url.getParameter("min.idle", 0) > 0) { config.setMinIdle(url.getParameter("min.idle", 0)); } if (url.getParameter("max.active", 0) > 0) { config.setMaxTotal(url.getParameter("max.active", 0)); } if (url.getParameter("max.total", 0) > 0) { config.setMaxTotal(url.getParameter("max.total", 0)); } if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) { config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0))); } if (url.getParameter("num.tests.per.eviction.run", 0) > 0) { config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); } if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) { config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); } if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) { config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); } // redis 複用了cluster配置項? String cluster = url.getParameter("cluster", "failover"); if (!"failover".equals(cluster) && !"replicate".equals(cluster)) { throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate."); } replicate = "replicate".equals(cluster); List<String> addresses = new ArrayList<>(); addresses.add(url.getAddress()); String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]); if (ArrayUtils.isNotEmpty(backups)) { addresses.addAll(Arrays.asList(backups)); } //得到Redis主節點名稱 String masterName = url.getParameter(REDIS_MASTER_NAME_KEY); if (StringUtils.isEmpty(masterName)) { //單機版redis for (String address : addresses) { int i = address.indexOf(':'); String host; int port; if (i > 0) { host = address.substring(0, i); port = Integer.parseInt(address.substring(i + 1)); } else { host = address; port = DEFAULT_REDIS_PORT; } this.jedisPools.put(address, new JedisPool(config, host, port, url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0))); } } else { //哨兵版redis Set<String> sentinelSet = new HashSet<>(addresses); int index = url.getParameter("db.index", 0); int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(); JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index); this.jedisPools.put(masterName, pool); } this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD); String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } if (!group.endsWith(PATH_SEPARATOR)) { group = group + PATH_SEPARATOR; } this.root = group; // session=60000, 默認1分鐘過時 this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT); // 使用定時任務刷新存活狀態,至關於心跳維護線程,定時任務頻率爲 session有效其的1/2 this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> { try { deferExpired(); // Extend the expiration time } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); } }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); }
RedisRegistry構造方法中,主要完成redis配置信息的轉換接入,建立鏈接池,默認使用0號數據庫。另外,每一個客戶端都是單例的RedisRegistry, 因此也就是說會開啓一個過時掃描定時任務(能夠稱之爲心跳任務)。
與ZK過程相似,服務註冊主要就分兩步:1. 獲取registry實例(經過SPI機制); 2. 將服務的信息註冊到註冊中心。只是zk是路徑,redis是kv.
// org.apache.dubbo.registry.redis.RedisRegistry#doRegister @Override public void doRegister(URL url) { // 與zk一致,按服務組裝key前綴 String key = toCategoryPath(url); // 全服務路徑做爲value String value = url.toFullString(); String expire = String.valueOf(System.currentTimeMillis() + expirePeriod); boolean success = false; RpcException exception = null; for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { // 使用hash存儲提供者/消費者 標識,帶過時時間(該時間需後續主動斷定,redis並不維護該狀態) // 註冊好自向標識後,pub一條消息,以便其餘客戶端能夠sub感知到該服務 jedis.hset(key, value, expire); jedis.publish(key, REGISTER); success = true; // 若是不是複製模式的redis 服務(即爲failover模式),只需往一個redis寫數據便可, // 剩餘redis自行同步實際上這裏應該是存在數據一致性問題的 if (!replicate) { break; // If the server side has synchronized data, just write a single machine } } } catch (Throwable t) { exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } // 只要有一個成功,即算成功 if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
以hash類型存放全部提供者列表, key爲服務粒度的前綴信息: /dubbo/xxx.xx.XxxService/providers, hash中每一個field->value表示,服務全路徑信息->過時時間。
經過redis的 pub/sub 機制,通知其餘客戶端變化。註冊時發佈一條消息到提供者路徑, publish <key> register 。
服務註冊的目的,主要是讓註冊中心及其餘應用端能夠發現本身。而服務訂閱則爲了讓本身能夠發現別的系統的變化。如查找全部提供者列表,接收應用上下線通知,開啓監聽等等。
// org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe @Override public void doSubscribe(final URL url, final NotifyListener listener) { String service = toServicePath(url); // 基於service開啓訂閱線程 Notifier notifier = notifiers.get(service); if (notifier == null) { // 主動開啓一個 notifier 線程,進行subscribe處理 // 若是service不少,那就意味着有不少的此類線程,這並非件好事 Notifier newNotifier = new Notifier(service); notifiers.putIfAbsent(service, newNotifier); notifier = notifiers.get(service); if (notifier == newNotifier) { notifier.start(); } } boolean success = false; RpcException exception = null; for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { if (service.endsWith(ANY_VALUE)) { admin = true; Set<String> keys = jedis.keys(service); if (CollectionUtils.isNotEmpty(keys)) { Map<String, Set<String>> serviceKeys = new HashMap<>(); for (String key : keys) { String serviceKey = toServicePath(key); Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>()); sk.add(key); } for (Set<String> sk : serviceKeys.values()) { doNotify(jedis, sk, url, Collections.singletonList(listener)); } } } else { // 首次訂閱,使用 keys xx/* 將全部服務信息存儲到本地 doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener)); } success = true; break; // Just read one server's data } } catch (Throwable t) { // Try the next server exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
與zk的直接調用zkClient.addChildListener()實現訂閱不一樣,redis中使用了多個獨立的訂閱線程,使用pub/sub機制進行處理。(因redis的pub/sub是基於channel進行的長鏈接通訊,因此每一個service只能使用單獨的線程,有點傷!)。 使用 doNotify() 將redis中的數據接入應用中。在作訂閱的同時,也拉取了提供者服務列表達到初始化的做用。
當應用要關閉,或者註冊失敗時,須要進行服務下線。固然,若是應用沒有及時作下線處理,zk會經過其自身的臨時節點過時機制,也會將該服務作下線處理。從而避免消費者或管理臺看到無效的服務存在。
應用服務的主動下線操做是由 ShutdownHookCallbacks 和在判斷服務不可用時進行的 invoker.destroy() 來實現優雅下線。
// org.apache.dubbo.registry.integration.RegistryDirectory#destroy @Override public void destroy() { if (isDestroyed()) { return; } // unregister. try { if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) { registry.unregister(getRegisteredConsumerUrl()); } } catch (Throwable t) { logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t); } // unsubscribe. try { if (getConsumerUrl() != null && registry != null && registry.isAvailable()) { registry.unsubscribe(getConsumerUrl(), this); } ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension() .removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER); } catch (Throwable t) { logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t); } super.destroy(); // must be executed after unsubscribing try { destroyAllInvokers(); } catch (Throwable t) { logger.warn("Failed to destroy service " + serviceKey, t); } } // org.apache.dubbo.registry.support.FailbackRegistry#unregister @Override public void unregister(URL url) { super.unregister(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a cancellation request to the server side doUnregister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedUnregistered(url); } } // org.apache.dubbo.registry.redis.RedisRegistry#doUnregister @Override public void doUnregister(URL url) { String key = toCategoryPath(url); String value = url.toFullString(); RpcException exception = null; boolean success = false; for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { // 直接刪除當前服務對應的 key-field 信息 // 而後發佈一條 UNREGISTER 消息,通知其餘客戶端 jedis.hdel(key, value); jedis.publish(key, UNREGISTER); success = true; // 若是redis 是複製模型,須要在每一個redis上都作一次刪除 // 此時各應用端將會重複收到消息,重複處理,看起來並非件好事 if (!replicate) { break; // If the server side has synchronized data, just write a single machine } } } catch (Throwable t) { exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
總結: 下線處理兩步驟: 1. 刪除對應的hash key-field; 2. publish 一個下線消息通知其餘應用; 3. 針對redis的集羣配置決定是刪除1次或n次,且反覆通知操做;
事實上,redis的 doUnsubscribe, 已再也不處理任何事件。
@Override public void doUnsubscribe(URL url, NotifyListener listener) { }
那麼,前面註冊的多個 Notifier 監聽線程就無論了嗎?那確定是不行的,它會在 destroy() 被調用時進行收尾處理。實際上,它是 unregister() 的後續工做。
// org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll /** * Close all created registries */ public static void destroyAll() { if (!destroyed.compareAndSet(false, true)) { return; } if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process LOCK.lock(); try { for (Registry registry : getRegistries()) { try { registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } REGISTRIES.clear(); } finally { // Release the lock LOCK.unlock(); } } // org.apache.dubbo.registry.redis.RedisRegistry#destroy @Override public void destroy() { // 該方法甚至能夠去調用 unregister(), unsubscribe() 方法 super.destroy(); try { expireFuture.cancel(true); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { // 遍歷全部 notifiers, 依次調用 shutdown, 即中止訂閱工做 for (Notifier notifier : notifiers.values()) { notifier.shutdown(); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { jedisPool.destroy(); } catch (Throwable t) { logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } // 最後優雅關閉過時掃描定時任務線程池,即 shutdown()..awaitTermination()的應用。 ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod); } // 中止notifier // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#shutdown public void shutdown() { try { // step1. 設置中止標識 // step2. 斷開redis鏈接,這不僅是一斷開的操做,它會中止psubscribe的調用,從而間接停止訂閱線程工做 running = false; jedis.disconnect(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } // 以下方法,便是其父類的 destroy(), 裏面涵蓋了未關閉的 地址信息,則會觸發 unregister, unsubscribe // org.apache.dubbo.registry.support.AbstractRegistry#destroy @Override public void destroy() { if (logger.isInfoEnabled()) { logger.info("Destroy registry:" + getUrl()); } Set<URL> destroyRegistered = new HashSet<>(getRegistered()); // step1. unregister 未下線的服務 if (!destroyRegistered.isEmpty()) { for (URL url : new HashSet<>(getRegistered())) { if (url.getParameter(DYNAMIC_KEY, true)) { try { unregister(url); if (logger.isInfoEnabled()) { logger.info("Destroy unregister url " + url); } } catch (Throwable t) { logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } // step2. unsubscribe 未取消訂閱的服務 Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed()); if (!destroySubscribed.isEmpty()) { for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { try { unsubscribe(url, listener); if (logger.isInfoEnabled()) { logger.info("Destroy unsubscribe url " + url); } } catch (Throwable t) { logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } // step3. 從已註冊列表中刪除當前實例 AbstractRegistryFactory.removeDestroyedRegistry(this); } // org.apache.dubbo.registry.support.AbstractRegistryFactory#removeDestroyedRegistry public static void removeDestroyedRegistry(Registry toRm) { LOCK.lock(); try { REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm)); } finally { LOCK.unlock(); } }
總結:此處講了更多unregister,unsubscribe的前置操做。而 notifier.shutdown(); 纔是關閉redis訂閱相關工做的關鍵。它是經過設置中止循環標識,以及關閉redis鏈接實現的。事實上,這各取消訂閱方式並無很優雅。
redis自己只是一個緩存存儲系統,心跳邏輯須要自行實現。實際上,咱們也能夠依賴於redis的自動過時機制,進行心跳續期。那麼,redis註冊中心是否也是這樣實現的呢?好像並非!
// 在 RedisRegistry 的構造方法中,初始化了一個定時任務的調度 this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> { try { deferExpired(); // Extend the expiration time } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); } }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); // org.apache.dubbo.registry.redis.RedisRegistry#deferExpired private void deferExpired() { for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { // 取出全部註冊了的服務,進行心跳更新 for (URL url : new HashSet<>(getRegistered())) { if (url.getParameter(DYNAMIC_KEY, true)) { String key = toCategoryPath(url); // 增長過時時間+expirePeriod, url -> expireAt if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { // 若是是第一次新增該值,或者從新新增該值(可能因爲原來的地址過時被刪除),則觸發一次regiter的消息發佈,自會有相應訂閱者處理該變動 jedis.publish(key, REGISTER); } } } // 若是是管理類配置,interface=*, 則會開啓清理服務功能,注意此類操做會很重,將會消耗很大 // 該值會在subscribe()的時候置爲 true // 按文檔說明該操做會在 監控中心執行,而非存在於應用端 if (admin) { clean(jedis); } if (!replicate) { break;// If the server side has synchronized data, just write a single machine } } } catch (Throwable t) { logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } } // The monitoring center is responsible for deleting outdated dirty data private void clean(Jedis jedis) { // redis: keys * , 列舉全部相關的key, 根據服務數量來定該值多少 Set<String> keys = jedis.keys(root + ANY_VALUE); if (CollectionUtils.isNotEmpty(keys)) { for (String key : keys) { // redis: hgetall <key> Map<String, String> values = jedis.hgetAll(key); if (CollectionUtils.isNotEmptyMap(values)) { boolean delete = false; long now = System.currentTimeMillis(); for (Map.Entry<String, String> entry : values.entrySet()) { URL url = URL.valueOf(entry.getKey()); // 根據hash中value 指定的時間,斷定是否過時,若是過時則作刪除操做 // redis: hdel <key> <field> if (url.getParameter(DYNAMIC_KEY, true)) { long expire = Long.parseLong(entry.getValue()); if (expire < now) { jedis.hdel(key, entry.getKey()); delete = true; if (logger.isWarnEnabled()) { logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now)); } } } } // 只要有一個服務被斷定爲過時,則訂閱了該服務的客戶端都應該被通知到 // 多個服務下線只會被通知一次 if (delete) { jedis.publish(key, UNREGISTER); } } } } }
deferExpired() 的做用,就是維護本實例的全部服務的有效性,作續期做用。兩個重量級操做: 1. 依次延期某service下的全部url的過時時間;2. 作全量清理過時服務url;keys xx* 的操做,也對redis提出了一些要求,由於有些redis出於安全限制可能會禁用keys命令。
redis註冊中心其實不會主動發現服務變動,只有應用本身發佈regiter或unregister消息後,其餘應用才能感知到變化。前面在 doRegister() 時,我看到,應用是經過hash添加字段註冊本身,並同時發佈 REGISTER 消息通知全部訂閱者。在 doSubscribe() 時開啓另外一個服務線程處理subscribe();
// org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe @Override public void doSubscribe(final URL url, final NotifyListener listener) { String service = toServicePath(url); // 訂閱是基於服務處理的,每一個服務一個訂閱處理線程 Notifier notifier = notifiers.get(service); if (notifier == null) { Notifier newNotifier = new Notifier(service); notifiers.putIfAbsent(service, newNotifier); notifier = notifiers.get(service); // 此處應爲防止併發所作的努力 if (notifier == newNotifier) { notifier.start(); } } boolean success = false; RpcException exception = null; for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { try (Jedis jedis = jedisPool.getResource()) { // 使用 /dubbo/* 表明是管理服務,其須要作清理過時key的做用 if (service.endsWith(ANY_VALUE)) { admin = true; ... } else { // 使用 keys xxx/* 命令,列舉出該服務下全部緩存key, 實際上就是 providers, consumers, configurators, routers doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener)); } success = true; break; // Just read one server's data } } catch (Throwable t) { // Try the next server exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } } // 根據列如上獲得redis-key信息,作服務信息變動 private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) { if (keys == null || keys.isEmpty() || listeners == null || listeners.isEmpty()) { return; } long now = System.currentTimeMillis(); List<URL> result = new ArrayList<>(); List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0])); String consumerService = url.getServiceInterface(); for (String key : keys) { if (!ANY_VALUE.equals(consumerService)) { // 截取出 service String providerService = toServiceName(key); if (!providerService.equals(consumerService)) { continue; } } String category = toCategoryName(key); // consumers應用只會處理, providers,routers,configurators 的服務, 從而忽略 consumers 下的數據 if (!categories.contains(ANY_VALUE) && !categories.contains(category)) { continue; } List<URL> urls = new ArrayList<>(); // 獲取全部hash值 Map<String, String> values = jedis.hgetAll(key); if (CollectionUtils.isNotEmptyMap(values)) { for (Map.Entry<String, String> entry : values.entrySet()) { URL u = URL.valueOf(entry.getKey()); // 判斷服務是否過時,過時且存在的服務將不會被利用,但不會作更多處理 if (!u.getParameter(DYNAMIC_KEY, true) || Long.parseLong(entry.getValue()) >= now) { if (UrlUtils.isMatch(url, u)) { urls.add(u); } } } } // 若是沒有找到合適的可用服務,則添加一個 empty:// 的地址 if (urls.isEmpty()) { urls.add(URLBuilder.from(url) .setProtocol(EMPTY_PROTOCOL) .setAddress(ANYHOST_VALUE) .setPath(toServiceName(key)) .addParameter(CATEGORY_KEY, category) .build()); } result.addAll(urls); if (logger.isInfoEnabled()) { logger.info("redis notify: " + key + " = " + urls); } } if (CollectionUtils.isEmpty(result)) { return; } // 調用父類 FailbackRegistry.notify 方法,與zk調用一致了 // 刷新提供者列表,路由,配置等本地緩存信息 for (NotifyListener listener : listeners) { notify(url, listener, result); } } private String toServiceName(String categoryPath) { // 截取root+interfaceName // 截取 interfaceName String servicePath = toServicePath(categoryPath); return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath; } private String toServicePath(String categoryPath) { int i; // 排除root路徑,找到第一個'/', 取出servicePath if (categoryPath.startsWith(root)) { i = categoryPath.indexOf(PATH_SEPARATOR, root.length()); } else { i = categoryPath.indexOf(PATH_SEPARATOR); } return i > 0 ? categoryPath.substring(0, i) : categoryPath; } // 另外,對於某個服務發生變動時,須要遍歷全部consumer, 確認是否須要刷新 // 額,意義嘛,暫是沒太明白 private void doNotify(Jedis jedis, String key) { for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) { doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue())); } }
總結:
1. redis 作初次subscribe時,notify會經過redis-keys 命令獲取全部須要的key, 而後依次將其提供者、路由、配置等信息都緩存起來。
2. 針對每一個服務,都會開啓相關的訂閱線程Notifier處理訂閱工做。
3. 最終的listener處理默認會由 RegistryDirectory 處理。
接下來,咱們來看 Notifier 是如何處理訂閱的?
// org.apache.dubbo.registry.redis.RedisRegistry.Notifier#run @Override public void run() { // 每一個訂閱線程,死循環處理只是爲了不網絡等其餘異常狀況出現,以便從新嘗試鏈接redis 訂閱channel while (running) { try { // 額,這是個優化,我不懂的 if (!isSkip()) { try { for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { Pool<Jedis> jedisPool = entry.getValue(); try { if (jedisPool.isClosed()) { continue; } jedis = jedisPool.getResource(); if (!jedis.isConnected()) { continue; } try { if (service.endsWith(ANY_VALUE)) { if (first) { first = false; Set<String> keys = jedis.keys(service); if (CollectionUtils.isNotEmpty(keys)) { for (String s : keys) { doNotify(jedis, s); } } resetSkip(); } jedis.psubscribe(new NotifySub(jedisPool), service); // blocking } else { if (first) { // 首次處理,通知RegistryDirectory 按service刷新緩存 first = false; doNotify(jedis, service); resetSkip(); } // 使用 psubscribe channel 命令,阻塞監聽channel信息 // 當消息返回時,使用 NotifySub 進行業務處理,實際就是調用 doNotify() 的過程 // 訂閱的channel 爲: /dubbo/xxx.xx.XxxService/* jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking } break; } finally { jedis.close(); } } catch (Throwable t) { // Retry another server logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources sleep(reconnectPeriod); } } } catch (Throwable t) { logger.error(t.getMessage(), t); // 異常發生後,sleep片刻再重試 sleep(reconnectPeriod); } } } catch (Throwable t) { logger.error(t.getMessage(), t); } } } // org.apache.dubbo.registry.redis.RedisRegistry.NotifySub#onMessage @Override public void onMessage(String key, String msg) { if (logger.isInfoEnabled()) { logger.info("redis event: " + key + " = " + msg); } // 只關注 REGISTER / UNREGISTER, 兩個消息 if (msg.equals(REGISTER) || msg.equals(UNREGISTER)) { try { Jedis jedis = jedisPool.getResource(); try { // 複用 doNotify doNotify(jedis, key); } finally { jedis.close(); } } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee logger.error(t.getMessage(), t); } } } // 最後仍是來看下 isSkip() 的小優化吧 // 雖然不懂爲何,可是感受很厲害的樣子 // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#isSkip private boolean isSkip() { // connectSkip: 已經跳過鏈接的總次數, connectSkipped: 當前週期內已跳過鏈接的次數 // step1. 在connectSkip < 10 狀況下,直接用 connectSkipped 與其比較,connectSkipped<connectSkip, 則繼續跳過本次,不然不跳過,進入鏈接邏輯connectSkipped, connectSkip次數增長 // step2. connectSkip >= 10, 不可再用其做爲斷定跳過次數, 使用一個10-20間的隨機值,做爲跳過鏈接次數斷定 // step3. 若是本次斷定爲不跳過,則重置 connectSkipped已鏈接次數自增 int skip = connectSkip.get(); // Growth of skipping times if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number if (connectRandom == 0) { connectRandom = ThreadLocalRandom.current().nextInt(10); } skip = 10 + connectRandom; } if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times return true; } connectSkip.incrementAndGet(); connectSkipped.set(0); connectRandom = 0; return false; }
監聽服務就作好一件事就行,調用 psubscribe命令訂閱channel, 發生變化時調用 doNotify() 回調listener處理刷新。爲避免異常狀況下訂閱功能仍然成立,使用外部的while循環包裹訂閱邏輯重試。
注意其訂閱的redis channel 爲 /dubbo/xxx.xx.XxxService/*, 因此至關於其自身的變動也被包含在內了。而是否要處理該事件,則依賴於url中的categorys配置,如消費爲:category=providers,configurators,routers, 即它會處理這三種類型的key變動。
dubbo用redis作註冊中心,能夠看做是一個簡單的擴展實現。其核心是基於redis的 pub/sub 能力。
但和zk比起來,redis功能實現會相對困難些,甚至看起來有些蹩腳(如其redis集羣策略須要自行從外部保證同步,這恐怕不會是件容易的事,現有的主從,集羣方案都徹底沒法cover其場景。既要保證任意寫,又要保證全同步(數據一致性),呵呵)。由於它須要單獨去維護一些心跳、過時類的事務。過多的服務會致使這類工做更加繁重。
但這也許不能成爲你們拒絕應用的理由,畢竟,按官方說明阿里內部是基於數據庫實現的註冊中心,天然有其道理。