Jedis是如何支持Cluster的

做者:莫那·魯道
原文:http://thinkinjava.cn/2018/08/Jedis-%E5%A6%82%E4%BD%95%E6%94%AF%E6%8C%81-Cluster/

前言

前面說了 Jedis(2.9.0) 如何支持 Redis Sentinel 的,今天看看 Jedis 是如何支持 Redis Cluster 的。java

1 初始化

Jedis Cluster 構造方法:node

 public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, final GenericObjectPoolConfig poolConfig) {
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, poolConfig);
  }

注意: Set<HostAndPort> jedisClusterNode 中包含全部主從節點。web

經過層層跟蹤,咱們來到了 initializeSlotsCache 方法。redis

 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      if (password != null) {
        jedis.auth(password);
      }
      try {
      // 
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }

這個 cache 設計上就是 Redis Cluster slot 的緩存,每一個 slot 都指向一個鏈接池。看看這個 cache 的內部結構:算法

public class JedisClusterInfoCache {
  private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();// ip:port 對應的鏈接池
  private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();// slot 對應的鏈接池

  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  private final Lock r = rwl.readLock();
  private final Lock w = rwl.writeLock();
  private volatile boolean rediscovering;
  private final GenericObjectPoolConfig poolConfig;

  private int connectionTimeout;
  private int soTimeout;
  private String password;

  private static final int MASTER_NODE_INDEX = 2;// 主節點下標

其中,在 initializeSlotsCache 方法中,會遍歷全部的節點信息,可是,只會執行一次 cache.discoverClusterNodesAndSlots(jedis),若是失敗了,就繼續執行這個方法。爲何只須要執行一次呢?數據庫

來看看 cache.discoverClusterNodesAndSlots 方法:緩存

 public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();

    try {
      reset();
      List<Object> slots = jedis.clusterSlots();// 節點的槽位集合:[[10924, 16383, [[B@4ae82894, 6386], [[B@543788f3, 6387]], [5462, 10923, [[B@6d3af739, 6384], [[B@1da51a35, 6385]], [0, 5461, [[B@16022d9d, 6382], [[B@7e9a5fbe, 6383]]]

      for (Object slotInfoObj : slots) {// 遍歷集合
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= MASTER_NODE_INDEX) {// 若是此節點信息少於3 個,跳過這次循環,通常是: slotIndex, slotIndex,{ip byte[], port},{ip byte[], port}
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);// 獲得全部的 slot 數字

        // hostInfos
        int size = slotInfo.size();
        // 從第三位開始循環,是主節點信息
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);// 獲得主節點信息
          if (hostInfos.size() <= 0) {
            continue;
          }

          HostAndPort targetNode = generateHostAndPort(hostInfos); // 解析出 ip + port
          setupNodeIfNotExist(targetNode);// 建立鏈接池,並放入緩存
          if (i == MASTER_NODE_INDEX) {// 若是是主節點,就將該全部槽位指向同一個鏈接池
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }

該方法做用以下:經過任意一個節點,獲得全部主節點的信息。數據格式爲:微信

獲得這些信息後,根據 ip + port 建立鏈接池,並緩存全部的鏈接池,key 爲 「ip:port」,value 則是對應的鏈接池,若是是主節點,則更進一步,將 solt 和鏈接池也所有緩存,便於查詢。運維

該方法涉及的幾個方法以下:dom

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
  List<Integer> slotNums = new ArrayList<Integer>();
  // 0位是起始 slot, 1 位是截止 slot, 這裏是獲得全部的 slot
  for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot++) {// 初始是第一個, slot 不能大於第二個 slot
    slotNums.add(slot);
  }
  return slotNums;
}
public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      String nodeKey = getNodeKey(node); // ip:port
      JedisPool existingPool = nodes.get(nodeKey);// 從 map 裏獲取緩存
      if (existingPool != null) return existingPool;// 若是有,就再也不初始化
      // 建立鏈接池
      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
      nodes.put(nodeKey, nodePool);// 緩存
      return nodePool;
    } finally {
      w.unlock();
    }
  }
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = setupNodeIfNotExist(targetNode);// 獲取該節點的鏈接池
      for (Integer slot : targetSlots) {// 將全部槽位指向該鏈接池
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }

