在Redis Cluster集羣模式下,因爲key分佈在各個節點上,會形成沒法直接實現mget、sInter等功能。所以,不管咱們使用什麼客戶端來操做Redis,都要考慮單一key命令操做、批量key命令操做和多節點命令操做的狀況,以及效率問題。java
在以前的文章中剖析了Jedis cluster集羣初始化源碼,分析了源碼以後能夠得知,在Jedis中,使用的是JedisClusterConnection集羣鏈接類來與Redis集羣節點進行命令交互,它使用裝飾模式對JedisCluster命令執行類進行了一層包裝,同時對這三種不一樣類型的命令操做作了分類處理。node
下面就看下JedisClusterConnection類中,如何實現這三種類型的key命令操做。在這裏只列舉一些典型的命令進行說明。本文基於spring-data-redis-1.8.4-RELEASE.jar和jedis-2.9.0.jar進行源碼剖析,Redis版本爲Redis 3.2.8。redis
對於單一命令操做,經常使用的就是get、set了。在JedisClusterConnection類中,get方法的實現以下:spring
public byte[] get(byte[] key) { try { return cluster.get(key); } catch (Exception ex) { throw convertJedisAccessException(ex); } }
在上面代碼中,執行cluster.get()方法時,實際上調用的是BinaryJedisCluster類的get()方法:緩存
public byte[] get(final byte[] key) { return new JedisClusterCommand<byte[]>(connectionHandler, maxAttempts) { @Override public byte[] execute(Jedis connection) { return connection.get(key); } }.runBinary(key); }
BinaryJedisCluster類的get()方法的核心操做是由JedisClusterCommand類runBinary()方法完成的,下面剖析一下該類的核心代碼:服務器
public abstract class JedisClusterCommand<T> { // 集羣節點鏈接器 private JedisClusterConnectionHandler connectionHandler; // 重試次數,默認5次 private int maxAttempts; // 模板回調方法,執行相關的redis命令 public abstract T execute(Jedis connection); public T runBinary(byte[] key) { if (key == null) { throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); } return runWithRetries(key, this.maxAttempts, false, false); } /** * 利用重試機制運行鍵命令 * * @param key * 要操做的鍵 * @param attempts * 重試次數,每重試一次減1 * @param tryRandomNode * 標識是否隨機獲取活躍節點鏈接,true爲是,false爲否 * @param asking * @return */ private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { if (attempts <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { // 隨機獲取活躍節點鏈接 connection = connectionHandler.getConnection(); } else { // 計算key的slot值,而後根據slot緩存獲取節點鏈接 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } // 調用具體的模板方法實現執行命令 return execute(connection); // 集羣節點不可達,直接拋出異常 } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // 在遞歸執行runWithRetries方法以前釋放鏈接 releaseConnection(connection); connection = null; // 若是節點不能鏈接,從新初始化slot緩存 if (attempts <= 1) { this.connectionHandler.renewSlotCache(); throw jce; } // 出現鏈接錯誤重試執行命令 return runWithRetries(key, attempts - 1, tryRandomNode, asking); } catch (JedisRedirectionException jre) { // 若是出現MOVE重定向錯誤,在鏈接上執行cluster slots命令從新初始化slot緩存 if (jre instanceof JedisMovedDataException) { this.connectionHandler.renewSlotCache(connection); } // 在遞歸執行runWithRetries方法或者重建slot緩存以前釋放鏈接,從而避免在錯誤的鏈接上執行命令,也爲了不鏈接泄露問題 releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } // slot初始化後重試執行命令 return runWithRetries(key, attempts - 1, false, asking); } finally { //釋放鏈接 releaseConnection(connection); } } }
單一key命令執行流程:dom
在Redis Cluster中,有些命令如keys、flushall和刪除指定模式的鍵這些操做,須要遍歷全部的節點才能夠完成。下面就以keys命令來講明這種狀況下JedisClusterConnection類是如何完成該操做的,該類中keys()方法代碼以下:異步
public Set<byte[]> keys(final byte[] pattern) { Assert.notNull(pattern, "Pattern must not be null!"); //在全部主節點上執行keys命令,而後返回一個Collection集合 Collection<Set<byte[]>> keysPerNode = clusterCommandExecutor .executeCommandOnAllNodes(new JedisClusterCommandCallback<Set<byte[]>>() { @Override public Set<byte[]> doInCluster(Jedis client) { return client.keys(pattern); } }).resultsAsList(); //遍歷執行keys命令得到的結果,而後添加進Set集合返回 Set<byte[]> keys = new HashSet<byte[]>(); for (Set<byte[]> keySet : keysPerNode) { keys.addAll(keySet); } return keys; }
在上面代碼中咱們看到了keys()方法內部調用了ClusterCommandExecutor類的executeCommandOnAllNodes()方法,該類是一個集羣命令執行類,它提供了在多個集羣節點上批量執行命令的特性,因爲考慮到在多個節點上執行命令的效率問題,它使用Spring的org.springframework.core.task包裏面的AsyncTaskExecutor接口來爲命令執行操做提供異步支持,而後返回異步執行結果。ClusterCommandExecutor類的executeCommandOnAllNodes()方法及關聯方法實現剖析以下:ide
/** * 使用ClusterCommandCallback接口實現類的doInCluster()方法在全部可達的主節點上執行命令 * * @param cmd * @return * @throws ClusterCommandExecutionFailureException */ public <S, T> MulitNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) { // getClusterTopology().getActiveMasterNodes()獲取的是全部的主節點 return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes()); } /** * @param callback * @param nodes * @return * @throws ClusterCommandExecutionFailureException * @throws IllegalArgumentException * in case the node could not be resolved to a topology-known node */ public <S, T> MulitNodeResult<T> executeCommandAsyncOnNodes(final ClusterCommandCallback<S, T> callback, Iterable<RedisClusterNode> nodes) { Assert.notNull(callback, "Callback must not be null!"); Assert.notNull(nodes, "Nodes must not be null!"); List<RedisClusterNode> resolvedRedisClusterNodes = new ArrayList<RedisClusterNode>(); ClusterTopology topology = topologyProvider.getTopology(); // 遍歷Redis集羣節點集合nodes,獲取節點信息 for (final RedisClusterNode node : nodes) { try { resolvedRedisClusterNodes.add(topology.lookup(node)); } catch (ClusterStateFailureException e) { throw new IllegalArgumentException(String.format("Node %s is unknown to cluster", node), e); } } // 遍歷節點信息,在相應Redis集羣節點上執行相關命令 Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>(); for (final RedisClusterNode node : resolvedRedisClusterNodes) { futures.put(new NodeExecution(node), executor.submit(new Callable<NodeResult<T>>() { @Override public NodeResult<T> call() throws Exception { return executeCommandOnSingleNode(callback, node); } })); } // 解析執行結果並返回 return collectResults(futures); } public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node) { return executeCommandOnSingleNode(cmd, node, 0); } private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node, int redirectCount) { Assert.notNull(cmd, "ClusterCommandCallback must not be null!"); Assert.notNull(node, "RedisClusterNode must not be null!"); if (redirectCount > maxRedirects) { throw new TooManyClusterRedirectionsException(String.format( "Cannot follow Cluster Redirects over more than %s legs. Please consider increasing the number of redirects to follow. Current value is: %s.", redirectCount, maxRedirects)); } RedisClusterNode nodeToUse = lookupNode(node); S client = this.resourceProvider.getResourceForSpecificNode(nodeToUse); Assert.notNull(client, "Could not acquire resource for node. Is your cluster info up to date?"); try { // 在相應Redis節點上執行命令,具體執行命令的函數是實現ClusterCommandCallback接口的類的doInCluster方法 return new NodeResult<T>(node, cmd.doInCluster(client)); } catch (RuntimeException ex) { RuntimeException translatedException = convertToDataAccessExeption(ex); // 若是請求不被目標服務器接受,則進行重試,從新執行命令:redirectCount + 1 if (translatedException instanceof ClusterRedirectException) { ClusterRedirectException cre = (ClusterRedirectException) translatedException; return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(cre.getTargetHost(), cre.getTargetPort()), redirectCount + 1); } else { throw translatedException != null ? translatedException : ex; } } finally { this.resourceProvider.returnResourceForSpecificNode(nodeToUse, client); } }
多節點命令執行流程:函數
與keys、flushall等多節點命令類似,mget等批量key操做命令也要遍歷多個節點執行相關命令。下面就以mget命令來講明這種狀況下JedisClusterConnection類是如何完成該操做的,該類中mGet()方法代碼以下:
public List<byte[]> mGet(byte[]... keys) { Assert.noNullElements(keys, "Keys must not contain null elements!"); // 若是進行批量操做的key的slot值相同,表示key都在同一節點上,則直接在key所在的節點執行命令 if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) { return cluster.mget(keys); } // 若是進行批量操做的key的slot值不一樣,表示key不在同一節點上,則須要計算key的slot值,根據slot肯定key所在的節點,而後執行命令 return this.clusterCommandExecutor.executeMuliKeyCommand(new JedisMultiKeyClusterCommandCallback<byte[]>() { @Override public byte[] doInCluster(Jedis client, byte[] key) { return client.get(key); } }, Arrays.asList(keys)).resultsAsListSortBy(keys); }
相似地,在上面代碼中咱們看到了mGet()方法內部調用了ClusterCommandExecutor類的executeMuliKeyCommand()方法。該方法實現剖析以下:
/** * 在一組Redis集羣節點上進行一個或多個key操做 * * @param cmd * @return * @throws ClusterCommandExecutionFailureException */ public <S, T> MulitNodeResult<T> executeMuliKeyCommand(final MultiKeyClusterCommandCallback<S, T> cmd, Iterable<byte[]> keys) { // 節點和key映射Map,一個節點上有多個key Map<RedisClusterNode, Set<byte[]>> nodeKeyMap = new HashMap<RedisClusterNode, Set<byte[]>>(); // 遍歷key集合,將key添加到相應的Redis集羣節點集合中 for (byte[] key : keys) { // 經過getClusterTopology().getKeyServingNodes(key)方法計算key的slot值,而後獲取key所在的Redis集羣節點信息 for (RedisClusterNode node : getClusterTopology().getKeyServingNodes(key)) { if (nodeKeyMap.containsKey(node)) { nodeKeyMap.get(node).add(key); } else { Set<byte[]> keySet = new LinkedHashSet<byte[]>(); keySet.add(key); nodeKeyMap.put(node, keySet); } } } Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>(); // 遍歷nodeKeyMap,若是是節點是主節點,則執行相關key的命令操做 for (final Entry<RedisClusterNode, Set<byte[]>> entry : nodeKeyMap.entrySet()) { if (entry.getKey().isMaster()) { for (final byte[] key : entry.getValue()) { futures.put(new NodeExecution(entry.getKey(), key), executor.submit(new Callable<NodeResult<T>>() { @Override public NodeResult<T> call() throws Exception { return executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key); } })); } } } return collectResults(futures); }
批量key命令執行流程: