class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean { private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class); private final LettuceConnectionProvider connectionProvider; private final GenericObjectPoolConfig poolConfig; private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32); private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32); LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) { Assert.notNull(connectionProvider, "ConnectionProvider must not be null!"); Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); this.connectionProvider = connectionProvider; this.poolConfig = clientConfiguration.getPoolConfig(); } public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) { GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> { return ConnectionPoolSupport.createGenericObjectPool(() -> { return this.connectionProvider.getConnection(connectionType); }, this.poolConfig, false); }); try { StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject(); this.poolRef.put(connection, pool); return (StatefulConnection)connectionType.cast(connection); } catch (Exception var4) { throw new PoolException("Could not get a resource from the pool", var4); } } public AbstractRedisClient getRedisClient() { if (this.connectionProvider instanceof RedisClientProvider) { return ((RedisClientProvider)this.connectionProvider).getRedisClient(); } else { throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName())); } } public void release(StatefulConnection<?, ?> connection) { 
GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection); if (pool == null) { throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider"); } else { pool.returnObject(connection); } } public void destroy() throws Exception { if (!this.poolRef.isEmpty()) { log.warn("LettucePoolingConnectionProvider contains unreleased connections"); this.poolRef.forEach((connection, pool) -> { pool.returnObject(connection); }); this.poolRef.clear(); } this.pools.forEach((type, pool) -> { pool.close(); }); this.pools.clear(); } }
/**
 * Creates a new {@link GenericObjectPool} whose connections are obtained from {@code connectionSupplier}.
 *
 * @param connectionSupplier source of new connections for the pool, must not be {@code null}.
 * @param config the pool configuration, must not be {@code null}.
 * @param wrapConnections when {@code true}, every borrowed connection is passed through
 *        {@code wrapConnection(connection, pool)} before it is handed out, and objects implementing
 *        {@link HasTargetConnection} are unwrapped to their target connection when returned; when
 *        {@code false}, connections are borrowed and returned as-is.
 * @param <T> the connection type.
 * @return the newly created pool.
 */
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

    LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
    LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

    return new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

        @Override
        public T borrowObject() throws Exception {
            T connection = super.borrowObject();
            return wrapConnections ? wrapConnection(connection, this) : connection;
        }

        @Override
        public void returnObject(T obj) {

            if (wrapConnections && obj instanceof HasTargetConnection) {
                // The pool tracks the underlying connection, not the wrapper, so unwrap before returning.
                @SuppressWarnings("unchecked")
                T target = (T) ((HasTargetConnection) obj).getTargetConnection();
                super.returnObject(target);
                return;
            }

            super.returnObject(obj);
        }
    };
}
/**
 * Pooled-object factory that delegates connection creation to a {@link Supplier}.
 * <p>
 * Validation relies solely on {@code isOpen()} of the pooled connection, and destruction closes it.
 *
 * @param <T> the connection type managed by the pool.
 */
private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

    /** Produces a connection each time the pool needs a new object. */
    private final Supplier<T> connectionSupplier;

    RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
        this.connectionSupplier = connectionSupplier;
    }

    /** Obtains a fresh connection from the supplier. */
    @Override
    public T create() throws Exception {
        T created = this.connectionSupplier.get();
        return created;
    }

    /** Wraps the connection in a {@link DefaultPooledObject}. */
    @Override
    public PooledObject<T> wrap(T obj) {
        PooledObject<T> pooled = new DefaultPooledObject<>(obj);
        return pooled;
    }

    /** A pooled connection is considered valid as long as it reports itself open. */
    @Override
    public boolean validateObject(PooledObject<T> p) {
        T candidate = p.getObject();
        return candidate.isOpen();
    }

    /** Closes the connection held by the pooled object. */
    @Override
    public void destroyObject(PooledObject<T> p) throws Exception {
        T doomed = p.getObject();
        doomed.close();
    }
}
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class); private Duration timeout; private CloseEvents closeEvents = new CloseEvents(); private final RedisChannelWriter channelWriter; private final boolean debugEnabled = logger.isDebugEnabled(); private volatile boolean closed; private volatile boolean active = true; private volatile ClientOptions clientOptions; //...... /** * Notification when the connection becomes active (connected). */ public void activated() { active = true; closed = false; } /** * Notification when the connection becomes inactive (disconnected). */ public void deactivated() { active = false; } /** * * @return true if the connection is active and not closed. */ public boolean isOpen() { return active; } @Override public synchronized void close() { if (debugEnabled) { logger.debug("close()"); } if (closed) { logger.warn("Connection is already closed"); return; } if (!closed) { active = false; closed = true; channelWriter.close(); closeEvents.fireEventClosed(this); closeEvents = new CloseEvents(); } } }
/**
 * Validate the shared connection. The connection is considered valid when it is non-null, reports itself
 * open and answers a {@code PING}. An invalid connection is released to the connection provider, the
 * connection state is reset and a new native connection is obtained.
 */
