Jedis cluster命令執行流程剖析

在Redis Cluster集羣模式下,因爲key分佈在各個節點上,會形成沒法直接實現mget、sInter等功能。所以,不管咱們使用什麼客戶端來操做Redis,都要考慮單一key命令操做、批量key命令操做和多節點命令操做的狀況,以及效率問題。java

在以前的文章中剖析了Jedis cluster集羣初始化源碼,分析了源碼以後能夠得知,在Jedis中,使用的是JedisClusterConnection集羣鏈接類來與Redis集羣節點進行命令交互,它使用裝飾模式對JedisCluster命令執行類進行了一層包裝,同時對這三種不一樣類型的命令操做作了分類處理。node

下面就看下JedisClusterConnection類中,如何實現這三種類型的key命令操做。在這裏只列舉一些典型的命令進行說明。本文基於spring-data-redis-1.8.4-RELEASE.jar和jedis-2.9.0.jar進行源碼剖析,Redis版本爲Redis 3.2.8。redis

單一key命令操做

對於單一命令操做,經常使用的就是get、set了。在JedisClusterConnection類中,get方法的實現以下:spring

public byte[] get(byte[] key) {
	try {
		return cluster.get(key);
	} catch (Exception ex) {
		throw convertJedisAccessException(ex);
	}
}

在上面代碼中,執行cluster.get()方法時,實際上調用的是BinaryJedisCluster類的get()方法:緩存

public byte[] get(final byte[] key) {
    return new JedisClusterCommand<byte[]>(connectionHandler, maxAttempts) {
      @Override
      public byte[] execute(Jedis connection) {
        return connection.get(key);
      }
    }.runBinary(key);
  }

BinaryJedisCluster類的get()方法的核心操做是由JedisClusterCommand類runBinary()方法完成的,下面剖析一下該類的核心代碼:服務器

public abstract class JedisClusterCommand<T> {

	// 集羣節點鏈接器
	private JedisClusterConnectionHandler connectionHandler;
	// 重試次數,默認5次
	private int maxAttempts;

	// 模板回調方法,執行相關的redis命令
	public abstract T execute(Jedis connection);

   public T runBinary(byte[] key) {
	    if (key == null) {
	      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
	    }
		
	    return runWithRetries(key, this.maxAttempts, false, false);
  	}

	/**
	 * 利用重試機制運行鍵命令
	 * 
	 * @param key
	 *            要操做的鍵
	 * @param attempts
	 *            重試次數,每重試一次減1
	 * @param tryRandomNode
	 *            標識是否隨機獲取活躍節點鏈接,true爲是,false爲否
	 * @param asking
	 * @return
	 */
	private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
		if (attempts <= 0) {
			throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
		}

		Jedis connection = null;
		try {

			if (asking) {
				// TODO: Pipeline asking with the original command to make it
				// faster....
				connection = askConnection.get();
				connection.asking();

				// if asking success, reset asking flag
				asking = false;
			} else {
				if (tryRandomNode) {
					// 隨機獲取活躍節點鏈接
					connection = connectionHandler.getConnection();
				} else {
					// 計算key的slot值,而後根據slot緩存獲取節點鏈接
					connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
				}
			}

			// 調用具體的模板方法實現執行命令
			return execute(connection);

			// 集羣節點不可達,直接拋出異常
		} catch (JedisNoReachableClusterNodeException jnrcne) {
			throw jnrcne;
		} catch (JedisConnectionException jce) {
			// 在遞歸執行runWithRetries方法以前釋放鏈接
			releaseConnection(connection);
			connection = null;

			// 若是節點不能鏈接,從新初始化slot緩存
			if (attempts <= 1) {
				this.connectionHandler.renewSlotCache();

				throw jce;
			}

			// 出現鏈接錯誤重試執行命令
			return runWithRetries(key, attempts - 1, tryRandomNode, asking);
		} catch (JedisRedirectionException jre) {
			// 若是出現MOVE重定向錯誤,在鏈接上執行cluster slots命令從新初始化slot緩存
			if (jre instanceof JedisMovedDataException) {
				this.connectionHandler.renewSlotCache(connection);
			}

			// 在遞歸執行runWithRetries方法或者重建slot緩存以前釋放鏈接,從而避免在錯誤的鏈接上執行命令,也爲了不鏈接泄露問題
			releaseConnection(connection);
			connection = null;

			if (jre instanceof JedisAskDataException) {
				asking = true;
				askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
			} else if (jre instanceof JedisMovedDataException) {
			} else {
				throw new JedisClusterException(jre);
			}
			// slot初始化後重試執行命令
			return runWithRetries(key, attempts - 1, false, asking);
		} finally {
			//釋放鏈接
			releaseConnection(connection);
		}
	}
}

單一key命令執行流程:dom

  1. 計算slot並根據slots緩存獲取目標節點鏈接,發送命令
  2. 若是出現鏈接錯誤,使用重試機制執行鍵命令,每次命令重試對 attempts參數減1
  3. 捕獲到MOVED重定向錯誤,使用cluster slots命令更新slots 緩存(renewSlotCache方法)
  4. 重複執行1 ~ 3步,直到命令執行成功,或者當attempts <= 0時拋出JedisClusterMaxRedirectionsException異常
  5. 在遞歸執行runWithRetries方法或者重建slot緩存以前釋放鏈接,從而避免在錯誤的鏈接上執行命令,也爲了不鏈接泄露問題

