A look at Redisson's DelayedQueue

This article takes a look at Redisson's DelayedQueue.

Maven

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.8.1</version>
</dependency>

Example

@Test
public void testDelayedQueue() throws InterruptedException {
    Config config = new Config();
    config.useSingleServer()
            .setAddress("redis://192.168.99.100:6379");
    RedissonClient redisson = Redisson.create(config);
    RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
    RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
    delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
    Assert.assertFalse(blockingQueue.contains("demo"));
    TimeUnit.SECONDS.sleep(15);
    Assert.assertTrue(blockingQueue.contains("demo"));
}
  • Two queues are involved here. The offer goes straight into the delayedQueue, but the delay governs when the element reaches the destination queue, which here is the RBlockingQueue.

Source code analysis

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {

    private final QueueTransferService queueTransferService;
    private final String channelName;
    private final String queueName;
    private final String timeoutSetName;
    
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getName());
        queueName = prefixName("redisson_delay_queue", getName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        
        //QueueTransferTask task = ......
        
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }

    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = PlatformDependent.threadLocalRandom().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;"
                 ,
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }

    public ByteBuf encode(Object value) {
        if (commandExecutor.isRedissonReferenceSupportEnabled()) {
            RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
            if (reference != null) {
                value = reference;
            }
        }
        
        try {
            return codec.getValueEncoder().encode(value);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    //......
}
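  • This is a Lua script. The KEYS array has four entries: KEYS[1] is getName(), KEYS[2] is timeoutSetName, KEYS[3] is queueName, and KEYS[4] is channelName.
  • There are three arguments: ARGV[1] is timeout, ARGV[2] is randomId, and ARGV[3] is encode(e).
  • The script packs randomId and the encoded value into a struct, adds it to the timeoutSetName zset with timeout as its score, and appends it to the tail of the queueName list. It then checks whether the first element of the timeoutSetName zset is the struct just added; if so, it publishes the timeout to the channel.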

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic<Long> getTopic() {
                return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        
        queueTransferService.schedule(queueName, task);
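  • The RedissonDelayedQueue constructor schedules the QueueTransferTask.
  • What gets scheduled is the pushTaskAsync method, which moves expired elements from the element queue to the destination queue.
  • This is again a Lua script: KEYS[1] is getName(), KEYS[2] is timeoutSetName, and KEYS[3] is queueName; ARGV[1] is the current timestamp and ARGV[2] is 100.
  • It calls zrangebyscore on the timeoutSetName zset, which is ordered by the timeout score, and takes the elements whose score lies between 0 and the current timestamp, at most 100 of them (the limit passed in ARGV[2]).
  • Any elements returned need to be handed over to the destination queue: for each one the script unpacks the value and rpushes it to the destination queue, then calls lrem to remove the struct from the element queue. Finally it removes all processed elements from the timeoutSetName zset.
  • After the transfer, it returns the score of the first remaining element of the timeoutSetName zset, or nil if there is none.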
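The transfer logic described above can be modelled in a few lines of plain Java. This is a minimal sketch under assumptions: TransferSketch, Entry, and the method names are hypothetical, the collections are in-memory stand-ins for the Redis keys, and the randomId part of the struct is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the pushTaskAsync script; not part of Redisson.
class TransferSketch {
    static final class Entry {
        final long timeout;
        final String value;

        Entry(long timeout, String value) {
            this.timeout = timeout;
            this.value = value;
        }
    }

    final List<Entry> timeoutSet = new ArrayList<>();   // KEYS[2], kept sorted by timeout
    final List<Entry> queue = new ArrayList<>();        // KEYS[3], insertion order
    final List<String> destination = new ArrayList<>(); // KEYS[1], the destination queue

    void offer(String value, long timeout) {
        Entry e = new Entry(timeout, value);
        timeoutSet.add(e);
        timeoutSet.sort(Comparator.comparingLong((Entry x) -> x.timeout));
        queue.add(e);
    }

    // Moves up to 'limit' entries whose timeout is <= now and returns the timeout
    // of the next pending entry, or null when nothing is left.
    Long pushTask(long now, int limit) {
        List<Entry> expired = new ArrayList<>();
        for (Entry e : timeoutSet) {          // zrangebyscore KEYS[2] 0 now limit 0 limit
            if (e.timeout > now || expired.size() == limit) {
                break;
            }
            expired.add(e);
        }
        for (Entry e : expired) {
            destination.add(e.value);         // rpush KEYS[1] value
            queue.remove(e);                  // lrem KEYS[3] 1 v
        }
        timeoutSet.removeAll(expired);        // zrem KEYS[2] ...
        return timeoutSet.isEmpty() ? null : timeoutSet.get(0).timeout;
    }
}
```

The returned value plays the role of the script's return: the caller uses it as the start time for the next run, and a null means there is nothing left to wait for.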

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {

    private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
    
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
    
    public synchronized void remove(String name) {
        QueueTransferTask task = tasks.get(name);
        if (task != null) {
            if (task.decUsage() == 0) {
                tasks.remove(name, task);
                task.stop();
            }
        }
    }
}
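  • The schedule method first calls putIfAbsent on the ConcurrentMap. If a task is already registered under that name it calls oldTask.incUsage(); otherwise it starts the new task.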
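The reference-counting pattern behind schedule and remove can be tried out in isolation. The sketch below is a simplified model, not the Redisson classes: RefCountedRegistrySketch and its Task are hypothetical, and it assumes the usage counter starts at 1 for the first registration, which the listing above does not show.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a usage-counted task registry; not part of Redisson.
class RefCountedRegistrySketch {
    static final class Task {
        // Assumption: the first registration counts as one usage.
        private final AtomicInteger usage = new AtomicInteger(1);
        int starts;
        int stops;

        void start() { starts++; }
        void stop() { stops++; }
        void incUsage() { usage.incrementAndGet(); }
        int decUsage() { return usage.decrementAndGet(); }
    }

    private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

    // Only the first task registered under a name is started; later calls just bump its counter.
    synchronized void schedule(String name, Task task) {
        Task oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }

    // The task is stopped and dropped only when its last user removes it.
    synchronized void remove(String name) {
        Task task = tasks.get(name);
        if (task != null && task.decUsage() == 0) {
            tasks.remove(name, task);
            task.stop();
        }
    }

    boolean isRegistered(String name) {
        return tasks.containsKey(name);
    }
}
```

With this shape, several delayed-queue instances created for the same name share one running transfer task, and the task keeps running until the last of them is removed.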

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

public void start() {
        RTopic<Long> schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        
        messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }

    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    if (future.cause() instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(future.cause().getMessage(), future.cause());
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                
                if (future.getNow() != null) {
                    scheduleTask(future.getNow());
                }
            }
        });
    }
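  • RTopic is used here, with both a StatusListener and a MessageListener added.
  • The StatusListener triggers pushTask on subscription; the MessageListener mainly calls scheduleTask.
  • The pushTaskAsync implementation in RedissonDelayedQueue is the script described above, which moves elements from the original queue to the destination queue.
  • scheduleTask recomputes the delay: if it is greater than 10 ms, pushTask is triggered by a timer after that delay; if it is 10 ms or less, pushTask is triggered immediately.
  • pushTask attaches a callback to pushTaskAsync. If the call fails (for any reason other than a RedissonShutdownException), it logs the error and calls scheduleTask again with a start time 5 seconds in the future. If it succeeds and the return value (the score of the first element of the timeoutSetName zset) is not null, it calls scheduleTask with that value.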

Summary

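  • To use Redisson's DelayedQueue you enqueue an element together with its delay; a scheduled task later moves expired elements to the destination queue.
  • Three structures are used for storage: the destination queue (a list); the original queue (a list) holding the packed structs (randomId plus encoded value); and the timeoutSetName zset, whose members are the same structs, scored by their timeout.
  • Redisson relies heavily on asynchronous callbacks, so the code as a whole takes some effort to read.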