void validateConnection() {

    synchronized (this.connectionMonitor) {

        boolean valid = false;

        if (connection != null && connection.isOpen()) {
            try {
                if (connection instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection<?, ?>) connection).sync().ping();
                }

                if (connection instanceof StatefulRedisClusterConnection) {
                    // Cast to the cluster type that was just checked; casting to StatefulRedisConnection
                    // here would fail and make every cluster connection look invalid.
                    ((StatefulRedisClusterConnection<?, ?>) connection).sync().ping();
                }

                valid = true;
            } catch (Exception e) {
                log.debug("Validation failed", e);
            }
        }

        if (!valid) {

            if (connection != null) {
                connectionProvider.release(connection);
            }

            log.warn("Validation of shared connection failed. Creating a new connection.");

            resetConnection();
            this.connection = getNativeConnection();
        }
    }
}
private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> { private final RedisClient client; private int dbIndex; public LettuceFactory(RedisClient client, int dbIndex) { this.client = client; this.dbIndex = dbIndex; } public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception { if (pooledObject.getObject() instanceof StatefulRedisConnection) { ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex); } } public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception { try { ((StatefulConnection)obj.getObject()).close(); } catch (Exception var3) { ; } } public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) { try { if (obj.getObject() instanceof StatefulRedisConnection) { ((StatefulRedisConnection)obj.getObject()).sync().ping(); } return true; } catch (Exception var3) { return false; } } public StatefulConnection<byte[], byte[]> create() throws Exception { return this.client.connect(LettuceConnection.CODEC); } public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) { return new DefaultPooledObject(obj); } }
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { //...... private Pool<Jedis> createPool() { if (isRedisSentinelAware()) { return createRedisSentinelPool(this.sentinelConfig); } return createRedisPool(); } /** * Creates {@link JedisSentinelPool}. * * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}. * @return the {@link Pool} to use. Never {@literal null}. * @since 1.4 */ protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) { GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig(); return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()), poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName()); } /** * Creates {@link JedisPool}. * * @return the {@link Pool} to use. Never {@literal null}. * @since 1.4 */ protected Pool<Jedis> createRedisPool() { return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName(), isUseSsl(), clientConfiguration.getSslSocketFactory().orElse(null), // clientConfiguration.getSslParameters().orElse(null), // clientConfiguration.getHostnameVerifier().orElse(null)); } //...... }
class JedisFactory implements PooledObjectFactory<Jedis> { private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>(); private final int connectionTimeout; private final int soTimeout; private final String password; private final int database; private final String clientName; private final boolean ssl; private final SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; //...... @Override public boolean validateObject(PooledObject<Jedis> pooledJedis) { final BinaryJedis jedis = pooledJedis.getObject(); try { HostAndPort hostAndPort = this.hostAndPort.get(); String connectionHost = jedis.getClient().getHost(); int connectionPort = jedis.getClient().getPort(); return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort && jedis.isConnected() && jedis.ping().equals("PONG"); } catch (final Exception e) { return false; } } }