多節點命令操做

在Redis Cluster中,有些命令如keys、flushall和刪除指定模式的鍵這些操做,須要遍歷全部的節點才能夠完成。下面就以keys命令來講明這種狀況下JedisClusterConnection類是如何完成該操做的,該類中keys()方法代碼以下:異步

public Set<byte[]> keys(final byte[] pattern) {

		Assert.notNull(pattern, "Pattern must not be null!");

		//在全部主節點上執行keys命令,而後返回一個Collection集合
		Collection<Set<byte[]>> keysPerNode = clusterCommandExecutor
				.executeCommandOnAllNodes(new JedisClusterCommandCallback<Set<byte[]>>() {

					@Override
					public Set<byte[]> doInCluster(Jedis client) {
						return client.keys(pattern);
					}
				}).resultsAsList();

		//遍歷執行keys命令得到的結果,而後添加進Set集合返回
		Set<byte[]> keys = new HashSet<byte[]>();
		for (Set<byte[]> keySet : keysPerNode) {
			keys.addAll(keySet);
		}
		return keys;
	}

在上面代碼中咱們看到了keys()方法內部調用了ClusterCommandExecutor類的executeCommandOnAllNodes()方法,該類是一個集羣命令執行類,它提供了在多個集羣節點上批量執行命令的特性,因爲考慮到在多個節點上執行命令的效率問題,它使用Spring的org.springframework.core.task包裏面的AsyncTaskExecutor接口來爲命令執行操做提供異步支持,而後返回異步執行結果。ClusterCommandExecutor類的executeCommandOnAllNodes()方法及關聯方法實現剖析以下:ide

/**
	 * 使用ClusterCommandCallback接口實現類的doInCluster()方法在全部可達的主節點上執行命令
	 *
	 * @param cmd
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 */
	public <S, T> MulitNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) {
		// getClusterTopology().getActiveMasterNodes()獲取的是全部的主節點
		return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes());
	}
	
	/**
	 * @param callback
	 * @param nodes
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 * @throws IllegalArgumentException
	 *             in case the node could not be resolved to a topology-known node
	 */
	public <S, T> MulitNodeResult<T> executeCommandAsyncOnNodes(final ClusterCommandCallback<S, T> callback, Iterable<RedisClusterNode> nodes) {

		Assert.notNull(callback, "Callback must not be null!");
		Assert.notNull(nodes, "Nodes must not be null!");

		List<RedisClusterNode> resolvedRedisClusterNodes = new ArrayList<RedisClusterNode>();
		ClusterTopology topology = topologyProvider.getTopology();

		// 遍歷Redis集羣節點集合nodes,獲取節點信息
		for (final RedisClusterNode node : nodes) {
			try {
				resolvedRedisClusterNodes.add(topology.lookup(node));
			} catch (ClusterStateFailureException e) {
				throw new IllegalArgumentException(String.format("Node %s is unknown to cluster", node), e);
			}
		}

		// 遍歷節點信息,在相應Redis集羣節點上執行相關命令
		Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>();
		for (final RedisClusterNode node : resolvedRedisClusterNodes) {

			futures.put(new NodeExecution(node), executor.submit(new Callable<NodeResult<T>>() {

				@Override
				public NodeResult<T> call() throws Exception {
					return executeCommandOnSingleNode(callback, node);
				}
			}));
		}

		// 解析執行結果並返回
		return collectResults(futures);
	}
	
	public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node) {
		return executeCommandOnSingleNode(cmd, node, 0);
	}

	private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node, int redirectCount) {

		Assert.notNull(cmd, "ClusterCommandCallback must not be null!");
		Assert.notNull(node, "RedisClusterNode must not be null!");

		if (redirectCount > maxRedirects) {
			throw new TooManyClusterRedirectionsException(String.format(
					"Cannot follow Cluster Redirects over more than %s legs. Please consider increasing the number of redirects to follow. Current value is: %s.",
					redirectCount, maxRedirects));
		}

		RedisClusterNode nodeToUse = lookupNode(node);

		S client = this.resourceProvider.getResourceForSpecificNode(nodeToUse);
		Assert.notNull(client, "Could not acquire resource for node. Is your cluster info up to date?");

		try {
			// 在相應Redis節點上執行命令,具體執行命令的函數是實現ClusterCommandCallback接口的類的doInCluster方法
			return new NodeResult<T>(node, cmd.doInCluster(client));
		} catch (RuntimeException ex) {

			RuntimeException translatedException = convertToDataAccessExeption(ex);
			// 若是請求不被目標服務器接受,則進行重試,從新執行命令:redirectCount + 1
			if (translatedException instanceof ClusterRedirectException) {
				ClusterRedirectException cre = (ClusterRedirectException) translatedException;
				return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(cre.getTargetHost(), cre.getTargetPort()),
						redirectCount + 1);
			} else {
				throw translatedException != null ? translatedException : ex;
			}
		} finally {
			this.resourceProvider.returnResourceForSpecificNode(nodeToUse, client);
		}
	}

