目標:解釋覺得redis實現的註冊中心原理,解讀duubo-registry-redis的源碼
Redis是一個key-value存儲系統,交換數據很是快,redis之內存做爲數據存儲的介質,因此讀寫數據的效率極高,遠遠超過數據庫。redis支持豐富的數據類型,dubbo就利用了redis的value支持map的數據類型。redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除。java
dubbo利用JRedis來鏈接到Redis分佈式哈希鍵-值數據庫,由於Jedis實例不是線程安全的,因此不能夠多個線程共用一個Jedis實例,可是建立太多的實現也很差由於這意味着會創建不少sokcet鏈接。 因此dubbo又用了JedisPool,JedisPool是一個線程安全的網絡鏈接池。能夠用JedisPool建立一些可靠Jedis實例,能夠從池中獲取Jedis實例,使用完後再把Jedis實例還回JedisPool。這種方式能夠避免建立大量socket鏈接而且會實現高效的性能。git
上述稍微介紹了dubbo用redis實現註冊中心的依賴,接下來讓咱們來看看具體的實現邏輯。下圖是包的結構:github
包結構很是相似。接下來咱們就來解讀一下這兩個類。redis
該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,基於redis來實現。數據庫
// 日誌記錄 private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class); // 默認的redis鏈接端口 private static final int DEFAULT_REDIS_PORT = 6379; // 默認 Redis 根節點,涉及到的是dubbo的分組配置 private final static String DEFAULT_ROOT = "dubbo"; // 任務調度器 private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true)); //Redis Key 過時機制執行器 private final ScheduledFuture<?> expireFuture; // Redis 根節點 private final String root; // JedisPool集合,map 的key爲 "ip:port"的形式 private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>(); // 通知器集合,key爲 Root + Service的形式 // 例如 /dubbo/com.alibaba.dubbo.demo.DemoService private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>(); // 重連時間間隔,單位:ms private final int reconnectPeriod; // 過時週期,單位:ms private final int expirePeriod; // 是否經過監控中心,用於判斷髒數據,髒數據由監控中心刪除 private volatile boolean admin = false; // 是否複製模式 private boolean replicate;
能夠從屬性中看到基於redis的註冊中心能夠被監控中心監控,而且對過時的節點有清理的機制。segmentfault
public RedisRegistry(URL url) { super(url); // 判斷地址是否爲空 if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 實例化對象池 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 若是 testOnBorrow 被設置,pool 會在 borrowObject 返回對象以前使用 PoolableObjectFactory的 validateObject 來驗證這個對象是否有效 // 要是對象沒經過驗證,這個對象會被丟棄,而後從新選擇一個新的對象。 config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); // 若是 testOnReturn 被設置, pool 會在 returnObject 的時候經過 PoolableObjectFactory 的validateObject 方法驗證對象 // 若是對象沒經過驗證,對象會被丟棄,不會被放到池中。 config.setTestOnReturn(url.getParameter("test.on.return", false)); // 指定空閒對象是否應該使用 PoolableObjectFactory 的 validateObject 校驗,若是校驗失敗,這個對象會從對象池中被清除。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0) 的時候纔會生效。 config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) // 控制一個pool最多有多少個狀態爲空閒的jedis實例。 config.setMaxIdle(url.getParameter("max.idle", 0)); if (url.getParameter("min.idle", 0) > 0) // 控制一個pool最少有多少個狀態爲空閒的jedis實例。 config.setMinIdle(url.getParameter("min.idle", 0)); if (url.getParameter("max.active", 0) > 0) // 控制一個pool最多有多少個jedis實例。 config.setMaxTotal(url.getParameter("max.active", 0)); if (url.getParameter("max.total", 0) > 0) config.setMaxTotal(url.getParameter("max.total", 0)); if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) //表示當引入一個jedis實例時,最大的等待時間,若是超過等待時間,則直接拋出JedisConnectionException; config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0))); if (url.getParameter("num.tests.per.eviction.run", 0) > 0) // 設置驅逐線程每次檢測對象的數量。這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。 config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) // 指定驅逐線程的休眠時間。若是這個值不是正數( >0),不會有驅逐線程運行。 config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) // 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉)。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。 config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); // 獲取url中的集羣配置 String cluster = url.getParameter("cluster", "failover"); if (!"failover".equals(cluster) && !"replicate".equals(cluster)) { throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate."); } // 設置是否爲複製模式 replicate = "replicate".equals(cluster); List<String> addresses = new ArrayList<String>(); addresses.add(url.getAddress()); // 備用地址 String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]); if (backups != null && backups.length > 0) { addresses.addAll(Arrays.asList(backups)); } for (String address : addresses) { int i = address.indexOf(':'); String host; int port; // 分割地址和端口號 if (i > 0) { host = address.substring(0, i); port = Integer.parseInt(address.substring(i + 1)); } else { // 沒有端口的設置默認端口 host = address; port = DEFAULT_REDIS_PORT; } // 建立鏈接池並加入集合 this.jedisPools.put(address, new JedisPool(config, host, port, url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0))); } // 設置url攜帶的鏈接超時時間,若是沒有配置,則設置默認爲3s this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD); // 獲取url中的分組配置,若是沒有配置,則默認爲dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } if (!group.endsWith(Constants.PATH_SEPARATOR)) { group = group + Constants.PATH_SEPARATOR; } // 設置redis 的根節點 this.root = group; // 獲取過時週期配置,若是沒有,則默認爲60s this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT); // 建立過時機制執行器 this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { // 延長到期時間 deferExpired(); // Extend the expiration time } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); } } }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); }
構造方法首先是調用了父類的構造函數,而後是對對象池的一些配置進行了初始化,具體的我已經在註釋中寫明。在構造方法中還作了鏈接池的建立、過時機制執行器的建立,其中過時會進行延長到期時間的操做具體是在deferExpired方法中實現。還有一個關注點事該執行器的時間是取週期的一半。安全
private void deferExpired() { for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { // 得到鏈接池中的Jedis實例 Jedis jedis = jedisPool.getResource(); try { // 遍歷已經註冊的服務url集合 for (URL url : new HashSet<URL>(getRegistered())) { // 若是是非動態管理模式 if (url.getParameter(Constants.DYNAMIC_KEY, true)) { // 得到分類路徑 String key = toCategoryPath(url); // 以hash 散列表的形式存儲 if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { // 發佈 Redis 註冊事件 jedis.publish(key, Constants.REGISTER); } } } // 若是經過監控中心 if (admin) { // 刪除過期的髒數據 clean(jedis); } // 若是服務器端已同步數據,只需寫入單臺機器 if (!replicate) { break;// If the server side has synchronized data, just write a single machine } } finally { jedis.close(); } } catch (Throwable t) { logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } }
該方法實現了延長到期時間的邏輯,遍歷了已經註冊的服務url,這裏會有一個是否爲非動態管理模式的判斷,也就是判斷該節點是否爲動態節點,只有動態節點是須要延長過時時間,由於動態節點須要人工刪除節點。延長過時時間就是從新註冊一次。而其餘的節點則會被監控中心清除,也就是調用了clean方法。clean方法下面會講到。服務器
// The monitoring center is responsible for deleting outdated dirty data private void clean(Jedis jedis) { // 得到全部的服務 Set<String> keys = jedis.keys(root + Constants.ANY_VALUE); if (keys != null && !keys.isEmpty()) { // 遍歷全部的服務 for (String key : keys) { // 返回hash表key對應的全部域和值 // redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除 Map<String, String> values = jedis.hgetAll(key); if (values != null && values.size() > 0) { boolean delete = false; long now = System.currentTimeMillis(); for (Map.Entry<String, String> entry : values.entrySet()) { URL url = URL.valueOf(entry.getKey()); // 是否爲動態節點 if (url.getParameter(Constants.DYNAMIC_KEY, true)) { long expire = Long.parseLong(entry.getValue()); // 判斷是否過時 if (expire < now) { // 刪除記錄 jedis.hdel(key, entry.getKey()); delete = true; if (logger.isWarnEnabled()) { logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now)); } } } } // 取消註冊 if (delete) { jedis.publish(key, Constants.UNREGISTER); } } } } }
該方法就是用來清理過時數據的,以前我提到過dubbo在redis存儲數據的數據結構形式,就是redis的key爲服務名稱和服務的類型。map中的key爲URL地址,map中的value爲過時時間,用於判斷髒數據,髒數據由監控中心刪除,那麼判斷過時就是經過map中的value來判別。邏輯就是在redis中先把記錄刪除,而後在取消訂閱。微信
@Override public boolean isAvailable() { // 遍歷鏈接池集合 for (JedisPool jedisPool : jedisPools.values()) { try { // 從鏈接池中得到jedis實例 Jedis jedis = jedisPool.getResource(); try { // 判斷是否有redis服務器被鏈接着 // 只要有一臺鏈接,則算註冊中心可用 if (jedis.isConnected()) { return true; // At least one single machine is available. } } finally { jedis.close(); } } catch (Throwable t) { } } return false; }
該方法是判斷註冊中心是否可用,經過redis是否鏈接來判斷,只要有一臺redis可鏈接,就算註冊中心可用。網絡
@Override public void destroy() { super.destroy(); try { // 關閉過時執行器 expireFuture.cancel(true); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { // 關閉通知器 for (Notifier notifier : notifiers.values()) { notifier.shutdown(); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { // 銷燬鏈接池 jedisPool.destroy(); } catch (Throwable t) { logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } // 關閉任務調度器 ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod); }
這是銷燬的方法,邏輯很清晰,gracefulShutdown方法在《dubbo源碼解析(四)註冊中心——dubbo》中已經講到。
@Override public void doRegister(URL url) { // 得到分類路徑 String key = toCategoryPath(url); // 得到URL字符串做爲 Value String value = url.toFullString(); // 計算過時時間 String expire = String.valueOf(System.currentTimeMillis() + expirePeriod); boolean success = false; RpcException exception = null; // 遍歷鏈接池集合 for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { Jedis jedis = jedisPool.getResource(); try { // 寫入 Redis Map 鍵 jedis.hset(key, value, expire); // 發佈 Redis 註冊事件 // 這樣訂閱該 Key 的服務消費者和監控中心,就會實時從 Redis 讀取該服務的最新數據。 jedis.publish(key, Constants.REGISTER); success = true; // 若是服務器端已同步數據,只需寫入單臺機器 if (!replicate) { break; // If the server side has synchronized data, just write a single machine } } finally { jedis.close(); } } catch (Throwable t) { exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
該方法是實現了父類FailbackRegistry的抽象方法,主要是實現了註冊的功能,具體的邏輯是先將須要註冊的服務信息保存到redis中,而後發佈redis註冊事件。
@Override public void doUnregister(URL url) { // 得到分類路徑 String key = toCategoryPath(url); // 得到URL字符串做爲 Value String value = url.toFullString(); RpcException exception = null; boolean success = false; for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { Jedis jedis = jedisPool.getResource(); try { // 刪除redis中的記錄 jedis.hdel(key, value); // 發佈redis取消註冊事件 jedis.publish(key, Constants.UNREGISTER); success = true; // 若是服務器端已同步數據,只需寫入單臺機器 if (!replicate) { break; // If the server side has synchronized data, just write a single machine } } finally { jedis.close(); } } catch (Throwable t) { exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
該方法也是實現了父類的抽象方法,當服務消費者或者提供者關閉時,會調用該方法來取消註冊。邏輯就是跟註冊方法方法,先從redis中刪除服務相關記錄,而後發佈取消註冊的事件,從而實時通知訂閱者們。
@Override public void doSubscribe(final URL url, final NotifyListener listener) { // 返回服務地址 String service = toServicePath(url); // 得到通知器 Notifier notifier = notifiers.get(service); // 若是沒有該服務的通知器,則建立一個 if (notifier == null) { Notifier newNotifier = new Notifier(service); notifiers.putIfAbsent(service, newNotifier); notifier = notifiers.get(service); // 保證併發狀況下,有且只有一個通知器啓動 if (notifier == newNotifier) { notifier.start(); } } boolean success = false; RpcException exception = null; // 遍歷鏈接池集合進行訂閱,直到有一個訂閱成功,僅僅向一個redis進行訂閱 for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { Jedis jedis = jedisPool.getResource(); try { // 若是服務地址爲*結尾,也就是處理全部的服務層發起的訂閱 if (service.endsWith(Constants.ANY_VALUE)) { admin = true; // 得到分類層的集合 例如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers Set<String> keys = jedis.keys(service); if (keys != null && !keys.isEmpty()) { // 按照服務聚合url Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>(); for (String key : keys) { // 得到服務路徑,截掉多餘部分 String serviceKey = toServicePath(key); Set<String> sk = serviceKeys.get(serviceKey); if (sk == null) { sk = new HashSet<String>(); serviceKeys.put(serviceKey, sk); } sk.add(key); } // 按照每一個服務層進行發起通知,由於服務地址爲*結尾 for (Set<String> sk : serviceKeys.values()) { doNotify(jedis, sk, url, Arrays.asList(listener)); } } } else { // 處理指定的服務層發起的通知 doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener)); } // 只在一個redis上進行訂閱 success = true; break; // Just read one server's data } finally { jedis.close(); } } catch (Throwable t) { // Try the next server exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); } } if (exception != null) { // 雖然發生異常,但結果仍然成功 if (success) { logger.warn(exception.getMessage(), exception); } else { throw exception; } } }
該方法是實現了訂閱的功能。注意如下幾個點:
@Override public void doUnsubscribe(URL url, NotifyListener listener) { }
該方法原本是取消訂閱的實現,不過dubbo中並未實現該邏輯。
private void doNotify(Jedis jedis, String key) { // 遍歷全部的通知器,調用重載方法今天通知 for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) { doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue())); } } private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) { if (keys == null || keys.isEmpty() || listeners == null || listeners.isEmpty()) { return; } long now = System.currentTimeMillis(); List<URL> result = new ArrayList<URL>(); // 得到分類集合 List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0])); // 經過url得到服務接口 String consumerService = url.getServiceInterface(); // 遍歷分類路徑,例如/dubbo/com.alibaba.dubbo.demo.DemoService/providers for (String key : keys) { // 判斷服務是否匹配 if (!Constants.ANY_VALUE.equals(consumerService)) { String prvoiderService = toServiceName(key); if (!prvoiderService.equals(consumerService)) { continue; } } // 從分類路徑上得到分類名 String category = toCategoryName(key); // 判斷訂閱的分類是否包含該分類 if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) { continue; } List<URL> urls = new ArrayList<URL>(); // 返回全部的URL集合 Map<String, String> values = jedis.hgetAll(key); if (values != null && values.size() > 0) { for (Map.Entry<String, String> entry : values.entrySet()) { URL u = URL.valueOf(entry.getKey()); // 判斷是否爲動態節點,由於動態節點不受過時限制。而且判斷是否過時 if (!u.getParameter(Constants.DYNAMIC_KEY, true) || Long.parseLong(entry.getValue()) >= now) { // 判斷url是否合法 if (UrlUtils.isMatch(url, u)) { urls.add(u); } } } } // 若不存在匹配的url,則建立 `empty://` 的 URL返回,用於清空該服務的該分類。 if (urls.isEmpty()) { urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL) .setAddress(Constants.ANYHOST_VALUE) .setPath(toServiceName(key)) .addParameter(Constants.CATEGORY_KEY, category)); } result.addAll(urls); if (logger.isInfoEnabled()) { logger.info("redis notify: " + key + " = " + urls); } } if (result == null || result.isEmpty()) { return; } // 所有數據完成後,調用通知方法,來通知監聽器 for (NotifyListener listener : listeners) { notify(url, listener, result); } }
該方法實現了通知的邏輯,有兩個重載方法,第二個比第一個多了幾個參數,其實惟一的區別就是第一個重載方法是通知了全部的監聽器,內部邏輯中調用了getSubscribed方法獲取全部的監聽器,該方法的解釋能夠查看《dubbo源碼解析(三)註冊中心——開篇》中關於subscribed屬性的解釋。而第二個重載方法就是對一個指定的監聽器進行通知。
具體的邏輯在第二個重載的方法中,其中有如下幾個須要注意的點:
private String toServiceName(String categoryPath) { String servicePath = toServicePath(categoryPath); return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath; }
該方法很簡單,就是從服務路徑上得到服務名,這裏就很少作解釋了。
private String toCategoryName(String categoryPath) { int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR); return i > 0 ? categoryPath.substring(i + 1) : categoryPath; }
該方法的做用是從分類路徑上得到分類名。
private String toServicePath(String categoryPath) { int i; if (categoryPath.startsWith(root)) { i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length()); } else { i = categoryPath.indexOf(Constants.PATH_SEPARATOR); } return i > 0 ? categoryPath.substring(0, i) : categoryPath; } private String toServicePath(URL url) { return root + url.getServiceInterface(); }
這兩個方法都是得到服務地址,第一個方法主要是截掉多餘的部分,第二個方法主要是從url配置中獲取關於服務地址的值跟根節點拼接。
private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); }
該方法是得到分類路徑,格式是Root + Service + Type。
private class NotifySub extends JedisPubSub { private final JedisPool jedisPool; public NotifySub(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void onMessage(String key, String msg) { if (logger.isInfoEnabled()) { logger.info("redis event: " + key + " = " + msg); } // 若是是註冊事件或者取消註冊事件 if (msg.equals(Constants.REGISTER) || msg.equals(Constants.UNREGISTER)) { try { Jedis jedis = jedisPool.getResource(); try { // 通知監聽器 doNotify(jedis, key); } finally { jedis.close(); } } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee logger.error(t.getMessage(), t); } } } @Override public void onPMessage(String pattern, String key, String msg) { onMessage(key, msg); } @Override public void onSubscribe(String key, int num) { } @Override public void onPSubscribe(String pattern, int num) { } @Override public void onUnsubscribe(String key, int num) { } @Override public void onPUnsubscribe(String pattern, int num) { } }
NotifySub是RedisRegistry的一個內部類,繼承了JedisPubSub類,JedisPubSub類中定義了publish/subsribe的回調方法。經過繼承JedisPubSub類並從新實現這些回調方法,當publish/subsribe事件發生時,咱們能夠定製本身的處理邏輯。這裏實現了onMessage和onPMessage兩個方法,當收到註冊和取消註冊的事件的時候通知相關的監聽器數據變化,從而實現實時更新數據。
該類繼承 Thread 類,負責向 Redis 發起訂閱邏輯。
// 服務名:Root + Service private final String service; // 須要忽略鏈接的次數 private final AtomicInteger connectSkip = new AtomicInteger(); // 已經忽略鏈接的次數 private final AtomicInteger connectSkiped = new AtomicInteger(); // 隨機數 private final Random random = new Random(); // jedis實例 private volatile Jedis jedis; // 是不是首次通知 private volatile boolean first = true; // 是否運行中 private volatile boolean running = true; // 鏈接次數隨機數 private volatile int connectRandom;
上述屬性中,部分屬性都是爲了redis的重連策略,用於在和redis斷開連接時,忽略必定的次數和redis的鏈接,避免空跑。
private void resetSkip() { connectSkip.set(0); connectSkiped.set(0); connectRandom = 0; }
該方法就是重置忽略鏈接的信息。
private boolean isSkip() { // 得到忽略次數 int skip = connectSkip.get(); // Growth of skipping times // 若是忽略次數超過10次,那麼取隨機數,加上一個10之內的隨機數 // 鏈接失敗的次數越多,每一輪加大須要忽略的總次數,而且帶有必定的隨機性。 if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number if (connectRandom == 0) { connectRandom = random.nextInt(10); } skip = 10 + connectRandom; } // 自增忽略次數。若忽略次數不夠,則繼續忽略。 if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times return true; } // 增長鬚要忽略的次數 connectSkip.incrementAndGet(); // 重置已忽略次數和隨機數 connectSkiped.set(0); connectRandom = 0; return false; }
該方法是用來判斷忽略本次對redis的鏈接。首先得到須要忽略的次數,若是忽略次數不小於10次,則加上一個10之內的隨機數,而後判斷自增的忽略次數,若是次數不夠,則繼續忽略,若是次數夠了,增長鬚要忽略的次數,重置已經忽略的次數和隨機數。主要的思想是鏈接失敗的次數越多,每一輪加大須要忽略的總次數,而且帶有必定的隨機性。
@Override public void run() { // 當通知器正在運行中時 while (running) { try { // 若是不忽略鏈接 if (!isSkip()) { try { for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { jedis = jedisPool.getResource(); try { // 是否爲監控中心 if (service.endsWith(Constants.ANY_VALUE)) { // 若是不是第一次通知 if (!first) { first = false; Set<String> keys = jedis.keys(service); if (keys != null && !keys.isEmpty()) { for (String s : keys) { // 通知 doNotify(jedis, s); } } // 重置 resetSkip(); } // 批准訂閱 jedis.psubscribe(new NotifySub(jedisPool), service); // blocking } else { // 若是不是監控中心,而且不是第一次通知 if (!first) { first = false; // 單獨通知一個服務 doNotify(jedis, service); // 重置 resetSkip(); } // 批准訂閱 jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking } break; } finally { jedis.close(); } } catch (Throwable t) { // Retry another server logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources // 發生異常,說明 Redis 鏈接斷開了,須要等待reconnectPeriod時間 //經過這樣的方式,避免執行,佔用大量的 CPU 資源。 sleep(reconnectPeriod); } } } catch (Throwable t) { logger.error(t.getMessage(), t); sleep(reconnectPeriod); } } } catch (Throwable t) { logger.error(t.getMessage(), t); } } }
該方法是線程的run方法,應該很熟悉,其中作了相關訂閱的邏輯,其中根據redis的重連策略作了一些忽略鏈接的策略,也就是調用了上述講解的isSkip方法,訂閱就是調用了jedis.psubscribe方法,它是訂閱給定模式相匹配的全部頻道。
public void shutdown() { try { // 更改狀態 running = false; // jedis斷開鏈接 jedis.disconnect(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
該方法是斷開鏈接的方法。
該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:
public class RedisRegistryFactory extends AbstractRegistryFactory { @Override protected Registry createRegistry(URL url) { return new RedisRegistry(url); } }
能夠看到就是實例化了RedisRegistry而已,全部這裏就不解釋了。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了dubbo利用redis來實現註冊中心,其中關鍵的是須要弄明白dubbo在redis中存儲的數據結構,也就是key-value中key表明什麼,value表明什麼。還有就是須要了解JRedis和JedisPool,其餘的邏輯並不複雜。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。