Redis-探究-集羣擴容致使的Jedis客戶端報JedisMovedDataException異常

0 問題的產生

因爲線上Redis集羣內存使用量已經接近達到預警閾值,須要對Redis集羣擴容。(使用的是Redis自帶的Redis-Cluster)java

目前有6臺主節點,6臺從節點。node

暫時稱爲:redis

  • redis-master001 ~ redis-master006
  • redis-slave001 ~ redis-slave006

須要增長3主3從。緩存

  • redis-master007 ~ redis-master009
  • redis-slave007 ~ redis-master009

以前Redis集羣的16384個槽均勻分配在6臺主節點中,每一個節點2730個槽。微信

爲保證擴容後,槽依然均勻分佈,須要將以前6臺的每臺機器上遷移出910個槽,方案以下:dom

  • redis-master001的910個slot遷移到redis-master007
  • redis-master002的910個slot遷移到redis-master007
  • redis-master003的910個slot遷移到redis-master008
  • redis-master004的910個slot遷移到redis-master008
  • redis-master005的910個slot遷移到redis-master009
  • redis-master006的910個slot遷移到redis-master009

分配完以後,每臺節點1820個slot。ide

當將redis-master001的910個slot遷移到redis-master007後,業務上開始報下面的異常源碼分析

JedisMovedDataException

在馬賽克的上一行,能夠看到是調Jedis的get方法出的問題。ui

1 緣由及解決方案

問題的緣由在於使用了Jedis客戶端,改成使用JedisCluster客戶端便可解決問題。this

出問題的get方法是這樣寫的(在Jedis原生基礎上包裝了一層)

// 本身封裝的get方法
public String get(String key) {
    String result = 「";
    // 爲了打印獲取鏈接耗時的日誌,這裏單獨獲取了一下Jedis鏈接
    try (Jedis jedis = this.getResourceLog(key)) {
        TimeCost timeCost = new TimeCost();
        result = jedis.get(key); // 這裏報錯
        debugLogger.debug("redis cluster get TimeCost={}", timeCost.getCostMillSeconds());
    }
    // 其實改成下面這樣get就能夠解決一直報JedisMovedDataException問題
    // return jedisCluster.get(key);
    return result;
}
複製代碼

getResourceLog方法的做用是根據key計算出這個key所在的slot,再經過slot獲取Redis鏈接。代碼以下

private Jedis getResourceLog(String key) {
    TimeCost tc = new TimeCost();
    int slot = JedisClusterCRC16.getSlot(key); // CRC計算slot
    debugLogger.debug("calc slot TimeCost={}", tc.getCostMillSeconds());
    tc.reset();
    Jedis jedis = connectionHandler.getConnectionFromSlot(slot); // 經過slot獲取鏈接
    debugLogger.debug("get connection TimeCost={}", tc.getCostMillSeconds());
    return jedis;
}
複製代碼

上面的get方法能夠直接改成JedisCluster的get方法解決。

再考慮另一種狀況,若是必須經過Jedis操做呢?好比watch方法,JedisCluster是不提供watch的,那麼只能經過上述方法在Redis集羣中根據key獲取到slot,再經過slot獲取到jedis連接,而後調用watch。這樣一來,在調watch的地方也會報JedisMovedDataException。

例以下面的代碼,在業務上須要保證事務的狀況下(或樂觀鎖),可能會這樣實現:

Jedis jedis = null;
String key = ...; // redis key
try {
    // 經過上面的getResource方法獲取jedis連接
    jedis = getResource(userId);
    // 經過jedis watch key
    if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) {

        // .... 業務邏輯 ....
        // ....
        // 經過jedis連接開始事務
        Transaction transaction = jedis.multi();
        // ...
        // ... 執行一些transaction操做...
        // ...
        // 提交事務
        List<Object> execResult = transaction.exec();
        
        return ...;
    }
} catch (Exception ex) {
    // do something ...
} finally {
    if (jedis != null) {
        try {
            if (!flag) {
                jedis.unwatch();
            }
        } finally {
            jedis.close();
        }
    }
}
複製代碼