多節點命令執行流程:函數

  1. 使用 getClusterTopology().getActiveMasterNodes()方法獲取全部的可達的主節點。這裏的可達表示主節點能夠被鏈接上且狀態不爲fail狀態
  2. 遍歷全部主節點,而後在這些節點上異步執行相應的命令,最後將結果做爲一個集合返回
  3. 若是請求不被目標服務器接受,則進行重試,從新執行命令,每次對redirectCount + 1,但redirectCount的次數不能大於maxRedirects最大重試次數,大於後會拋出TooManyClusterRedirectionsException異常

批量key命令操做

與keys、flushall等多節點命令類似,mget等批量key操做命令也要遍歷多個節點執行相關命令。下面就以mget命令來講明這種狀況下JedisClusterConnection類是如何完成該操做的,該類中mGet()方法代碼以下:

public List<byte[]> mGet(byte[]... keys) {

		Assert.noNullElements(keys, "Keys must not contain null elements!");

		// 若是進行批量操做的key的slot值相同,表示key都在同一節點上,則直接在key所在的節點執行命令
		if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
			return cluster.mget(keys);
		}

		// 若是進行批量操做的key的slot值不一樣,表示key不在同一節點上,則須要計算key的slot值,根據slot肯定key所在的節點,而後執行命令
		return this.clusterCommandExecutor.executeMuliKeyCommand(new JedisMultiKeyClusterCommandCallback<byte[]>() {

			@Override
			public byte[] doInCluster(Jedis client, byte[] key) {
				return client.get(key);
			}
		}, Arrays.asList(keys)).resultsAsListSortBy(keys);
	}

相似地,在上面代碼中咱們看到了mGet()方法內部調用了ClusterCommandExecutor類的executeMuliKeyCommand()方法。該方法實現剖析以下:

/**
	 * 在一組Redis集羣節點上進行一個或多個key操做
	 *
	 * @param cmd
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 */
	public <S, T> MulitNodeResult<T> executeMuliKeyCommand(final MultiKeyClusterCommandCallback<S, T> cmd, Iterable<byte[]> keys) {

		// 節點和key映射Map,一個節點上有多個key
		Map<RedisClusterNode, Set<byte[]>> nodeKeyMap = new HashMap<RedisClusterNode, Set<byte[]>>();

		// 遍歷key集合,將key添加到相應的Redis集羣節點集合中
		for (byte[] key : keys) {
			// 經過getClusterTopology().getKeyServingNodes(key)方法計算key的slot值,而後獲取key所在的Redis集羣節點信息
			for (RedisClusterNode node : getClusterTopology().getKeyServingNodes(key)) {

				if (nodeKeyMap.containsKey(node)) {
					nodeKeyMap.get(node).add(key);
				} else {
					Set<byte[]> keySet = new LinkedHashSet<byte[]>();
					keySet.add(key);
					nodeKeyMap.put(node, keySet);
				}
			}
		}

		Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>();

		// 遍歷nodeKeyMap,若是是節點是主節點,則執行相關key的命令操做
		for (final Entry<RedisClusterNode, Set<byte[]>> entry : nodeKeyMap.entrySet()) {

			if (entry.getKey().isMaster()) {
				for (final byte[] key : entry.getValue()) {
					futures.put(new NodeExecution(entry.getKey(), key), executor.submit(new Callable<NodeResult<T>>() {

						@Override
						public NodeResult<T> call() throws Exception {
							return executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key);
						}
					}));
				}
			}
		}

		return collectResults(futures);
	}

批量key命令執行流程:

  1. 先使用ClusterSlotHashUtil.isSameSlotForAllKeys()方法計算出這些key的slot值,接下來判斷若是進行批量操做的key的slot值相同,表示key都在同一節點上,則直接在key所在的節點執行命令。不然,執行第2步
  2. 若是進行批量操做的key的slot值不一樣,表示key不在同一節點上,則須要計算key的slot值,根據slot肯定key所在的節點,而後在該節點上執行命令,最後封裝結果到集合裏面返回

總結

  1. 不管是哪一種類型的key操做,都是在Redis集羣的主節點上執行命令的。這跟Redis Cluster集羣的特性有關,Redis通常不容許在從節點上進行讀寫操做,在JedisClusterInfoCache類中,slots這個Map本地緩存保存的也是slot槽和主節點的鏈接池信息
  2. 對於keys等多節點命令來講,不須要計算key的slot值,只須要遍歷所有主節點而後執行命令便可
  3. 對於單一key和批量key命令操做來講,須要計算key的slot值,根據slot肯定key所在的節點,而後在該節點上執行命令
  4. Redis使用單線程模式執行命令,每一次命令執行須要通過發送命令、執行命令、返回結果三個階段。在集羣條件下,不一樣節點執行命令的效率是不一樣的,對於多節點命令和批量key命令操做,考慮到命令執行時間過長,可能致使其它命令阻塞的狀況,客戶端須要在命令執行時提供異步支持
相關文章
相關標籤/搜索