redis客戶端jedis&spring-data-redis源碼賞析

背景

本文主要對當下開源流行的redis客戶端jedis和spring-data-redis的部分核心源碼進行剖析,記錄一下怎麼去實現一個redis的java客戶端以及在使用redis集羣時客戶端的操做須要注意的要點。java

版本

jedis:v2.9.0、node

spring-data-redis:v2.0.8.RELEASEios

源代碼的分析

  • 先看最核心的入口類:org.springframework.data.redis.core.RedisTemplate

它繼承於org.springframework.data.redis.core.RedisAccessor,主要是設置org.springframework.data.redis.connection.RedisConnectionFactory,並在spring bean初始化完畢對connectionFactory進行爲空校驗;redis

實現的核心接口org.springframework.data.redis.core.RedisOperations主要提供了一些基礎操做,但這個接口並不常用,由於redis的數據結構比較複雜,更多具體的操做都封裝在了ValueOperations、ListOperations等,以及BoundValueOperations、BoundListOperations等這兩類操做中,這兩類操做的主要區別在因而否綁定key了,BoundXX接口繼承了BoundKeyOperations這個對key基礎操做的接口。spring

與redis服務端的交互全部操做幾乎都調用了以下方法:緩存

/**
	 * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can
	 * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios).
	 *
	 * @param <T> return type
	 * @param action callback object to execute
	 * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
	 * @param pipeline whether to pipeline or not the connection for the execution
	 * @return object returned by the action
	 */
	@Nullable
	public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(action, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		RedisConnection conn = null;
		try {

			if (enableTransactionSupport) {
				// only bind resources in case of potential transaction synchronization
				conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
			} else {
				conn = RedisConnectionUtils.getConnection(factory);
			}

			boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

			RedisConnection connToUse = preProcessConnection(conn, existingConnection);

			boolean pipelineStatus = connToUse.isPipelined();
			if (pipeline && !pipelineStatus) {
				connToUse.openPipeline();
			}

			RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
			T result = action.doInRedis(connToExpose);

			// close pipeline
			if (pipeline && !pipelineStatus) {
				connToUse.closePipeline();
			}

			// TODO: any other connection processing?
			return postProcessResult(result, connToUse, existingConnection);
		} finally {
			RedisConnectionUtils.releaseConnection(conn, factory);
		}
	}

咱們能夠看到:數據結構

  1. 對事務、管道單獨作了一些程序邏輯處理;
  2. 不少初始化的操做都在spring容器啓動完成時執行的;
  3. 有對鏈接選擇是否進行生成代理鏈接的操做(通常,直接調用redisTemplate使用代理;使用各類operations間接處理各自的數據結構時並不使用代理鏈接進行操做);
  4. 利用匿名回調函數做爲傳參進行切面操做(鏈接的創建、數據處理、鏈接的釋放):
Connection conn = null;
try {
   conn = connectionFactory.getConnection();   
   // handle data
   // ...

} finally {
   if (conn != null)
      conn.close();// or release, or disconnect
}
  • 鏈接的創建和釋放(jedis、jedisSentinel、jedisCluster的比較)
/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnectionFactory#getConnection()
	 */
	public RedisConnection getConnection() {

		if (isRedisClusterAware()) {// 若是clusterConfiguration不爲空,那麼使用集羣模式鏈接
			return getClusterConnection();
		}

		Jedis jedis = fetchJedisConnector();// 不然,用Jedis單個節點進行鏈接(是否用鏈接池技術能夠進行設置)
		String clientName = clientConfiguration.getClientName().orElse(null);
		JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), clientName)
				: new JedisConnection(jedis, null, getDatabase(), clientName));
		connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
		return postProcessConnection(connection);
	}

集羣模式的鏈接創建:app

/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnectionFactory#getClusterConnection()
	 */
	@Override
	public RedisClusterConnection getClusterConnection() {

		if (!isRedisClusterAware()) {
			throw new InvalidDataAccessApiUsageException("Cluster is not configured!");
		}
		return new JedisClusterConnection(cluster, clusterCommandExecutor);
	}// 將當前類的JedisCluster對象傳遞給一個新的JedisClusterConnection
// 而咱們又發現:JedisConnectionFactory implements InitializingBean

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
	 */
	public void afterPropertiesSet() {

...............
..............



		if (isRedisClusterAware()) {
			this.cluster = createCluster();
		}
	}


