使用redisson時關於訂閱數的問題

在使用redisson消息訂閱時,我針對門店商品庫存減扣進行訂閱的操做(在這裏一個商品一個監聽隊列),當正式投入生產時,發現一直再報Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.的錯誤,索性根據提示翻了翻源碼看看緣由:java

在redisson裏先關注一個類:RedisPubSubConnection該類繼承自RedisConnection,根據名字咱們可知它是一個典型的發佈與訂閱的類。那麼在redisson使用時,會使用PubSubConnectionEntry進行一次包裝:git

public class PubSubConnectionEntry {
    
        private final AtomicInteger subscribedChannelsAmount;
        private final RedisPubSubConnection conn;
    
        private final ConcurrentMap<ChannelName, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<ChannelName, SubscribeListener>();
        private final ConcurrentMap<ChannelName, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<ChannelName, Queue<RedisPubSubListener<?>>>();
    
        public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
            super();
            this.conn = conn;
            this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
        }
      
     //.....省略其餘代碼 
    }

在這裏咱們能夠看到其有一個比較重要的屬性 subscribedChannelsAmount,而這個值就是經過PublishSubscribeService進行調用的:github

private void connect(final Codec codec, final ChannelName channelName,
                final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
          //....
          
          
             RedisPubSubConnection conn = future.getNow();
                    
                    final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
                    entry.tryAcquire();
          //....
        }

那麼此屬性就是根據config的subscriptionsPerConnection裏設置的,那麼此值就表明了每一個鏈接的最大訂閱數。當tryAcqcurie的時候會減小這個數量:redis

public int tryAcquire() {
            while (true) {
                int value = subscribedChannelsAmount.get();
                if (value == 0) {
                    return -1;
                }
                
                if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
                    return value - 1;
                }
            }
        }

若是當此值爲0時,那麼會從新獲取一個可用的鏈接,代碼以下:spring

int remainFreeAmount = freeEntry.tryAcquire();
                    if (remainFreeAmount == -1) {
                        throw new IllegalStateException();
                    }
                    
                    final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                    if (oldEntry != null) {
                        freeEntry.release();
                        freePubSubLock.release();
                        subscribe(channelName, promise, type, lock, oldEntry, listeners);
                        return;
                    }
                    
                    if (remainFreeAmount == 0) {
                        freePubSubConnections.poll();
                    }
                    freePubSubLock.release();

若是此時沒有可用的鏈接的話,恐怕這次操做就會等待新的鏈接直至超時,超時了就報上述的錯誤了,不過根據提示。咱們此時的解決辦法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。當咱們使用springboot時能夠經過設置spring.redis.redisson.config(具體設置請參考官網)來指定redisson的配置文件或者從新建立RedissonClient:promise

@Bean(destroyMethod = "shutdown")
            public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException {
    
                Config config = new Config();
                String prefix = "redis://";
                Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
                if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {
                    prefix = "rediss://";
                }
    
                config.useSingleServer()
                        .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
                        .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在這裏指定數目
                        .setDatabase(redisProperties.getDatabase())
                        .setPassword(redisProperties.getPassword());
    
                return Redisson.create(config);
            }
相關文章
相關標籤/搜索