此時若是發生slot遷移,就會報JedisMovedDataException。

那這種狀況下的解決方案是什麼呢?

其實,優先catch住JedisMovedDataException,而後經過JedisCluster.get(key);一下就行,以下:

Jedis jedis = null;
String key = ...; // redis key
try {
    // 經過上面的getResource方法獲取jedis連接
    jedis = getResource(userId);
    // 經過jedis watch key
    if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) {

        // .... 業務邏輯 ....
        // ....
        // 經過jedis連接開始事務
        Transaction transaction = jedis.multi();
        // ...
        // ... 執行一些transaction操做...
        // ...
        // 提交事務
        List<Object> execResult = transaction.exec();
        
        return ...;
    }
} catch (JedisMovedDataException jmde) {
    jmde.printStackTrace();
    // redisClusterService中維護着jedisCluster實例,這個get實際上調用的是jedisCluster的get
    redisClusterService.get(key);
    return ...;
} catch (Exception ex) {
    // do something ...
} finally {
    if (jedis != null) {
        try {
            if (!flag) {
                jedis.unwatch();
            }
        } finally {
            jedis.close();
        }
    }
}
複製代碼

須要注意的是,用Jedis的get是不能解決的。

2 JedisCluster類圖

JedisCluster總體的UML關係以下,先有個總體的印象,在後面的源碼分析中,能夠再回來看。

Jedis類圖

3 爲何經過RedisCluster.get一下能夠解決?

下面經過JedisCluster源碼解釋爲何這麼作能夠解決問題,註釋中會有詳細說明。

JedisCluster.get源碼以下:

@Override
public String get(final String key) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.get(key);
    }
  }.run(key);
}
複製代碼

發現他是委託給JedisClusterCommand來完成get操做的,也能夠發現execute方法其實是使用Jedis來執行的get。這個Jedis實際上就是經過上述方法,先計算出slot,再經過slot獲取到Jedis連接的。關鍵在於最下面run方法的執行,下面具體看一下。

Run方法源碼以下:

public T run(String key) {
  // JedisClusterCRC16.getSlot(key) 計算出slot
  return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
}
複製代碼

runWithRetries源碼以下

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
  // 這裏是一個重試機制,報異常時觸發
  if (attempts <= 0) {
    throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
  }

  Jedis connection = null;
  try {

    if (redirect != null) {
      connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
      if (redirect instanceof JedisAskDataException) {
        // TODO: Pipeline asking with the original command to make it faster....
        connection.asking();
      }
    } else {
      if (tryRandomNode) {
        connection = connectionHandler.getConnection();
      } else {
        // 執行到這裏,經過slot獲取到Jedis connection
        // 內部是經過一個map維護的slot到JedisPool的映射關係
        connection = connectionHandler.getConnectionFromSlot(slot);
      }
    }

    // 執行上面JedisClusterCommand定義的execute方法。
    return execute(connection);

  } catch (JedisNoReachableClusterNodeException jnrcne) {
    throw jnrcne;
  } catch (JedisConnectionException jce) {
    // release current connection before recursion
    releaseConnection(connection);
    connection = null;

    if (attempts <= 1) {
      //We need this because if node is not reachable anymore - we need to finally initiate slots
      //renewing, or we can stuck with cluster state without one node in opposite case.
      //But now if maxAttempts = [1 or 2] we will do it too often.
      //TODO make tracking of successful/unsuccessful operations for node - do renewing only
      //if there were no successful responses from this node last few seconds
      this.connectionHandler.renewSlotCache();
    }

    return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
  } catch (JedisRedirectionException are) { // *** 關鍵在這 ***
    // if MOVED redirection occurred,
    // JedisMovedDataException是JedisRedirectionException的子類,因此會執行下面if中的代碼
    if (jre instanceof JedisMovedDataException) {
      // it rebuilds cluster's slot cache recommended by Redis cluster specification
      // 從新經過這個jedis連接獲取RedisCluster中的Node信息以及slot信息
      this.connectionHandler.renewSlotCache(connection);
    }

    // release current connection before recursion
    releaseConnection(connection);
    connection = null;

    return runWithRetries(slot, attempts - 1, false, jre);
  } finally {
    releaseConnection(connection);
  }
}
複製代碼