// 再看createCluster():
...
private JedisCluster createCluster() {

		JedisCluster cluster = createCluster(this.clusterConfig, getPoolConfig());
		JedisClusterTopologyProvider topologyProvider = new JedisClusterTopologyProvider(cluster);
		this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
				new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION);
		return cluster;
	}

	/**
	 * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
	 *
	 * @param clusterConfig must not be {@literal null}.
	 * @param poolConfig can be {@literal null}.
	 * @return the actual {@link JedisCluster}.
	 * @since 1.7
	 */
	protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) {

		Assert.notNull(clusterConfig, "Cluster configuration must not be null!");

		Set<HostAndPort> hostAndPort = new HashSet<>();
		for (RedisNode node : clusterConfig.getClusterNodes()) {
			hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
		}

		int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;

		int connectTimeout = getConnectTimeout();
		int readTimeout = getReadTimeout();

		return StringUtils.hasText(getPassword())
				? new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, getPassword(), poolConfig)
				: new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, poolConfig);
	}

// 能夠看出在spring容器初始化完成以後執行了當前類中JedisCluster對象的實例化

單節點模式的鏈接創建:dom

/**
	 * Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
	 * pool.
	 *
	 * @return Jedis instance ready for wrapping into a {@link RedisConnection}.
	 */
	protected Jedis fetchJedisConnector() {
		try {

			if (getUsePool() && pool != null) {
				return pool.getResource();
			}

			Jedis jedis = createJedis();
			// force initialization (see Jedis issue #82)
			jedis.connect();

			potentiallySetClientName(jedis);
			return jedis;
		} catch (Exception ex) {
			throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
		}
	}

	private Jedis createJedis() {

		if (providedShardInfo) {
			return new Jedis(getShardInfo());
		}
        // 能夠看出,每次都從新創建新的客戶端鏈接
		Jedis jedis = new Jedis(getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), isUseSsl(),
				clientConfiguration.getSslSocketFactory().orElse(null), //
				clientConfiguration.getSslParameters().orElse(null), //
				clientConfiguration.getHostnameVerifier().orElse(null));

		Client client = jedis.getClient();

		getRedisPassword().map(String::new).ifPresent(client::setPassword);
		client.setDb(getDatabase());

		return jedis;
	}

再看兩種模式下鏈接的關閉:socket

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.DisposableBean#destroy()
	 */
	public void destroy() {
        // spring容器銷燬時,鏈接池和jediscluster進行銷燬或關閉(若是有的話)
		if (getUsePool() && pool != null) {

			try {
				pool.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close Jedis pool", ex);
			}
			pool = null;
		}

		if (cluster != null) {

			try {
				cluster.close();
			} catch (Exception ex) {
				log.warn("Cannot properly close Jedis cluster", ex);
			}

			try {
				clusterCommandExecutor.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close cluster command executor", ex);
			}
		}
	}

除此以外,單節點模式下的關閉:org.springframework.data.redis.connection.jedis.JedisConnection#close;

public void close() throws DataAccessException {
		super.close();
		// return the connection to the pool
		if (pool != null) {
			if (!broken) {
				// reset the connection
				try {
					if (dbIndex > 0) {
						jedis.select(0);
					}
					pool.returnResource(jedis);
					return;
				} catch (Exception ex) {
					DataAccessException dae = convertJedisAccessException(ex);
					if (broken) {
						pool.returnBrokenResource(jedis);
					} else {
						pool.returnResource(jedis);
					}
					throw dae;
				}
			} else {
				pool.returnBrokenResource(jedis);
				return;
			}
		}
		// else close the connection normally (doing the try/catch dance)
		Exception exc = null;
		if (isQueueing()) {
			try {
				client.quit();
			} catch (Exception ex) {
				exc = ex;
			}
			try {
				client.disconnect();
			} catch (Exception ex) {
				exc = ex;
			}
			return;
		}
		try {
			jedis.quit();
		} catch (Exception ex) {
			exc = ex;
		}
		try {
			jedis.disconnect();
		} catch (Exception ex) {
			exc = ex;
		}
		if (exc != null)
			throw convertJedisAccessException(exc);
	}

集羣模式下的關閉:org.springframework.data.redis.connection.jedis.JedisClusterConnection#close

