A look at how RedisTemplate wraps Lettuce

This article looks at how RedisTemplate wraps Lettuce.

RedisTemplate

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisTemplate.java

public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
	//......
	/**
	 * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can
	 * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios).
	 *
	 * @param <T> return type
	 * @param action callback object to execute
	 * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
	 * @param pipeline whether to pipeline or not the connection for the execution
	 * @return object returned by the action
	 */
	@Nullable
	public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(action, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		RedisConnection conn = null;
		try {

			if (enableTransactionSupport) {
				// only bind resources in case of potential transaction synchronization
				conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
			} else {
				conn = RedisConnectionUtils.getConnection(factory);
			}

			boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

			RedisConnection connToUse = preProcessConnection(conn, existingConnection);

			boolean pipelineStatus = connToUse.isPipelined();
			if (pipeline && !pipelineStatus) {
				connToUse.openPipeline();
			}

			RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
			T result = action.doInRedis(connToExpose);

			// close pipeline
			if (pipeline && !pipelineStatus) {
				connToUse.closePipeline();
			}

			// TODO: any other connection processing?
			return postProcessResult(result, connToUse, existingConnection);
		} finally {
			RedisConnectionUtils.releaseConnection(conn, factory);
		}
	}

	//......
}
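  • RedisTemplate has many methods; execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) is used as the example here
  • The method first obtains a RedisConnection and then calls action.doInRedis
  • With StringRedisTemplate, preProcessConnection wraps the connection, so connToUse is a DefaultStringRedisConnection; a plain RedisTemplate returns the connection unchanged
  • Finally, in the finally block, it calls RedisConnectionUtils.releaseConnection(conn, factory) to release the connection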
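
The acquire / callback / release shape described above can be sketched without any Redis dependency. This is only a simplified model: SketchConnection and SketchTemplate are hypothetical stand-ins, not Spring Data Redis classes, and transaction binding, pipelining and connection proxies are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ExecuteSketchDemo {
    public static void main(String[] args) {
        SketchTemplate template = new SketchTemplate();
        String reply = template.execute(conn -> conn.set("k", "v"));
        // prints: OK [SET k v, CLOSE]
        System.out.println(reply + " " + template.lastConnection.events);
    }
}

// Hypothetical stand-in for RedisConnection: it only records what happens to it.
class SketchConnection {
    final List<String> events = new ArrayList<>();
    boolean closed = false;

    String set(String key, String value) {
        if (closed) {
            throw new IllegalStateException("connection already closed");
        }
        events.add("SET " + key + " " + value);
        return "OK";
    }

    void close() {
        closed = true;
        events.add("CLOSE");
    }
}

// Hypothetical stand-in for RedisTemplate.execute(...): acquire, run the callback, always release.
class SketchTemplate {
    SketchConnection lastConnection; // kept only so the example can inspect it afterwards

    <T> T execute(Function<SketchConnection, T> action) {
        SketchConnection conn = new SketchConnection(); // models RedisConnectionUtils.getConnection(factory)
        lastConnection = conn;
        try {
            return action.apply(conn); // models action.doInRedis(connToExpose)
        } finally {
            conn.close(); // models RedisConnectionUtils.releaseConnection(conn, factory)
        }
    }
}
```

Because the release sits in a finally block, the connection is also released when the callback throws, which is the property the real execute method relies on.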

RedisConnectionUtils.getConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisConnectionUtils.java

/**
	 * Gets a Redis connection from the given factory. Is aware of and will return any existing corresponding connections
	 * bound to the current thread, for example when using a transaction manager. Will always create a new connection
	 * otherwise.
	 *
	 * @param factory connection factory for creating the connection
	 * @return an active Redis connection without transaction management.
	 */
	public static RedisConnection getConnection(RedisConnectionFactory factory) {
		return getConnection(factory, false);
	}

	/**
	 * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current
	 * thread, for example when using a transaction manager. Will create a new Connection otherwise, if
	 * {@code allowCreate} is <tt>true</tt>.
	 *
	 * @param factory connection factory for creating the connection
	 * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the
	 *          current thread
	 * @param bind binds the connection to the thread, in case one was created
	 * @param enableTransactionSupport
	 * @return an active Redis connection
	 */
	public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
			boolean enableTransactionSupport) {

		Assert.notNull(factory, "No RedisConnectionFactory specified");

		RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

		if (connHolder != null) {
			if (enableTransactionSupport) {
				potentiallyRegisterTransactionSynchronisation(connHolder, factory);
			}
			return connHolder.getConnection();
		}

		if (!allowCreate) {
			throw new IllegalArgumentException("No connection found and allowCreate = false");
		}

		if (log.isDebugEnabled()) {
			log.debug("Opening RedisConnection");
		}

		RedisConnection conn = factory.getConnection();

		if (bind) {

			RedisConnection connectionToBind = conn;
			if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
				connectionToBind = createConnectionProxy(conn, factory);
			}

			connHolder = new RedisConnectionHolder(connectionToBind);

			TransactionSynchronizationManager.bindResource(factory, connHolder);
			if (enableTransactionSupport) {
				potentiallyRegisterTransactionSynchronisation(connHolder, factory);
			}

			return connHolder.getConnection();
		}

		return conn;
	}
  • When no connection is bound to the current thread, the connection is obtained via factory.getConnection()
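
The lookup order in doGetConnection (reuse a thread-bound connection, otherwise create one, optionally bind it) can be modelled with a plain ThreadLocal. This is a rough sketch under that assumption: ThreadBoundConnections is a hypothetical class, the ThreadLocal stands in for TransactionSynchronizationManager, and transaction synchronisation is omitted.

```java
import java.util.function.Supplier;

class ThreadBoundDemo {
    public static void main(String[] args) {
        Supplier<Object> factory = Object::new;

        Object a = ThreadBoundConnections.getConnection(factory, true, false);
        Object b = ThreadBoundConnections.getConnection(factory, true, false);
        System.out.println("unbound calls share a connection: " + (a == b)); // false

        Object c = ThreadBoundConnections.getConnection(factory, true, true);
        Object d = ThreadBoundConnections.getConnection(factory, true, false);
        System.out.println("bound calls share a connection: " + (c == d)); // true

        ThreadBoundConnections.unbind();
    }
}

// Hypothetical, simplified model of the lookup order in doGetConnection.
class ThreadBoundConnections {
    // Stands in for the per-thread resource map of TransactionSynchronizationManager.
    private static final ThreadLocal<Object> BOUND = new ThreadLocal<>();

    static Object getConnection(Supplier<Object> factory, boolean allowCreate, boolean bind) {
        Object existing = BOUND.get();
        if (existing != null) {
            return existing; // 1. reuse the connection bound to this thread
        }
        if (!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        }
        Object created = factory.get(); // 2. models factory.getConnection()
        if (bind) {
            BOUND.set(created); // 3. later calls on this thread will reuse it
        }
        return created;
    }

    static void unbind() {
        BOUND.remove();
    }
}
```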

DefaultStringRedisConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {

	private static final byte[][] EMPTY_2D_BYTE_ARRAY = new byte[0][];

	private final Log log = LogFactory.getLog(DefaultStringRedisConnection.class);
	private final RedisConnection delegate;
	private final RedisSerializer<String> serializer;
	private Converter<byte[], String> bytesToString = new DeserializingConverter();
	private SetConverter<Tuple, StringTuple> tupleToStringTuple = new SetConverter<>(new TupleConverter());
	private SetConverter<StringTuple, Tuple> stringTupleToTuple = new SetConverter<>(new StringTupleConverter());
	private ListConverter<byte[], String> byteListToStringList = new ListConverter<>(bytesToString);
	private MapConverter<byte[], String> byteMapToStringMap = new MapConverter<>(bytesToString);
	private SetConverter<byte[], String> byteSetToStringSet = new SetConverter<>(bytesToString);
	private Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<String>>> byteGeoResultsToStringGeoResults;

	@SuppressWarnings("rawtypes") private Queue<Converter> pipelineConverters = new LinkedList<>();
	@SuppressWarnings("rawtypes") private Queue<Converter> txConverters = new LinkedList<>();
	private boolean deserializePipelineAndTxResults = false;
	private IdentityConverter<Object, ?> identityConverter = new IdentityConverter<>();

	//......
	@Override
	public Boolean set(byte[] key, byte[] value) {
		return convertAndReturn(delegate.set(key, value), identityConverter);
	}
	//......
}
  • This class simply delegates the actual work to its RedisConnection delegate
  • When Lettuce is the client library, the delegate is a LettuceConnection
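
The delegation can be illustrated with a small decorator. The types below are hypothetical stand-ins (BytesConnectionSketch for RedisConnection, StringConnectionSketch for DefaultStringRedisConnection), UTF-8 is assumed as the serializer, and an in-memory map replaces the Redis server so the example runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class DelegateSketchDemo {
    public static void main(String[] args) {
        StringConnectionSketch conn = new StringConnectionSketch(new InMemoryBytesConnection());
        conn.set("greeting", "hello");
        System.out.println(conn.get("greeting")); // prints: hello
    }
}

// Hypothetical byte-level connection, standing in for RedisConnection.
interface BytesConnectionSketch {
    Boolean set(byte[] key, byte[] value);

    byte[] get(byte[] key);
}

// In-memory implementation so the example needs no Redis server.
class InMemoryBytesConnection implements BytesConnectionSketch {
    private final Map<String, byte[]> store = new HashMap<>();

    public Boolean set(byte[] key, byte[] value) {
        store.put(new String(key, StandardCharsets.UTF_8), value);
        return true;
    }

    public byte[] get(byte[] key) {
        return store.get(new String(key, StandardCharsets.UTF_8));
    }
}

// Decorator: exposes String methods, serializes the arguments, then delegates.
class StringConnectionSketch {
    private final BytesConnectionSketch delegate;

    StringConnectionSketch(BytesConnectionSketch delegate) {
        this.delegate = delegate;
    }

    Boolean set(String key, String value) {
        return delegate.set(serialize(key), serialize(value));
    }

    String get(String key) {
        byte[] raw = delegate.get(serialize(key));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    private static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```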

LettuceConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

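public class LettuceConnection extends AbstractRedisConnection {
	//......
	// set(byte[], byte[]) is not declared in LettuceConnection itself. It is inherited as a
	// default method from the DefaultedRedisConnection interface (implemented by AbstractRedisConnection):
	//
	//	@Override
	//	@Deprecated
	//	default Boolean set(byte[] key, byte[] value) {
	//		return stringCommands().set(key, value);
	//	}

	@Override
	public RedisStringCommands stringCommands() {
		return new LettuceStringCommands(this);
	}

	//......
}
  • Taking set(byte[] key, byte[] value) as the example, the call is forwarded to stringCommands().set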

LettuceStringCommands

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java

@RequiredArgsConstructor
class LettuceStringCommands implements RedisStringCommands {
	//......
	@Override
	public Boolean set(byte[] key, byte[] value) {

		Assert.notNull(key, "Key must not be null!");
		Assert.notNull(value, "Value must not be null!");

		try {
			if (isPipelined()) {
				pipeline(
						connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter()));
				return null;
			}
			if (isQueueing()) {
				transaction(
						connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter()));
				return null;
			}
			return Converters.stringToBoolean(getConnection().set(key, value));
		} catch (Exception ex) {
			throw convertLettuceAccessException(ex);
		}
	}

	public RedisClusterCommands<byte[], byte[]> getConnection() {
		return connection.getConnection();
	}
	//......
}
  • Outside a pipeline or transaction, this calls getConnection().set directly; getConnection() here in turn calls LettuceConnection's getConnection method

LettuceConnection.getConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

public class LettuceConnection extends AbstractRedisConnection {
	//......
	protected RedisClusterCommands<byte[], byte[]> getConnection() {

		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}

	RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {

		if (asyncDedicatedConn == null) {

			asyncDedicatedConn = doGetAsyncDedicatedConnection();

			if (asyncDedicatedConn instanceof StatefulRedisConnection) {
				((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
			}
		}

		if (asyncDedicatedConn instanceof StatefulRedisConnection) {
			return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}
		if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
			return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}

		throw new IllegalStateException(
				String.format("%s is not a supported connection type.", asyncDedicatedConn.getClass().getName()));
	}

	protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
		return connectionProvider.getConnection(StatefulConnection.class);
	}
	//......
}
  • If asyncDedicatedConn is null, doGetAsyncDedicatedConnection is called to obtain a StatefulConnection
  • connectionProvider.getConnection encapsulates the connection-pool handling
  • For both StatefulRedisConnection and StatefulRedisClusterConnection, sync() is called on the async connection to obtain the synchronous command API
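
The idea of a synchronous view over an asynchronous connection can be shown with CompletableFuture alone. This is a conceptual sketch, not Lettuce code: all three classes are hypothetical, and the blocking join() is only an assumption about how a sync facade over futures might behave.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SyncOverAsyncDemo {
    public static void main(String[] args) {
        StatefulConnectionSketch conn = new StatefulConnectionSketch();
        System.out.println(conn.sync().set("k", "v")); // prints: OK
        System.out.println(conn.sync().get("k"));      // prints: v
    }
}

// Hypothetical async command API: every command returns a future.
class AsyncCommandsSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    CompletableFuture<String> set(String key, String value) {
        return CompletableFuture.supplyAsync(() -> {
            store.put(key, value);
            return "OK";
        });
    }

    CompletableFuture<String> get(String key) {
        return CompletableFuture.supplyAsync(() -> store.get(key));
    }
}

// Hypothetical sync facade: same commands, but each call waits for the future to complete.
class SyncCommandsSketch {
    private final AsyncCommandsSketch async;

    SyncCommandsSketch(AsyncCommandsSketch async) {
        this.async = async;
    }

    String set(String key, String value) {
        return async.set(key, value).join();
    }

    String get(String key) {
        return async.get(key).join();
    }
}

// Hypothetical stand-in for a stateful connection exposing both views of one connection.
class StatefulConnectionSketch {
    private final AsyncCommandsSketch async = new AsyncCommandsSketch();
    private final SyncCommandsSketch sync = new SyncCommandsSketch(async);

    AsyncCommandsSketch async() {
        return async;
    }

    SyncCommandsSketch sync() {
        return sync;
    }
}
```

Both views operate on the same underlying state, so a value written through sync() is visible through async() as well.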

RedisConnectionUtils.releaseConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisConnectionUtils.java

/**
	 * Closes the given connection, created via the given factory if not managed externally (i.e. not bound to the
	 * thread).
	 *
	 * @param conn the Redis connection to close.
	 * @param factory the Redis factory that the connection was created with.
	 */
	public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) {

		if (conn == null) {
			return;
		}

		RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

		if (connHolder != null && connHolder.isTransactionSyncronisationActive()) {
			if (log.isDebugEnabled()) {
				log.debug("Redis Connection will be closed when transaction finished.");
			}
			return;
		}

		// release transactional/read-only and non-transactional/non-bound connections.
		// transactional connections for read-only transactions get no synchronizer registered
		if (isConnectionTransactional(conn, factory) && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
			unbindConnection(factory);
		} else if (!isConnectionTransactional(conn, factory)) {
			if (log.isDebugEnabled()) {
				log.debug("Closing Redis Connection");
			}
			conn.close();
		}
	}
  • With Lettuce, conn.close() here ends up in LettuceConnection's close method

LettuceConnection.close

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

@Override
	public void close() throws DataAccessException {

		super.close();

		if (isClosed) {
			return;
		}

		isClosed = true;

		if (asyncDedicatedConn != null) {
			try {
				connectionProvider.release(asyncDedicatedConn);
			} catch (RuntimeException ex) {
				throw convertLettuceAccessException(ex);
			}
		}

		if (subscription != null) {
			if (subscription.isAlive()) {
				subscription.doClose();
			}
			subscription = null;
		}

		this.dbIndex = defaultDbIndex;
	}
  • Only asyncDedicatedConn is released here; nothing is done to asyncSharedConn
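
The different lifecycles of the shared and the dedicated connection can be modelled as follows. This is a simplified sketch with hypothetical names (NativeConnSketch, ProviderSketch, WrapperConnSketch); the deque is an assumed stand-in for whatever pooling the real connectionProvider performs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CloseSketchDemo {
    public static void main(String[] args) {
        ProviderSketch provider = new ProviderSketch();
        NativeConnSketch shared = new NativeConnSketch("shared");

        WrapperConnSketch wrapper = new WrapperConnSketch(shared, provider);
        System.out.println(wrapper.connectionFor(false).name); // prints: shared
        System.out.println(wrapper.connectionFor(true).name);  // prints: dedicated-1

        wrapper.close();
        System.out.println("idle in pool: " + provider.idle.size()); // prints: idle in pool: 1
        System.out.println("shared still open: " + shared.open);     // prints: shared still open: true
    }
}

// Hypothetical native connection.
class NativeConnSketch {
    final String name;
    boolean open = true;

    NativeConnSketch(String name) {
        this.name = name;
    }
}

// Hypothetical provider: hands out dedicated connections and takes them back on release.
class ProviderSketch {
    final Deque<NativeConnSketch> idle = new ArrayDeque<>();
    private int created = 0;

    NativeConnSketch getConnection() {
        NativeConnSketch conn = idle.poll();
        return conn != null ? conn : new NativeConnSketch("dedicated-" + (++created));
    }

    void release(NativeConnSketch conn) {
        idle.push(conn);
    }
}

// Hypothetical wrapper modelling LettuceConnection's shared/dedicated handling.
class WrapperConnSketch {
    private final NativeConnSketch shared; // may be null when sharing is disabled
    private final ProviderSketch provider;
    private NativeConnSketch dedicated;
    private boolean closed;

    WrapperConnSketch(NativeConnSketch shared, ProviderSketch provider) {
        this.shared = shared;
        this.provider = provider;
    }

    // Transactions (queueing) need a dedicated connection; everything else may use the shared one.
    NativeConnSketch connectionFor(boolean queueing) {
        if (!queueing && shared != null) {
            return shared;
        }
        if (dedicated == null) {
            dedicated = provider.getConnection();
        }
        return dedicated;
    }

    void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (dedicated != null) {
            provider.release(dedicated); // only the dedicated connection goes back to the provider
        }
        // the shared connection is deliberately left untouched
    }
}
```

In this model, closing the wrapper never affects the shared connection, so many wrappers can be created and closed cheaply on top of one long-lived native connection.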

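Summary

  • Lettuce is a reactive, asynchronous library; spring-data-redis's RedisTemplate wraps it by calling sync() on the underlying async connection to obtain a synchronous API
  • RedisTemplate also encapsulates acquiring and releasing the connection: acquisition goes through RedisConnectionUtils.getConnection, release through RedisConnectionUtils.releaseConnection
  • RedisConnectionUtils.getConnection ultimately calls factory.getConnection() to obtain the connection
  • RedisConnectionUtils.releaseConnection ultimately calls LettuceConnection.close, which only releases asyncDedicatedConn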
