https://segmentfault.com/a/1190000002690506java
本專欄與Redis相關的文章redis
Redis Sentinel機制與用法(一)
Redis Sentinel機制與用法(二)
Jedis的JedisSentinelPool源代碼分析
Jedis的Sharded源代碼分析
Redis 主從 Replication 的配置
詳解Redis SORT命令
JedisCommand接口說明數據庫
Jedis是Redis官方推薦的Java客戶端,更多Redis的客戶端能夠參考Redis官網客戶端列表。Redis-Sentinel做爲官方推薦的HA解決方案,Jedis也在客戶端角度實現了對Sentinel的支持,主要實如今JedisSentinelPool.java
這個類中,下文會分析這個類的實現。apache
JedisSentinelPool類裏有如下的屬性:segmentfault
//基於apache的commom-pool2的對象池配置 protected GenericObjectPoolConfig poolConfig; //超時時間,默認是2000 protected int timeout = Protocol.DEFAULT_TIMEOUT; //sentinel的密碼 protected String password; //redis數據庫的數目 protected int database = Protocol.DEFAULT_DATABASE; //master監聽器,當master的地址發生改變時,會觸發這些監聽者 protected Set<MasterListener> masterListeners = new HashSet<MasterListener>(); protected Logger log = Logger.getLogger(getClass().getName()); //Jedis實例建立工廠 private volatile JedisFactory factory; //當前的master,HostAndPort是一個簡單的包裝了ip和port的模型類 private volatile HostAndPort currentHostMaster;
構造器的代碼以下:多線程
public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) { this.poolConfig = poolConfig; this.timeout = timeout; this.password = password; this.database = database; HostAndPort master = initSentinels(sentinels, masterName); initPool(master); }
構造器一開始對實例變量進行賦值,參數sentinels是客戶端所須要打交道的Redis-Sentinel,容許有多個,用一個集合來盛裝。ide
而後經過initSentinels方法與sentinel溝通後,肯定當前sentinel所監視的master是哪個。而後經過master來建立好對象池,以便後續從對象池中取出一個Jedis實例,來對master進行操做。memcached
initSentinels方法的代碼以下所示,我加了一些註釋:函數
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; log.info("Trying to find master from available Sentinels..."); // 有多個sentinels,遍歷這些個sentinels for (String sentinel : sentinels) { // host:port表示的sentinel地址轉化爲一個HostAndPort對象。 final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { // 鏈接到sentinel jedis = new Jedis(hap.getHost(), hap.getPort()); // 根據masterName獲得master的地址,返回一個list,host= list[0], port = // list[1] List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); // connected to sentinel... sentinelAvailable = true; if (masterAddr == null || masterAddr.size() != 2) { log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } master = toHostAndPort(masterAddr); log.fine("Found Redis master at " + master); // 若是在任何一個sentinel中找到了master,再也不遍歷sentinels break; } catch (JedisConnectionException e) { log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one."); } finally { // 關閉與sentinel的鏈接 if (jedis != null) { jedis.close(); } } } // 到這裏,若是master爲null,則說明有兩種狀況,一種是全部的sentinels節點都down掉了,一種是master節點沒有被存活的sentinels監控到 if (master == null) { if (sentinelAvailable) { // can connect to sentinel, but master name seems to not // monitored throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException( "All sentinels down, cannot determine where is " + masterName + " master is running..."); } } //若是走到這裏,說明找到了master的地址 log.info("Redis master running at " + master + ", starting Sentinel listeners..."); //啓動對每一個sentinels的監聽 for (String sentinel : sentinels) { final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); masterListeners.add(masterListener); masterListener.start(); } return master; }
能夠看到initSentinels
方法的參數有一個masterName,就是咱們所須要查找的master的名字。
一開始,遍歷多個sentinels,一個一個鏈接到sentinel,去詢問關於masterName的消息,能夠看到是經過jedis.sentinelGetMasterAddrByName()
方法去鏈接sentinel,並詢問當前的master的地址。點進這個方法去看看,源代碼是這樣寫的:ui
/** * <pre> * redis 127.0.0.1:26381> sentinel get-master-addr-by-name mymaster * 1) "127.0.0.1" * 2) "6379" * </pre> * @param masterName * @return two elements list of strings : host and port. */ public List<String> sentinelGetMasterAddrByName(String masterName) { client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName); final List<Object> reply = client.getObjectMultiBulkReply(); return BuilderFactory.STRING_LIST.build(reply); }
調用的是與Jedis綁定的client去發送一個"get-master-addr-by-name"命令。
回到initSentinels
方法中,若是沒有詢問到master的地址,那就詢問下一個sentinel。若是詢問到了master的地址,那麼將再也不遍歷sentinel集合,直接break退出循環遍歷。
若是循環結束後,master的值爲null,那麼有兩種可能:
一種是全部的sentinel實例都不可用了
另一種是,sentinel實例有可用的,可是沒有監控名字爲masterName的Redis。
若是master爲null,程序會拋出異常,再也不往下走了。若是master不爲null呢,繼續往下走。
能夠從代碼中看到,爲每一個sentinel都啓動了一個監聽者MasterListener
。MasterListener自己是一個線程,它會去訂閱sentinel上關於master節點地址改變的消息。
接下來先分析構造方法中的另一個方法:initPool
。以後再看MasterListener的實現。
initPool的實現源代碼以下所示:
private void initPool(HostAndPort master) { if (!master.equals(currentHostMaster)) { currentHostMaster = master; if (factory == null) { factory = new JedisFactory(master.getHost(), master.getPort(), timeout, password, database); initPool(poolConfig, factory); } else { factory.setHostAndPort(currentHostMaster); // although we clear the pool, we still have to check the // returned object // in getResource, this call only clears idle instances, not // borrowed instances internalPool.clear(); } log.info("Created JedisPool to master at " + master); } }
能夠看到,做爲參數傳進來的master會與實例變量currentHostMaster做比較,看看是不是相同的,爲何要做這個比較呢,由於前文中提到的MasterListener
會在發現master地址改變之後,去調用initPool
方法。
若是是第一次調用initPool
方法(構造函數中調用),那麼會初始化Jedis實例建立工廠,若是不是第一次調用(MasterListener
中調用),那麼只對已經初始化的工廠進行從新設置。
從以上也能夠看出爲何currentHostMaster
和factory
這兩個變量爲何要聲明爲volatile
,它們會在多線程環境下被訪問和修改,所以必須保證可見性。
第一次調用時,會調用initPool(poolConfig, factory)
方法。
看看這個方法的源代碼:
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { if (this.internalPool != null) { try { closeInternalPool(); } catch (Exception e) { } } this.internalPool = new GenericObjectPool<T>(factory, poolConfig); }
基本上只幹了一件事:初始化內部對象池。
直接看它的run方法實現吧:
public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { //訂閱sentinel上關於master地址改變的消息 j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 3) { if (masterName.equals(switchMasterMsg[0])) { initPool(toHostAndPort(Arrays.asList( switchMasterMsg[3], switchMasterMsg[4]))); } else { log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); } } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); } } }, "+switch-master"); } catch (JedisConnectionException e) { if (running.get()) { log.severe("Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying."); try { Thread.sleep(subscribeRetryWaitTimeMillis); } catch (InterruptedException e1) { e1.printStackTrace(); } } else { log.fine("Unsubscribing from Sentinel at " + host + ":" + port); } } } }
能夠看到它依然委託了Jedis去與sentinel打交道,訂閱了關於master地址變換的消息,當master地址變換時,就會再調用一次initPool
方法,從新設置對象池相關的設置。
Jedis的JedisSentinelPool的實現僅僅適用於單個master-slave。 如今有了更多的需求,既須要sentinel提供的自動主備切換機制,又須要客戶端可以作數據分片(Sharding),相似於memcached用一致性哈希進行數據分片。 接下來可能會本身在現有Jedis上實現一個支持一致性哈希分片的ShardedJedisSentinelPool。