註釋中說到了最終會經過this.connectionHandler.renewSlotCache(connection);來從新獲取slot信息。下面來看下這個方法。

public void renewSlotCache(Jedis jedis) {
  cache.renewClusterSlots(jedis);
}
複製代碼

調用了cache的renewClusterSlots方法來從新獲取slot信息,這個cache是JedisClusterInfoCache類的實例,他裏面維護這Node和Slot信息,以下:

public class JedisClusterInfoCache {
  private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
  private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  // ..
}
複製代碼

renewClusterSlots方法以下

public void renewClusterSlots(Jedis jedis) {
  //If rediscovering is already in process - no need to start one more same rediscovering, just return
  if (!rediscovering) {
    try {
      w.lock();
      if (!rediscovering) {
        rediscovering = true;

        try {
          if (jedis != null) {
            try {
              // 關鍵在於這一步,這個方法會從新從遠程集羣中獲取最新的slot信息
              discoverClusterSlots(jedis);
              return;
            } catch (JedisException e) {
              //try nodes from all pools
            }
          }

          for (JedisPool jp : getShuffledNodesPool()) {
            Jedis j = null;
            try {
              j = jp.getResource();
              discoverClusterSlots(j);
              return;
            } catch (JedisConnectionException e) {
              // try next nodes
            } finally {
              if (j != null) {
                j.close();
              }
            }
          }
        } finally {
          rediscovering = false;      
        }
      }
    } finally {
      w.unlock();
    }
  }
}
複製代碼

關鍵在於discoverClusterSlots方法,這個方法的實現以下:

private void discoverClusterSlots(Jedis jedis) {
  // 經過slots命令從遠程獲取slot信息
  List<Object> slots = jedis.clusterSlots(); 
  this.slots.clear(); // 清除本地緩存slot信息

  // 每一個slotInfoObj包含集羣中某一節點的slot信息
  for (Object slotInfoObj : slots) {
    List<Object> slotInfo = (List<Object>) slotInfoObj;

    if (slotInfo.size() <= MASTER_NODE_INDEX) {
      continue;
    }
    // 計算當前節點的slot信息
    List<Integer> slotNums = getAssignedSlotArray(slotInfo);

    // hostInfos
    // 獲取這組slot所在的節點信息
    List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
    if (hostInfos.isEmpty()) {
      continue;
    }

    // at this time, we just use master, discard slave information
    HostAndPort targetNode = generateHostAndPort(hostInfos);
    // 從新關聯這組slot到遠程節點的映射,至此,完成slot信息的刷新
    assignSlotsToNode(slotNums, targetNode);
  }
}
複製代碼

4 爲何Jedis的get不行?

首先咱們來對比一下JedisCluster的get和Jedis的get

JedisCluster.get

@Override
public String get(final String key) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.get(key); // 這裏追蹤進去,就是Jedis.get
    }
  }.run(key);
}
複製代碼

Jedis.get

@Override
public String get(final String key) {
  checkIsInMultiOrPipeline();
  client.get(key);
  return client.getBulkReply();
}
複製代碼

由此可知,Jedis.get沒有了run方法中的異常重試和從新發現機制,因此Jedis.get不行。

5 總結

本文從一次線上擴容引起問題的討論,由擴容引出了slot的遷移,由slot的遷移引出線上報錯-JedisMovedDataException,而後說明了引起這個異常的緣由,是由於咱們使用了Jedis客戶端,致使沒法自動發現遠程集羣slot的變化。

而後提出瞭解決方案,經過使用JedisCluster來解決沒法自動發現slot變化的問題。並從源碼的角度說明了爲何JedisCluster的get方法能夠自動發現遠程slot的變化。


歡迎關注個人微信公衆號

公衆號
相關文章
相關標籤/搜索