關於SpringBoot中Redis線程池的有關探討

探討原由

最近在寫一個小項目,用redis過時來實現驗證碼的時間限制。由於SpringBoot默認採用 lettuce做爲客戶端,引入了commons-pool2依賴以後作了以下配置:java

spring:
 redis:
 host: 192.168.56.1
 lettuce:
 pool:
 min-idle: 2
 max-active: 8     #默認
 max-idle: 8       #默認
複製代碼

原本覺得這樣作就好了,而後寫了以下代碼測了下node

@Test
    public void test() throws InterruptedException {
        int i;
        CountDownLatch c = new CountDownLatch(5000);
        long x = System.currentTimeMillis();
        for (i = 0; i < 5000; i++) {
            new Thread(() -> {
                redisTemplate.opsForValue().set("1", "1");
                c.countDown();
            }).start();
        }
        c.await();
    }
複製代碼

測試期間,實際客戶端最大接入:react

# Clients # 減去一個redis-cli鏈接,一個爲以前的項目鏈接
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製代碼

???,我設置的配置去哪裏了???,因而開始了漫長的探索redis

嘗試

首先看下默認下是怎樣的。去除了pool的設置,再跑一下。spring

# Clients # 減去一個redis-cli鏈接,一個爲以前的項目鏈接
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製代碼

很好,一樣的結果。說明剛剛的配置根本沒有效果。沒準是lettuce的緣由?因而我修改了pom,用jedis測試了一下。shell

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.1.7.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>lettuce-core</artifactId>
                    <groupId>io.lettuce</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <!-- 注意版本和SpringData兼容 -->
            <version>2.9.1</version>
        </dependency>
複製代碼
spring:
 redis:
 host: 192.168.56.1
 jedis:
 pool:
 min-idle: 2
 max-active: 8
 max-idle: 8
複製代碼

看下結果:bash

# Clients # 減去一個redis-cli鏈接,一個爲以前的項目鏈接
connected_clients:10
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製代碼

最大換成15再試試:app

# Clients
connected_clients:17
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製代碼

jedis線程池正常,那說明是lettuce的配置上除了問題。async

探索

首先先來看看LettuceConnectionFactory中的getConnection是怎樣實現的。ide

public RedisConnection getConnection() {
        if (this.isClusterAware()) {
            return this.getClusterConnection();
        } else {
            LettuceConnection connection;
            if (this.pool != null) {
                connection = new LettuceConnection(this.getSharedConnection(), this.getTimeout(), (AbstractRedisClient)null, this.pool, this.getDatabase());
            } else {
                connection = new LettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase());
            }

            connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
            return connection;
        }
    }
複製代碼

首先,是關於集羣判斷,由於我沒設置任何集羣相關,因此直接來到else。此處的pool 並非咱們設置的pool。它對應的是以下方法,且只有這一處對它進行賦值:

/** * @param pool * @deprecated since 2.0, use pooling via {@link LettucePoolingClientConfiguration}. */
	@Deprecated
	public LettuceConnectionFactory(LettucePool pool) {

		this(new MutableLettuceClientConfiguration());
		this.pool = pool;
	}
複製代碼

LettuceConnectionConfiguration中的注入繼續追下去能夠看到並無調用這個過期方法:

@Bean
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources) throws UnknownHostException {
		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
				this.properties.getLettuce().getPool());
		return createLettuceConnectionFactory(clientConfig);
	}

	// 以後來到
	public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig, LettuceClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null!");

		this.standaloneConfig = standaloneConfig;
		this.configuration = this.standaloneConfig;
	}
複製代碼

因此,上面判斷時此的pool恆爲null,進入else。在實例化以前,首先會根據getSharedConnection獲取一個StatefulRedisConnection鏈接:

protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
		return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
	}

	private SharedConnection<byte[]> getOrCreateSharedConnection() {

		synchronized (this.connectionMonitor) {

			if (this.connection == null) {
				this.connection = new SharedConnection<>(connectionProvider);
			}

			return this.connection;
		}
	}
複製代碼

再接着向下則進入LettuceConnection構造函數:

LettuceConnection(@Nullable StatefulConnection<byte[], byte[]> sharedConnection,
			LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {

		Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null.");

		this.asyncSharedConn = sharedConnection;
		this.connectionProvider = connectionProvider;
		this.timeout = timeout;
		this.defaultDbIndex = defaultDbIndex;
		this.dbIndex = this.defaultDbIndex;
	}
複製代碼

此時的this.asyncSharedConn = sharedConnection;,再接着向下,咱們會進入以下方法:

protected RedisClusterCommands<byte[], byte[]> getConnection() {
		// 在事務未開啓時此處爲false默認值
		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
複製代碼

這裏的asyncSharedConn即爲咱們上面的共享實例鏈接。

分析一下,在默認狀況下:

private boolean shareNativeConnection = true;
複製代碼

new LettuceConnection時,通過StatefulRedisConnection()getOrCreateSharedConnection()後傳入了同一個共享鏈接。因此在此時即便配置了線程池,在運行時有且只有一個共享實例提供操做。 那咱們驗證一下,我嘗試人爲改變一下shareNativeConnection

@Configuration
public class Config {
    @Autowired
    public void setLettuceConnectionFactory(LettuceConnectionFactory lettuceConnectionFactory){
        lettuceConnectionFactory.setShareNativeConnection(false);
    }
}
複製代碼

仍是按下面的配置及測試:

spring:
 redis:
 host: 192.168.56.1
 lettuce:
 pool:
 min-idle: 2
複製代碼
@Test
    public void test() throws InterruptedException {
        int i ;
        CountDownLatch c = new CountDownLatch(5000);
        for (i = 1; i <= 5000; i++) {
            new Thread(() -> {
                System.out.println(redisTemplate.execute(RedisConnection::ping));
                c.countDown();
            }).start();
        }
        c.await();
    }
複製代碼

以後結果:

# Clients
connected_clients:10                    # 減去一個redis-cli鏈接,一個爲以前的項目鏈接
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製代碼

這個結果10中的8個爲pool中的默認max-active屬性,但這不意味不設置pool屬性時仍爲默認值(下面會說)。再將max-active改成10,結果也是相符:

# Clients # 減去一個redis-cli鏈接,一個爲以前的項目鏈接
connected_clients:12
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製代碼

那改變以後和以前有何區別呢?修改後,首先在new LettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase());時,傳入的共享實例爲null,接着也是進入以下方法:

protected RedisClusterCommands<byte[], byte[]> getConnection() {
		// 在事務未開啓時此處爲false默認值
		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
複製代碼

再接着則會進入getDedicatedConnection()

RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {

		if (asyncDedicatedConn == null) {

			asyncDedicatedConn = doGetAsyncDedicatedConnection();

			if (asyncDedicatedConn instanceof StatefulRedisConnection) {
				((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
			}
		}

		if (asyncDedicatedConn instanceof StatefulRedisConnection) {
			return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}
		if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
			return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}

		throw new IllegalStateException(
				String.format("%s is not a supported connection type.", asyncDedicatedConn.getClass().getName()));
	}
複製代碼

由於首次進入,asyncDedicatedConn確定爲null,繼續向下doGetAsyncDedicatedConnection();

protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
		return connectionProvider.getConnection(StatefulConnection.class);
	}
複製代碼

此時能夠看到,鏈接的提供則會由LettuceConnectionFactory中傳入的connectionProvider負責。connectionProviderafterPropertiesSet()進行初始化:

public void afterPropertiesSet() {

		this.client = createClient();

		this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
		this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);

		if (isClusterAware()) {
			this.clusterCommandExecutor = new ClusterCommandExecutor(
					new LettuceClusterTopologyProvider((RedisClusterClient) client),
					new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
					EXCEPTION_TRANSLATION);
		}
    
    // 建立方法 
	private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

		LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);

		if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
			return new LettucePoolingConnectionProvider(connectionProvider,
					(LettucePoolingClientConfiguration) this.clientConfiguration);
		}

		return connectionProvider;
	}
複製代碼

能夠看到connectionProvider與上面注入的this.properties.getLettuce().getPool()的配置息息相關。首先在建立時都爲doCreateConnectionProvider:

protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

		ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null);

		if (isStaticMasterReplicaAware()) {

			List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() //
					.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort())) //
					.peek(it -> it.setDatabase(getDatabase())) //
					.collect(Collectors.toList());

			return new StaticMasterReplicaConnectionProvider((RedisClient) client, codec, nodes, readFrom);
		}

		if (isClusterAware()) {
			return new ClusterConnectionProvider((RedisClusterClient) client, codec, readFrom);
		}

		return new StandaloneConnectionProvider((RedisClient) client, codec, readFrom);
	}
複製代碼

pool在配置中爲null,那connectionProvider則爲StandaloneConnectionProvider,不然則爲LettucePoolingConnectionProvider。而在getConnection的實現上,二者也存在巨大區別。

// LettucePoolingConnectionProvider 
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

		GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
			return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
					poolConfig, false);
		});

		try {

			StatefulConnection<?, ?> connection = pool.borrowObject();

			poolRef.put(connection, pool);

			return connectionType.cast(connection);
		} catch (Exception e) {
			throw new PoolException("Could not get a resource from the pool", e);
		}
	}

// StandaloneConnectionProvider
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

		if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
			return connectionType.cast(client.connectSentinel());
		}

		if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
			return connectionType.cast(client.connectPubSub(codec));
		}

		if (StatefulConnection.class.isAssignableFrom(connectionType)) {

			return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
					.orElseGet(() -> client.connect(codec)));
		}

		throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
	}
複製代碼

至此即可看到:若是咱們將shareNativeConnection設置爲false以後便會有兩種情形:

  • 沒有設置pool:StandaloneConnectionProvider每次請求都會建立一個鏈接
  • 設置了pool:LettucePoolingConnectionProvider則是以線程池的方式建立鏈接。

那咱們驗證一下,若是我將shareNativeConnection設置爲false同時不在application.yml對線程池進行任何設置,結果以下:

# Clients
connected_clients:591
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:771
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:885
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:974
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製代碼

這鏈接數是直接放飛自我了。。。。。並且伴隨着報錯的發生。不過設置pool後,由線程池進行管理鏈接則可獲得人爲控制。

總結

經過探索,我發現Lettuce的poolshareNativeConnection息息相關,若有不對的地方,望能指證。

相關文章
相關標籤/搜索