因此,當這個步驟成功之後,全部的 slot 和對應的鏈接池都初始化好了,後面就直接 break 了。若是途中失敗了,則繼續嘗試。

2 發送命令和重試機制

好了,咱們已經知道,slot 和鏈接池是保存在 JedisClusterInfoCache 類中的,那麼,咱們使用 API 的時候又是怎麼操做的呢?

以 set 方法爲例:

 public String set(final String key, final String value) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      }
    }.run(key);
  }

這裏會建立一個 Redis 命令對象,而後執行 run 方法,run 方法裏會回調命令對象的 execute 方法。run 方法內部調用的是 runWithRetries 方法,看名字,這是一個帶有重試機制的方法. 該方法有個參數就是 int attempts,用戶本身設置的重試次數。

看看 runWithRetries 方法實現(由於包含了失敗重試邏輯,因此很長):

   private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {// 重試次數
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {// 第一次 false,若是節點 A 正在遷移槽 i 至節點 B , 那麼當節點 A 沒能在本身的數據庫中找到命令指定的數據庫鍵時, 節點 A 會向客戶端返回一個 ASK 錯誤, 指引客戶端到節點 B 繼續查找指定的數據庫鍵
        connection = askConnection.get();
        connection.asking();// 到目標節點打開客戶端鏈接標識

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {// 若是是隨機的
          connection = connectionHandler.getConnection();
        } else {// 默認不是隨機的,經過 CRC16 算法獲取 slot 對應的節點的鏈接池中的鏈接
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }
      // 執行
      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {// 集羣不存在
      throw jnrcne;
    } catch (JedisConnectionException jce) {// 鏈接異常
      // release current connection before recursion
      releaseConnection(connection);//歸還鏈接
      connection = null;

      if (attempts <= 1) {// 若是重試次數只有一次,那就更新鏈接池,並拋出異常
        this.connectionHandler.renewSlotCache();
        throw jce;
      }

      return runWithRetries(key, attempts - 1, tryRandomNode, asking);// 不然遞歸重試,重試次數減一
    } catch (JedisRedirectionException jre) {// 若是是重定向異常,例如 moved ,ASK
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {// 節點在接到一個命令請求時, 會先檢查這個命令請求要處理的鍵所在的槽是否由本身負責, 若是不是的話, 節點將向客戶端返回一個 MOVED 錯誤, MOVED 錯誤攜帶的信息能夠指引客戶端轉向至正在負責相關槽的節點
        // 若是是 moved 錯誤,就更新鏈接池, ASK 就沒必要更新緩存,只須要臨時訪問就行
        this.connectionHandler.renewSlotCache(connection);
      }
      // 歸還舊的鏈接
      releaseConnection(connection);
      connection = null;
      // 若是是 ASK 
      if (jre instanceof JedisAskDataException) {
        asking = true;
        // 設置 ThreadLocal,新的鏈接是 ASK 指定的節點
        askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {// 若是是 moved 錯誤,不處理錯誤,重試。
      } else {
        throw new JedisClusterException(jre);
      }
      // 重試
      return runWithRetries(key, attempts - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }

該方法主要步驟以下:

  1. 默認是使用 CRC16 算法經過 key 獲得 slot ,而後,根據 slot 獲得 Jedis 鏈接,也就是從咱們剛剛說的緩存裏獲取鏈接。

  2. 獲得鏈接後,回調命令對象的 execute 方法。

  3. 若是發生了 JedisNoReachableClusterNodeException 異常,代表集羣不存在,則直接拋出異常,結束方法。

  4. 若是發生了 JedisConnectionException 鏈接異常,則進行遞歸重試,若是重試次數只剩一次,則刷新鏈接池緩存。

  5. 若是發生了 JedisRedirectionException 重定向異常,若是返回的是 moved,則刷新鏈接池。若是是 ASK,則不刷新鏈接池,在下次遞歸中直接使用 ASK 返回的信息進行調用。下次遞歸時,先執行 asking 命令打開新的客戶端鏈接,若是成功,則執行真正的命令。

  6. 最終,歸還鏈接。

大體的流程圖以下:

這裏說一下 ASK 和 MOVED:

ASK:若是節點 A 正在遷移槽 i 至節點 B , 那麼當節點 A 沒能在本身的數據庫中找到命令指定的數據庫鍵時, 節點 A 會向客戶端返回一個 ASK 錯誤, 指引客戶端到節點 B 繼續查找指定的數據庫鍵。

MOVED:節點在接到一個命令請求時, 會先檢查這個命令請求要處理的鍵所在的槽是否由本身負責, 若是不是的話, 節點將向客戶端返回一個 MOVED 錯誤, MOVED 錯誤攜帶的信息能夠指引客戶端轉向至正在負責相關槽的節點。

二者的共同點都是重定向,不一樣點是:ASK 是遷移過程當中返回的,MOVED 是遷移結束後返回的。如返回 ASK ,那麼就沒必要更新客戶端緩存,由於客戶端沒法知道何時遷移完成,所以只能是臨時性的重定向。可是 MOVED 重定向說明鍵對應的 slot 已經成功的轉移到了新的節點,那麼就能夠換成這些鏈接。

注意:當重試次數不夠時,會拋出 throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?") 異常,緣由是節點宕機或請求超時觸發了重試,而重試次數耗盡就會觸發這個異常。

當 Cluster 進行故障發現到完成故障轉移,須要必定的時間,節點宕機期間,全部指向這個節點的命令都會觸發重試,當收到 moved 命令則會進行鏈接刷新 —— 也就是 renewSlotCache 方法。

注意:更新鏈接池的過程是串行加鎖的!!

代碼以下:

 public void renewClusterSlots(Jedis jedis) {
    //If rediscovering is already in process - no need to start one more same rediscovering, just return
    if (!rediscovering) {
      try {
        w.lock();
        rediscovering = true;

        if (jedis != null) {
          try {
            discoverClusterSlots(jedis);
            return;
          } catch (JedisException e) {
            //try nodes from all pools
          }
        }

        for (JedisPool jp : getShuffledNodesPool()) {
          try {
            jedis = jp.getResource();
            discoverClusterSlots(jedis);
            return;
          } catch (JedisConnectionException e) {
            // try next nodes
          } finally {
            if (jedis != null) {
              jedis.close();
            }
          }
        }
      } finally {
        rediscovering = false;
        w.unlock();
      }
    }
  }

注意:代碼中使用了寫鎖,而獲取鏈接池時則使用了讀鎖,讀寫鎖是互斥的,這時將致使全部訪問集羣的線程阻塞!!!

固然,只有出現 MOVED 錯誤或者 JedisConnectionException 異常且沒法繼續重試時,纔會進行刷新鏈接池操做。

3 總結

本文旨在分析 Jedis 如何支持 Redis Cluster,由於 Redis Cluster 須要客戶端來支持分片。Jedis 內部使用了一個 JedisClusterInfoCache 保存 slot 和 pool,ip:port 和 pool 的映射關係,ip:port 的緩存更可能是服務於 ask 時尋找節點。

在使用客戶端是時候,Jedis 會有重試機制,用戶能夠設置重試次數,若是發生了 ask,客戶端會自動根據返回值重定向,若是發生了 moved,則會刷新鏈接池中的 slot,由於集羣發生了遷移。

須要注意的是,當集羣進行遷移的時候,若是有客戶端訪問遷移的節點,那麼將會致使刷新鏈接池,而這個鏈接池是有鎖,當刷新的時候,使用的是寫鎖,將致使全部的讀都會阻塞,因此,遷移儘可能在業務低谷進行。

瞭解客戶端的原理,有助於咱們理解 Redis Cluster 的運行原理,也有助於咱們平時編寫代碼,運維緩存,排查故障。


本文分享自微信公衆號 - 碼農沉思錄(code-thinker)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索