A look at connection pool validation in spring-data-redis

This article examines how spring-data-redis validates the connections held in its connection pools.

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
  • getConnection calls ConnectionPoolSupport.createGenericObjectPool to create the connection pool

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • The pool is built on RedisPooledObjectFactory

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • RedisPooledObjectFactory extends BasePooledObjectFactory and overrides validateObject, among other methods; validation relies solely on isOpen

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
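  • isOpen simply returns the active field; active becomes false on deactivated or close, and is true initially and after activated
  • A timeout caused by something like docker pause therefore goes undetected: the peer stops responding, but no disconnect is observed, so active stays true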
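
The difference between the two styles of check can be illustrated with a toy model that uses only the JDK. This is a sketch, not the Lettuce API: ToyConnection, freezePeer, and ValidationDemo are hypothetical names, and the frozen peer is modeled as a reply that never arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a pooled connection (hypothetical, not Lettuce). */
class ToyConnection {
    private volatile boolean active = true;      // plays the role of RedisChannelHandler.active
    private volatile boolean peerFrozen = false; // models a peer stopped by `docker pause`

    /** The peer stops answering, but no disconnect event fires, so active stays true. */
    void freezePeer() { peerFrozen = true; }

    void deactivated() { active = false; }

    boolean isOpen() { return active; }

    /** Returns "PONG", or throws TimeoutException if no reply arrives in time. */
    String ping(long timeoutMillis) throws Exception {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (!peerFrozen) {
            reply.complete("PONG");
        }
        return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

class ValidationDemo {
    /** Flag-only validation, in the style of RedisPooledObjectFactory. */
    static boolean validateByFlag(ToyConnection c) {
        return c.isOpen();
    }

    /** Flag plus round trip, in the style of JedisFactory. */
    static boolean validateByPing(ToyConnection c, long timeoutMillis) {
        try {
            return c.isOpen() && "PONG".equals(c.ping(timeoutMillis));
        } catch (Exception e) {
            return false; // a timeout counts as a failed validation
        }
    }

    public static void main(String[] args) {
        ToyConnection c = new ToyConnection();
        c.freezePeer();
        System.out.println("flag check: " + validateByFlag(c));
        System.out.println("ping check: " + validateByPing(c, 100));
    }
}
```

Running main prints "flag check: true" followed by "ping check: false": only the check that performs a round trip notices the frozen peer.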

LettuceConnectionFactory.SharedConnection.validateConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

/**
		 * Validate the connection. Invalid connections will be closed and the connection state will be reset.
		 */
		void validateConnection() {

			synchronized (this.connectionMonitor) {

				boolean valid = false;

				if (connection != null && connection.isOpen()) {
					try {

						if (connection instanceof StatefulRedisConnection) {
							((StatefulRedisConnection) connection).sync().ping();
						}

						if (connection instanceof StatefulRedisClusterConnection) {
							((StatefulRedisConnection) connection).sync().ping();
						}
						valid = true;
					} catch (Exception e) {
						log.debug("Validation failed", e);
					}
				}

				if (!valid) {

					if (connection != null) {
						connectionProvider.release(connection);
					}

					log.warn("Validation of shared connection failed. Creating a new connection.");

					resetConnection();
					this.connection = getNativeConnection();
				}
			}
		}
  • This is the path taken to obtain a connection when LettuceConnectionFactory's shareNativeConnection is enabled, which is the default
  • If LettuceConnectionFactory's validateConnection is true (it defaults to false), validateConnection is run on every get
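
As a configuration sketch, the flag can be switched on when the factory is declared. The host, port, two-second timeout, and the class name RedisValidationConfig are placeholder assumptions; the command timeout is assumed to be what bounds the validating ping.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
class RedisValidationConfig {

    @Bean
    LettuceConnectionFactory redisConnectionFactory() {
        // Placeholder host and port for this sketch.
        RedisStandaloneConfiguration server = new RedisStandaloneConfiguration("localhost", 6379);

        // A finite command timeout keeps the validating PING from blocking indefinitely.
        LettuceClientConfiguration client = LettuceClientConfiguration.builder()
                .commandTimeout(Duration.ofSeconds(2))
                .build();

        LettuceConnectionFactory factory = new LettuceConnectionFactory(server, client);
        factory.setShareNativeConnection(true); // the default, spelled out for clarity
        factory.setValidateConnection(true);    // defaults to false
        return factory;
    }
}
```

The trade-off is an extra ping round trip on every connection lookup.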

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }

        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }

        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }

                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
  • The deprecated DefaultLettucePool contains a LettuceFactory whose validateObject issues a ping, which makes its check more accurate

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
	//......
	private Pool<Jedis> createPool() {

		if (isRedisSentinelAware()) {
			return createRedisSentinelPool(this.sentinelConfig);
		}
		return createRedisPool();
	}

	/**
	 * Creates {@link JedisSentinelPool}.
	 *
	 * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
	 * @return the {@link Pool} to use. Never {@literal null}.
	 * @since 1.4
	 */
	protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

		GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
		return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
				poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
	}

	/**
	 * Creates {@link JedisPool}.
	 *
	 * @return the {@link Pool} to use. Never {@literal null}.
	 * @since 1.4
	 */
	protected Pool<Jedis> createRedisPool() {

		return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
				getPassword(), getDatabase(), getClientName(), isUseSsl(),
				clientConfiguration.getSslSocketFactory().orElse(null), //
				clientConfiguration.getSslParameters().orElse(null), //
				clientConfiguration.getHostnameVerifier().orElse(null));
	}
	//......
}
  • Both JedisPool and JedisSentinelPool use JedisFactory internally

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory<Jedis> {
  private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
  private final int connectionTimeout;
  private final int soTimeout;
  private final String password;
  private final int database;
  private final String clientName;
  private final boolean ssl;
  private final SSLSocketFactory sslSocketFactory;
  private SSLParameters sslParameters;
  private HostnameVerifier hostnameVerifier;

  //......

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}
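  • JedisFactory implements the PooledObjectFactory interface; its validateObject checks not only isConnected but also the result of ping
  • ping throws an exception as soon as it times out, so validation fails; the timeout caused by docker pause is therefore detected and the connection is evicted from the pool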
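
One point worth keeping in mind: commons-pool2 only calls validateObject when one of the pool's test flags (testOnBorrow, testOnReturn, testWhileIdle) is enabled. The sketch below turns on testOnBorrow for a Jedis-backed factory. The host, port, timeout value, and the class name JedisValidationConfig are placeholder assumptions, and the read timeout is assumed to be what bounds the validating ping.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;

import redis.clients.jedis.JedisPoolConfig;

@Configuration
class JedisValidationConfig {

    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        // Run JedisFactory.validateObject before a connection is handed out.
        poolConfig.setTestOnBorrow(true);

        JedisClientConfiguration client = JedisClientConfiguration.builder()
                .readTimeout(Duration.ofSeconds(2)) // keeps a hung PING from blocking indefinitely
                .usePooling()
                .poolConfig(poolConfig)
                .build();

        // Placeholder host and port for this sketch.
        return new JedisConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379), client);
    }
}
```

With testOnBorrow enabled every borrow costs one ping; testWhileIdle moves that cost to the background evictor instead, at the price of a detection delay.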

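Summary

  • spring-data-redis 2.0 and later deprecate the old LettucePool in favor of LettucePoolingClientConfiguration
  • This introduces a problem: the old pool validated connections with ping, whereas the new pool relies on the active field, which cannot detect a docker pause
  • For lettuce, shareNativeConnection defaults to true and validateConnection to false. After the first connection is borrowed from the pool, the underlying connection is reused from then on and never returned. To borrow from the pool and return on every access, set shareNativeConnection to false
  • The jedis pool's validateObject checks not only isConnected but also ping, so it can detect the timeout caused by docker pause and evict the connection from the pool
  • For lettuce, there are two ways to detect a docker pause failure: fix validateObject in ConnectionPoolSupport's RedisPooledObjectFactory so that it pings in addition to checking isOpen, or do not enable pooling and set LettuceConnectionFactory's validateConnection to true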
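
The first option can be sketched as a standalone commons-pool2 factory. PingValidatingFactory and newPool are hypothetical names, not part of Lettuce. Because LettucePoolingConnectionProvider calls ConnectionPoolSupport.createGenericObjectPool directly, this factory cannot simply be plugged into spring-data-redis; it only shows the shape a ping-based validateObject might take.

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

/** Hypothetical factory: validates with isOpen() plus a PING round trip. */
class PingValidatingFactory extends BasePooledObjectFactory<StatefulRedisConnection<String, String>> {

    private final RedisClient client;

    PingValidatingFactory(RedisClient client) {
        this.client = client;
    }

    @Override
    public StatefulRedisConnection<String, String> create() {
        return client.connect();
    }

    @Override
    public PooledObject<StatefulRedisConnection<String, String>> wrap(
            StatefulRedisConnection<String, String> connection) {
        return new DefaultPooledObject<>(connection);
    }

    @Override
    public void destroyObject(PooledObject<StatefulRedisConnection<String, String>> p) {
        p.getObject().close();
    }

    @Override
    public boolean validateObject(PooledObject<StatefulRedisConnection<String, String>> p) {
        StatefulRedisConnection<String, String> connection = p.getObject();
        try {
            // The cheap flag check runs first; the PING catches peers that stopped responding.
            return connection.isOpen() && "PONG".equals(connection.sync().ping());
        } catch (Exception e) {
            return false; // a command timeout lands here
        }
    }

    /** Builds a pool that actually invokes validateObject on every borrow. */
    static GenericObjectPool<StatefulRedisConnection<String, String>> newPool(RedisClient client) {
        GenericObjectPool<StatefulRedisConnection<String, String>> pool =
                new GenericObjectPool<>(new PingValidatingFactory(client));
        pool.setTestOnBorrow(true);
        return pool;
    }
}
```

The ping only fails promptly if the client's command timeout is finite, so that timeout needs to be set to a value shorter than the stall the application can tolerate.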
