public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware { //...... /** * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios). * * @param <T> return type * @param action callback object to execute * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code * @param pipeline whether to pipeline or not the connection for the execution * @return object returned by the action */ @Nullable public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = null; try { if (enableTransactionSupport) { // only bind resources in case of potential transaction synchronization conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if (pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); T result = action.doInRedis(connToExpose); // close pipeline if (pipeline && !pipelineStatus) { connToUse.closePipeline(); } // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory); } } //...... }
/** * Gets a Redis connection from the given factory. Is aware of and will return any existing corresponding connections * bound to the current thread, for example when using a transaction manager. Will always create a new connection * otherwise. * * @param factory connection factory for creating the connection * @return an active Redis connection without transaction management. */ public static RedisConnection getConnection(RedisConnectionFactory factory) { return getConnection(factory, false); } /** * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current * thread, for example when using a transaction manager. Will create a new Connection otherwise, if * {@code allowCreate} is <tt>true</tt>. * * @param factory connection factory for creating the connection * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the * current thread * @param bind binds the connection to the thread, in case one was created * @param enableTransactionSupport * @return an active Redis connection */ public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); if (connHolder != null) { if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } if (!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } if (log.isDebugEnabled()) { log.debug("Opening RedisConnection"); } RedisConnection conn = factory.getConnection(); if (bind) { RedisConnection connectionToBind = conn; if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) { connectionToBind = createConnectionProxy(conn, factory); } connHolder = new RedisConnectionHolder(connectionToBind); TransactionSynchronizationManager.bindResource(factory, connHolder); if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } return conn; }
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection { private static final byte[][] EMPTY_2D_BYTE_ARRAY = new byte[0][]; private final Log log = LogFactory.getLog(DefaultStringRedisConnection.class); private final RedisConnection delegate; private final RedisSerializer<String> serializer; private Converter<byte[], String> bytesToString = new DeserializingConverter(); private SetConverter<Tuple, StringTuple> tupleToStringTuple = new SetConverter<>(new TupleConverter()); private SetConverter<StringTuple, Tuple> stringTupleToTuple = new SetConverter<>(new StringTupleConverter()); private ListConverter<byte[], String> byteListToStringList = new ListConverter<>(bytesToString); private MapConverter<byte[], String> byteMapToStringMap = new MapConverter<>(bytesToString); private SetConverter<byte[], String> byteSetToStringSet = new SetConverter<>(bytesToString); private Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<String>>> byteGeoResultsToStringGeoResults; @SuppressWarnings("rawtypes") private Queue<Converter> pipelineConverters = new LinkedList<>(); @SuppressWarnings("rawtypes") private Queue<Converter> txConverters = new LinkedList<>(); private boolean deserializePipelineAndTxResults = false; private IdentityConverter<Object, ?> identityConverter = new IdentityConverter<>(); //...... @Override public Boolean set(byte[] key, byte[] value) { return convertAndReturn(delegate.set(key, value), identityConverter); } //...... }
public class LettuceConnection extends AbstractRedisConnection { //...... @Override @Deprecated default Boolean set(byte[] key, byte[] value) { return stringCommands().set(key, value); } @Override public RedisStringCommands stringCommands() { return new LettuceStringCommands(this); } //...... }
@RequiredArgsConstructor class LettuceStringCommands implements RedisStringCommands { //...... @Override public Boolean set(byte[] key, byte[] value) { Assert.notNull(key, "Key must not be null!"); Assert.notNull(value, "Value must not be null!"); try { if (isPipelined()) { pipeline( connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter())); return null; } if (isQueueing()) { transaction( connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter())); return null; } return Converters.stringToBoolean(getConnection().set(key, value)); } catch (Exception ex) { throw convertLettuceAccessException(ex); } } public RedisClusterCommands<byte[], byte[]> getConnection() { return connection.getConnection(); } //...... }
public class LettuceConnection extends AbstractRedisConnection { //...... protected RedisClusterCommands<byte[], byte[]> getConnection() { if (isQueueing()) { return getDedicatedConnection(); } if (asyncSharedConn != null) { if (asyncSharedConn instanceof StatefulRedisConnection) { return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync(); } if (asyncSharedConn instanceof StatefulRedisClusterConnection) { return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync(); } } return getDedicatedConnection(); } RedisClusterCommands<byte[], byte[]> getDedicatedConnection() { if (asyncDedicatedConn == null) { asyncDedicatedConn = doGetAsyncDedicatedConnection(); if (asyncDedicatedConn instanceof StatefulRedisConnection) { ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex); } } if (asyncDedicatedConn instanceof StatefulRedisConnection) { return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync(); } if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) { return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).sync(); } throw new IllegalStateException( String.format("%s is not a supported connection type.", asyncDedicatedConn.getClass().getName())); } protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() { return connectionProvider.getConnection(StatefulConnection.class); } //...... }
/** * Closes the given connection, created via the given factory if not managed externally (i.e. not bound to the * thread). * * @param conn the Redis connection to close. * @param factory the Redis factory that the connection was created with. */ public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) { if (conn == null) { return; } RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); if (connHolder != null && connHolder.isTransactionSyncronisationActive()) { if (log.isDebugEnabled()) { log.debug("Redis Connection will be closed when transaction finished."); } return; } // release transactional/read-only and non-transactional/non-bound connections. // transactional connections for read-only transactions get no synchronizer registered if (isConnectionTransactional(conn, factory) && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { unbindConnection(factory); } else if (!isConnectionTransactional(conn, factory)) { if (log.isDebugEnabled()) { log.debug("Closing Redis Connection"); } conn.close(); } }
@Override public void close() throws DataAccessException { super.close(); if (isClosed) { return; } isClosed = true; if (asyncDedicatedConn != null) { try { connectionProvider.release(asyncDedicatedConn); } catch (RuntimeException ex) { throw convertLettuceAccessException(ex); } } if (subscription != null) { if (subscription.isAlive()) { subscription.doClose(); } subscription = null; } this.dbIndex = defaultDbIndex; }