在使用redisson消息訂閱時,我針對門店商品庫存減扣進行訂閱的操做(在這裏一個商品一個監聽隊列),當正式投入生產時,發現一直再報Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
的錯誤,索性根據提示翻了翻源碼看看緣由:java
在redisson裏先關注一個類:RedisPubSubConnection
該類繼承自RedisConnection
,根據名字咱們可知它是一個典型的發佈與訂閱的類。那麼在redisson
使用時,會使用PubSubConnectionEntry
進行一次包裝:git
public class PubSubConnectionEntry { private final AtomicInteger subscribedChannelsAmount; private final RedisPubSubConnection conn; private final ConcurrentMap<ChannelName, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<ChannelName, SubscribeListener>(); private final ConcurrentMap<ChannelName, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<ChannelName, Queue<RedisPubSubListener<?>>>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } //.....省略其餘代碼 }
在這裏咱們能夠看到其有一個比較重要的屬性 subscribedChannelsAmount
,而這個值就是經過PublishSubscribeService
進行調用的:github
private void connect(final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) { //.... RedisPubSubConnection conn = future.getNow(); final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); //.... }
那麼此屬性就是根據config的subscriptionsPerConnection
裏設置的,那麼此值就表明了每一個鏈接的最大訂閱數。當tryAcqcurie
的時候會減小這個數量:redis
public int tryAcquire() { while (true) { int value = subscribedChannelsAmount.get(); if (value == 0) { return -1; } if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { return value - 1; } } }
若是當此值爲0時,那麼會從新獲取一個可用的鏈接,代碼以下:spring
int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); subscribe(channelName, promise, type, lock, oldEntry, listeners); return; } if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release();
若是此時沒有可用的鏈接的話,恐怕這次操做就會等待新的鏈接直至超時,超時了就報上述的錯誤了,不過根據提示。咱們此時的解決辦法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。當咱們使用springboot時能夠經過設置spring.redis.redisson.config
(具體設置請參考官網)來指定redisson的配置文件或者從新建立RedissonClient:promise
@Bean(destroyMethod = "shutdown") public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException { Config config = new Config(); String prefix = "redis://"; Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl"); if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) { prefix = "rediss://"; } config.useSingleServer() .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort()) .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在這裏指定數目 .setDatabase(redisProperties.getDatabase()) .setPassword(redisProperties.getPassword()); return Redisson.create(config); }