本文主要研究一下lettuce的shareNativeConnection參數java
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.javareact
public class LettuceConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory { private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy( LettuceConverters.exceptionConverter()); private final Log log = LogFactory.getLog(getClass()); private final LettuceClientConfiguration clientConfiguration; private @Nullable AbstractRedisClient client; private @Nullable LettuceConnectionProvider connectionProvider; private @Nullable LettuceConnectionProvider reactiveConnectionProvider; private boolean validateConnection = false; private boolean shareNativeConnection = true; private @Nullable SharedConnection<byte[]> connection; private @Nullable SharedConnection<ByteBuffer> reactiveConnection; private @Nullable LettucePool pool; /** Synchronization monitor for the shared Connection */ private final Object connectionMonitor = new Object(); private boolean convertPipelineAndTxResults = true; private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379); private @Nullable RedisSentinelConfiguration sentinelConfiguration; private @Nullable RedisClusterConfiguration clusterConfiguration; private @Nullable ClusterCommandExecutor clusterCommandExecutor; //...... @Override public LettuceReactiveRedisConnection getReactiveConnection() { return getShareNativeConnection() ? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider) : new LettuceReactiveRedisConnection(reactiveConnectionProvider); } @Override public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() { if (!isClusterAware()) { throw new InvalidDataAccessApiUsageException("Cluster is not configured!"); } RedisClusterClient client = (RedisClusterClient) this.client; return getShareNativeConnection() ? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client) : new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client); } /** * Indicates if multiple {@link LettuceConnection}s should share a single native connection. * * @return native connection shared. */ public boolean getShareNativeConnection() { return shareNativeConnection; } /** * @return the shared connection using {@link ByteBuffer} encoding for reactive API use. {@literal null} if * {@link #getShareNativeConnection() connection sharing} is disabled. * @since 2.0.1 */ @Nullable protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection() { return shareNativeConnection ? getOrCreateSharedReactiveConnection().getConnection() : null; } private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() { synchronized (this.connectionMonitor) { if (this.reactiveConnection == null) { this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider, true); } return this.reactiveConnection; } } }
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.javagit
/** * Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it * synchronizes concurrent calls by blocking. * * @param <E> connection encoding. * @author Mark Paluch * @author Christoph Strobl * @since 2.1 */ @RequiredArgsConstructor class SharedConnection<E> { private final LettuceConnectionProvider connectionProvider; private final boolean shareNativeClusterConnection; /** Synchronization monitor for the shared Connection */ private final Object connectionMonitor = new Object(); private @Nullable StatefulConnection<E, E> connection; /** * Returns a valid Lettuce connection. Initializes and validates the connection if * {@link #setValidateConnection(boolean) enabled}. * * @return the connection. */ @Nullable StatefulConnection<E, E> getConnection() { synchronized (this.connectionMonitor) { if (this.connection == null) { this.connection = getNativeConnection(); } if (getValidateConnection()) { validateConnection(); } return this.connection; } } /** * Obtain a connection from the associated {@link LettuceConnectionProvider}. * * @return the connection. */ @Nullable private StatefulConnection<E, E> getNativeConnection() { try { if (isClusterAware() && !shareNativeClusterConnection) { return null; } StatefulConnection<E, E> connection = connectionProvider.getConnection(StatefulConnection.class); if (connection instanceof StatefulRedisConnection && getDatabase() > 0) { ((StatefulRedisConnection) connection).sync().select(getDatabase()); } return connection; } catch (RedisException e) { throw new RedisConnectionFailureException("Unable to connect to Redis", e); } } /** * Validate the connection. Invalid connections will be closed and the connection state will be reset. */ void validateConnection() { synchronized (this.connectionMonitor) { boolean valid = false; if (connection != null && connection.isOpen()) { try { if (connection instanceof StatefulRedisConnection) { ((StatefulRedisConnection) connection).sync().ping(); } if (connection instanceof StatefulRedisClusterConnection) { ((StatefulRedisConnection) connection).sync().ping(); } valid = true; } catch (Exception e) { log.debug("Validation failed", e); } } if (!valid) { if (connection != null) { connectionProvider.release(connection); } log.warn("Validation of shared connection failed. Creating a new connection."); resetConnection(); this.connection = getNativeConnection(); } } } /** * Reset the underlying shared Connection, to be reinitialized on next access. */ void resetConnection() { synchronized (this.connectionMonitor) { if (this.connection != null) { this.connectionProvider.release(this.connection); } this.connection = null; } } }
若是使用鏈接池的話,則從新borrow一次
)spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.javagithub
@Override public void release(StatefulConnection<?, ?> connection) { GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection); if (pool == null) { throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider"); } pool.returnObject(connection); }
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ConnectionWatchdog.javaredis
/** * A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost. * * @author Will Glozer * @author Mark Paluch * @author Koji Lin */ @ChannelHandler.Sharable public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { //...... @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.debug("{} channelInactive()", logPrefix()); if (!armed) { logger.debug("{} ConnectionWatchdog not armed", logPrefix()); return; } channel = null; if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) { scheduleReconnect(); } else { logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx); } super.channelInactive(ctx); } /** * Schedule reconnect if channel is not available/not active. */ public void scheduleReconnect() { logger.debug("{} scheduleReconnect()", logPrefix()); if (!isEventLoopGroupActive()) { logger.debug("isEventLoopGroupActive() == false"); return; } if (!isListenOnChannelInactive()) { logger.debug("Skip reconnect scheduling, listener disabled"); return; } if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) { attempts++; final int attempt = attempts; int timeout = (int) reconnectDelay.createDelay(attempt).toMillis(); logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout); this.reconnectScheduleTimeout = timer.newTimeout(it -> { reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated"); return; } reconnectWorkers.submit(() -> { ConnectionWatchdog.this.run(attempt); return null; }); }, timeout, TimeUnit.MILLISECONDS); // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment. if (!reconnectSchedulerSync.get()) { reconnectScheduleTimeout = null; } } else { logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix()); } } /** * Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with * the same handler instances contained in the old channel's pipeline. * * @param attempt attempt counter * * @throws Exception when reconnection fails. */ public void run(int attempt) throws Exception { reconnectSchedulerSync.set(false); reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { logger.debug("isEventLoopGroupActive() == false"); return; } if (!isListenOnChannelInactive()) { logger.debug("Skip reconnect scheduling, listener disabled"); return; } if (isReconnectSuspended()) { logger.debug("Skip reconnect scheduling, reconnect is suspended"); return; } boolean shouldLog = shouldLog(); InternalLogLevel infoLevel = InternalLogLevel.INFO; InternalLogLevel warnLevel = InternalLogLevel.WARN; if (shouldLog) { lastReconnectionLogging = System.currentTimeMillis(); } else { warnLevel = InternalLogLevel.DEBUG; infoLevel = InternalLogLevel.DEBUG; } InternalLogLevel warnLevelToUse = warnLevel; try { reconnectionListener.onReconnect(new ConnectionEvents.Reconnect(attempt)); logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress); ChannelFuture future = reconnectionHandler.reconnect(); future.addListener(it -> { if (it.isSuccess() || it.cause() == null) { return; } Throwable throwable = it.cause(); if (ReconnectionHandler.isExecutionException(throwable)) { logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString()); } else { logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString(), throwable); } if (!isReconnectSuspended()) { scheduleReconnect(); } }); } catch (Exception e) { logger.log(warnLevel, "Cannot reconnect: {}", e.toString()); } } }
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ReconnectionHandler.javaspring
class ReconnectionHandler { //...... /** * Initiate reconnect and return a {@link ChannelFuture} for synchronization. The resulting future either succeeds or fails. * It can be {@link ChannelFuture#cancel(boolean) canceled} to interrupt reconnection and channel initialization. A failed * {@link ChannelFuture} will close the channel. * * @return reconnect {@link ChannelFuture}. */ protected ChannelFuture reconnect() { SocketAddress remoteAddress = socketAddressSupplier.get(); logger.debug("Reconnecting to Redis at {}", remoteAddress); ChannelFuture connectFuture = bootstrap.connect(remoteAddress); ChannelPromise initFuture = connectFuture.channel().newPromise(); initFuture.addListener((ChannelFuture it) -> { if (it.cause() != null) { connectFuture.cancel(true); close(it.channel()); } }); connectFuture.addListener((ChannelFuture it) -> { if (it.cause() != null) { initFuture.tryFailure(it.cause()); return; } ChannelPipeline pipeline = it.channel().pipeline(); RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class); if (channelInitializer == null) { initFuture.tryFailure(new IllegalStateException( "Reconnection attempt without a RedisChannelInitializer in the channel pipeline")); return; } channelInitializer.channelInitialized().whenComplete( (state, throwable) -> { if (throwable != null) { if (isExecutionException(throwable)) { initFuture.tryFailure(throwable); return; } if (clientOptions.isCancelCommandsOnReconnectFailure()) { connectionFacade.reset(); } if (clientOptions.isSuspendReconnectOnProtocolFailure()) { logger.error("Disabling autoReconnect due to initialization failure", throwable); setReconnectSuspended(true); } initFuture.tryFailure(throwable); return; } if (logger.isDebugEnabled()) { logger.info("Reconnected to {}, Channel {}", remoteAddress, ChannelLogDescriptor.logDescriptor(it.channel())); } else { logger.info("Reconnected to {}", remoteAddress); } initFuture.trySuccess(); }); }); Runnable timeoutAction = () -> { initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ", timeout, timeoutUnit))); }; Timeout timeoutHandle = timer.newTimeout(it -> { if (connectFuture.isDone() && initFuture.isDone()) { return; } if (reconnectWorkers.isShutdown()) { timeoutAction.run(); return; } reconnectWorkers.submit(timeoutAction); }, this.timeout, timeoutUnit); initFuture.addListener(it -> timeoutHandle.cancel()); return this.currentFuture = initFuture; } }
創建鏈接不成功鏈接池那裏會拋出org.springframework.data.redis.connection.PoolException: Could not get a resource from the pool; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 192.168.99.100:6379
)不過因爲LettuceConnectionFactory.SharedConnection的validateConnection方法在校驗失敗時,重複調用connectionProvider.release(connection),致使拋出org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.StatefulRedisConnectionImpl@1e4ad4a was either previously returned or does not belong to this connection provider異常