/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnection#close()
	 */
	@Override
	public void close() throws DataAccessException {

		if (!closed && disposeClusterCommandExecutorOnClose) {
			try {
				clusterCommandExecutor.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close cluster command executor", ex);
			}
		}

		closed = true;// 僅僅是設置了一下狀態,頂多銷燬一下相關的bean,bean銷燬的代碼以下:
	}

...
...

org.springframework.data.redis.connection.ClusterCommandExecutor#destroy

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.DisposableBean#destroy()
	 */
	@Override
	public void destroy() throws Exception {

		if (executor instanceof DisposableBean) {
			((DisposableBean) executor).destroy();
		}

		if (resourceProvider instanceof DisposableBean) {
			((DisposableBean) resourceProvider).destroy();
		}
	}
  • 繼續追蹤jediscluster對鏈接的創建和釋放:
redis.clients.jedis.JedisClusterCommand#runWithRetries:
// 這裏又看到了try..finally操做,對鏈接進行創建和釋放
// JedisCluster繼承於BinaryJedisCluster
// BinaryJedisCluster又使用抽象類JedisClusterCommand實現的

  private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
        //or we can stuck with cluster state without one node in opposite case.
        //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
        //TODO make tracking of successful/unsuccessful operations for node - do renewing only
        //if there were no successful responses from this node last few seconds
        this.connectionHandler.renewSlotCache();

        //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
        throw jce;
      }

      return runWithRetries(key, attempts - 1, tryRandomNode, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
        askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }

      return runWithRetries(key, attempts - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }

補充

Jedis的實現:

// 由Jedis extends BinaryJedis查看一下redis.clients.jedis.BinaryJedis中引用了redis.clients.jedis.Client對象進行數據交互;
// 由Client extends BinaryClient,且BinaryClient extends Connection能夠知道鏈接的開啓和關閉:

// redis.clients.jedis.Connection使用Socket與服務端進行通訊

public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true);
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);

        if (ssl) {
          if (null == sslSocketFactory) {
            sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
          }
          socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
          if (null != sslParameters) {
            ((SSLSocket) socket).setSSLParameters(sslParameters);
          }
          if ((null != hostnameVerifier) &&
              (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
            String message = String.format(
                "The connection to '%s' failed ssl/tls hostname verification.", host);
            throw new JedisConnectionException(message);
          }
        }

        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }

  @Override
  public void close() {
    disconnect();
  }

  public void disconnect() {
    if (isConnected()) {
      try {
        outputStream.flush();
        socket.close();
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      } finally {
        IOUtils.closeQuietly(socket);
      }
    }
  }

總結

  • RedisCluster的出現(主要是redis3.0及之後),實現了:自動分割數據到不一樣的節點,加強集羣的可用性;可是同時它致使了Redis集羣並不支持處理多個keys的命令,由於這須要在不一樣的節點間移動數據,從而達不到像Redis那樣的性能,在高負載的狀況下可能會致使不可預料的錯誤.
  • redis-cli 對集羣的支持是很是基本的, 因此它老是依靠 Redis 集羣節點來將它轉向(redirect)至正確的節點。一個真正的(serious)集羣客戶端應該作得比這更好: 它應該用緩存記錄起哈希槽與節點地址之間的映射(map), 從而直接將命令發送到正確的節點上面。這種映射只會在集羣的配置出現某些修改時變化, 好比說, 在一次故障轉移(failover)以後, 或者系統管理員經過添加節點或移除節點來修改了集羣的佈局(layout)以後, 諸如此類。
  • 基於前兩點的考慮,咱們能夠將redis集羣不支持的一些命令和功能如:mset、mget、rename、keys、scan、事務、管道、腳本等經過在客戶端實現以hash槽和master數據節點進行分組,變相實現這些功能,固然這須要結合所處的業務去考慮,由於分組後的結果集可能由多個節點多步操做才能完成,致使與最初命令執行的原子性有所差別。
  • Jedis對rediscluster的封裝仍是略微有些粗糙的,JedisCluster內部必須使用鏈接池,並且每次調用都睡獲取鏈接和釋放鏈接,若是想要保持多步操做使用同一個鏈接(好比事務),還須要對鏈接和用戶線程進行綁定,以防止:一、下次拿錯鏈接(其餘不一樣節點);二、鏈接頻繁創建和關閉或取出和釋放(從鏈接池)產生多餘的資源消耗。
相關文章
相關標籤/搜索