Jedis源碼分析:JedisClusterConnectionHandler

JedisClusterConnectionHandler

   JedisClusterConnectionHandler提供了JedisCluster接口獲取資源池中Jedis鏈接對象的一個門面類,JedisClusterConnectionHandler提供了初始化集羣,獲取資源池中鏈接對象,刷新資源池等各類方法。JedisClusterConnectionHandler必須依賴於JedisClusterInfoCache類,由於 JedisClusterConnectionHandler自己的方法都是基於JedisClusterInfoCache類實現的,下列所講解的方法的底層實現會在JedisClusterInfoCache進行講述node

   JedisClusterConnectionHandler實例化時必須提供如下的參數(見代碼1-1),這些參數自己對於JedisClusterConnectionHandler沒什麼用處,徹底就是在該類的構造器中走了一遭,由於JedisClusterConnectionHandler內部維護着JedisClusterInfoCache實例。JedisClusterConnectionHandler將這些參數傳遞給cache實例後,經過initializeSlotsCache()方法獲取集羣信息,並調用cache.discoverClusterNodesAndSlots()將獲取到的集羣信息分別放入nodes緩存和slots緩存中。這兩個緩存的存儲形式爲 Map<String, JedisPool>,Map<Integer, JedisPool> ,雖然都是Map,可是存儲的數據內容徹底不一樣,nodes存儲着集羣全部節點信息,存儲形式爲<"host:port",JedisPool>,而slots存儲的信息爲<slot,JedisPool>,而且slots只存儲主節點信息。redis

   從代碼1-2中看出,經過遍歷節點集合,實例化Jedis對象,再調用cache.discoverClusterNodesAndSlots(jedis)方法將節點放入緩存Map中,獲取集羣信息的方法也很簡單,經過發送'cluster slots'便知道槽和節點的對應狀況,再將Redis服務端返回來的輸入流解析成Java對象便可。這裏註明下,發現集羣情況只須要一個可用的Jedis實例便可,可是集羣節點總會發生"掛掉"的狀況,建議在使用JedisCluster時配上全部的節點信息。後端

代碼1-1緩存

public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) { //實例化集羣信息緩存
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap); initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier); }

 

代碼1-2 集羣發現及初始化緩存池dom

 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { //遍歷節點集合,拿出一個可用的節點獲取集羣信息
      for (HostAndPort hostAndPort : startNodes) { Jedis jedis = null; try { jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); //添加密碼認證
        if (password != null) { jedis.auth(password); } //設置客戶端名稱
        if (clientName != null) { jedis.clientSetname(clientName); } //TODO 根據節點發現集羣信息
 cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes
      } finally { if (jedis != null) { jedis.close(); } } } }

 

集羣節點的獲取

   JedisClusterConnectionHandler提供了四種獲取可用節點的方法,其中 getConnection()和 getConnectionFromSlot(int slot);被定義爲抽象方法,交由JedisSlotBasedConnectionHandler子類進行實現。值得注意的是,在獲取Jedis鏈接時,有兩個比較重要的點須要說明一下。前面說過JedisClusterInfoCache存放着nodes和slots兩個map對象,一般集羣模式都會作主從分離,即master節點負責讀寫操做,slave節點負責讀和備份.若是嘗試對slave節點進行寫操做,那麼會發生寫入錯誤。因此提供了getConnectionFromSlot(int slot) 方法來根據槽點值獲取對應的主節點信息,若是此時後端集羣發生主從切換,該方法會嘗試刷新集羣狀態來保證獲取可用的Jedis對象。若是集羣頻繁的出現切換,那麼即便刷新了集羣信息也不能保證獲取到的Jedis對象必定是有效的,代碼中也作了說明"It can't guaranteed to get valid connection because of node assignment(由於節點的分配問題並不能獲取到有效鏈接)",因此代碼中採用了重試機制和隨機數來儘可能保證獲取到的是有效的Jedis鏈接。第二個點是Jedis包依賴於Apache-CommonsPool2來作Jedis的二級緩存,爲了保證Jedis是有效的,Jedis包下的JedisFactory實現了PooledObjectFactory相關方法,經過配置DEFAULT_TEST_ON_CREATE,DEFAULT_TEST_ON_BORROW,DEFAULT_TEST_ON_RETURN,DEFAULT_NUM_TESTS_PER_EVICTION_RUN 參數保證在建立階段,借用階段,歸還階段時Jedis對象都是可用的。源碼分析

  • Jedis getConnectionFromNode(HostAndPort node)  使用二級緩存,先從JedisClusterInfoCache中獲取緩存對象,再去commons-pool包下獲取Jedis對象,若是commons-pool2中沒有存儲該對象,那麼先將該對象放入通用緩存池中再進行獲取。
  • Map<String, JedisPool> getNodes() 直接從JedisClusterInfoCache 的nodes緩存中獲取全部JedisPool對象
  • Jedis getConnection() 遍歷nodes中的節點,若是存在一個jedis對象可以成功執行ping命令,那麼返回該Jedis對象。若是遍歷完全部的node節點仍然不能找到一個有效的節點,那麼拋出JedisNoReachableClusterNodeException異常
  • Jedis getConnectionFromSlot(int slot) 根據槽點值獲取對應的主節點信息,slots緩存map中存放着槽點值->master node的對應關係。

代碼2-1:從緩存池中獲取Jedis對象ui

public Jedis getConnectionFromNode(HostAndPort node) { //使用二級緩存,先從JedisClusterInfoCache中獲取緩存對象,再去commons-pool包下獲取Jedis對象
    return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { //直接從JedisClusterInfoCache 的nodes緩存中獲取全部JedisPool對象
    return cache.getNodes(); } public Jedis getConnection() { // In antirez's redis-rb-cluster implementation, // getRandomConnection always return valid connection (able to // ping-pong) // or exception if all connections are invalid //隨機獲取
    List<JedisPool> pools = cache.getShuffledNodesPool(); /** * 遍歷緩存池中的每一個池對象 */
    for (JedisPool pool : pools) { Jedis jedis = null; try { jedis = pool.getResource(); if (jedis == null) { continue; } String result = jedis.ping();//嘗試進行ping
        if (result.equalsIgnoreCase("pong")) return jedis;//返回可用鏈接
 jedis.close(); } catch (JedisException ex) { if (jedis != null) { jedis.close(); } } } //遍歷完成後仍然沒法獲取有效實例
    throw new JedisNoReachableClusterNodeException("No reachable node in cluster"); } public Jedis getConnectionFromSlot(int slot) { //從slots中獲取
    JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment
      return connectionPool.getResource(); } else { //刷新集羣信息
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { //TODO
        return connectionPool.getResource(); } else { //no choice, fallback to new connection to random node //回到隨機獲取
        return getConnection(); } } }

 

集羣信息的重置與關閉

JedisClusterConnectionHandler提供了三個方法來進行集羣信息的刷新和關閉操做,詳細的說明見《Jedis源碼分析:JedisClusterInfoCache》this

相關文章
相關標籤/搜索