Implementing Delayed Tasks with Redis (Part 2)

Background

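The previous article implemented a simple first version of delayed tasks using the Redis Sorted Set and the Quartz scheduling framework. Two relatively important problems were left unsolved: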

  1. Sharding.
  2. Monitoring.

This article fills in those two pieces. Prerequisite article: Implementing Delayed Tasks with Redis (Part 1).

Why sharding is needed

Here again is the content of the query script dequeue.lua:

-- Based on parts of the Lua scripts in jesque
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a table like {'ok':'zset'}; next() is used here to pull out that single key/value pair
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack turns a table into a variable argument list
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

The script uses four commands: ZREVRANGEBYSCORE, ZREM, HMGET and HDEL (the time complexity of the TYPE command is negligible):

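| Command | Time complexity | Parameters |
|---|---|---|
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the sorted set, M is the number of elements returned |
| ZREM | O(M*log(N)) | N is the total number of elements in the sorted set, M is the number of elements removed |
| HMGET | O(L) | L is the number of fields requested |
| HDEL | O(L) | L is the number of fields to delete |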
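To make the combined effect of the four commands easier to see, here is a minimal in-memory sketch of what dequeue.lua does. It is not the author's code: the class InMemoryDelayQueue and its two maps are hypothetical stand-ins for the Sorted Set and the Hash, and no Redis client is involved.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical in-memory model of the two Redis keys used by dequeue.lua. */
final class InMemoryDelayQueue {

    private final Map<String, Double> scores = new HashMap<>();  // models the Sorted Set: member -> score
    private final Map<String, String> details = new HashMap<>(); // models the Hash: member -> payload

    void enqueue(String id, double score, String payload) {
        scores.put(id, score);
        details.put(id, payload);
    }

    int size() {
        return scores.size();
    }

    /** Select by score range (highest first), then remove the selection from both structures. */
    List<String> dequeue(double min, double max, int offset, int limit) {
        // ZREVRANGEBYSCORE order: score descending, ties in reverse lexicographical order
        Comparator<Map.Entry<String, Double>> byScoreDesc = (a, b) -> {
            int c = Double.compare(b.getValue(), a.getValue());
            return c != 0 ? c : b.getKey().compareTo(a.getKey());
        };
        List<String> ids = scores.entrySet().stream()
                .filter(e -> e.getValue() >= min && e.getValue() <= max)
                .sorted(byScoreDesc)
                .skip(offset)
                .limit(limit)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        List<String> result = new ArrayList<>();
        for (String id : ids) {
            scores.remove(id);              // ZREM
            result.add(details.remove(id)); // HMGET followed by HDEL
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        queue.enqueue("ORDER_ID_1", 100, "{\"orderId\":\"ORDER_ID_1\"}");
        queue.enqueue("ORDER_ID_2", 200, "{\"orderId\":\"ORDER_ID_2\"}");
        queue.enqueue("ORDER_ID_3", 900, "{\"orderId\":\"ORDER_ID_3\"}");
        // Only entries with a score in [0, 500] are due
        System.out.println(queue.dequeue(0, 500, 0, 10));
        System.out.println("remaining: " + queue.size());
    }
}
```

In Redis the whole sequence runs atomically inside the Lua script; this model only mirrors the selection and removal logic, not the atomicity.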

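Next we need to analyze this against a concrete scenario and parameters. Suppose that in production orders arrive at a rate of 10,000 per hour. Because the script deletes entries from the Sorted Set and the Hash as it reads them, and an entry becomes due after 30 minutes, roughly 5,000 entries are resident in each of the two collections at any time, so N = 5000 in the table above. Suppose we initially set the query LIMIT to 100, so M = 100, and treat the cost of each unit operation in Redis as T. The cost of processing those 5,000 entries is then: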

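| No. | Cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
|---|---|---|---|---|---|
| 1 | 5000 | (log(5000) + 100) * T | 100 * log(5000) * T | 100T | 100T |
| 2 | 4900 | (log(4900) + 100) * T | 100 * log(4900) * T | 100T | 100T |
| 3 | 4800 | (log(4800) + 100) * T | 100 * log(4800) * T | 100T | 100T |
| ... | ... | ... | ... | ... | ... |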

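In theory, ZREM is the most expensive of the four commands used by the script. Its cost grows as M * log(N), and the cost of ZREVRANGEBYSCORE grows as log(N) + M, so keeping the cardinality N of the collection small helps to some extent in reducing the running time of the Lua script.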
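The estimate can be turned into a small calculation. The sketch below is only an illustration under stated assumptions: the class DequeueCostEstimate is hypothetical, the logarithm is taken as base 2, and each term counts abstract unit operations (T) rather than measured time.

```java
/** Hypothetical helper that adds up the per-call cost terms from the table above. */
final class DequeueCostEstimate {

    /** Unit operations for one dequeue.lua call: n = sorted-set cardinality, m = LIMIT. */
    static double costPerCall(long n, long m) {
        double log = Math.log(n) / Math.log(2); // assumption: base-2 logarithm
        double zrevrangebyscore = log + m;      // O(log(N) + M)
        double zrem = m * log;                  // O(M * log(N))
        double hmget = m;                       // O(L), with L = M here
        double hdel = m;                        // O(L), with L = M here
        return zrevrangebyscore + zrem + hmget + hdel;
    }

    public static void main(String[] args) {
        System.out.printf("N=5000, M=100: %.1f units%n", costPerCall(5000, 100));
        System.out.printf("N=2500, M=100: %.1f units%n", costPerCall(2500, 100));
    }
}
```

Under these assumptions, halving N lowers log(N) by exactly 1, so each call saves about M + 1 unit operations. That matches the cautious wording above: sharding helps, but the gain from a smaller N alone is moderate.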

Sharding

With the time complexity of dequeue.lua analyzed above, there are two candidate sharding schemes:

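  • Scheme 1: a single Redis instance, with the data of both the Sorted Set and the Hash split into shards.
  • Scheme 2: multiple Redis instances (Sentinel or Cluster deployments both work), applying the sharding of Scheme 1 across them.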

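To keep things simple, the examples below use a shard count (shardingCount) of 2; in production the count should be chosen for the actual workload. Sharding is done by taking the long-typed user ID field userId modulo the shard count, and the userId values in the test data are assumed to be evenly distributed.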
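The routing rule can be sketched in isolation. The class ShardRouter and its methods are hypothetical names, not part of the article's code. The only deliberate difference from a plain `%` is the use of Math.floorMod, which keeps the index non-negative should a userId ever be negative.

```java
/** Hypothetical helper showing how a userId maps to a shard index and a key name. */
final class ShardRouter {

    /** Returns an index in [0, shardingCount). */
    static long shardIndex(long userId, long shardingCount) {
        if (shardingCount <= 0) {
            throw new IllegalArgumentException("shardingCount must be positive");
        }
        // floorMod behaves like % for non-negative input, but never returns a negative index
        return Math.floorMod(userId, shardingCount);
    }

    /** Builds a shard-specific key such as ORDER_QUEUE_1. */
    static String shardKey(String prefix, long userId, long shardingCount) {
        return prefix + shardIndex(userId, shardingCount);
    }

    public static void main(String[] args) {
        System.out.println(shardKey("ORDER_QUEUE_", 99L, 2L));
        System.out.println(shardKey("ORDER_DETAIL_QUEUE_", 98L, 2L));
    }
}
```

Note that modulo sharding ties every key to the current shard count: changing shardingCount later moves most users to a different shard, so pending entries would have to be drained or migrated first.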

Common entity:

@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}

Delay queue interface:

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}

Single Redis instance sharding

Sharding on a single Redis instance is fairly simple. The diagram is as follows:

[Figure: r-d-t-2nd-1]

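The queue implementation is as follows (some parameters are hard-coded; it is for reference only and should not be copied into production as is):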

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * Number of shards
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}

The consumer's scheduled job is implemented as follows:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /**
     * Business worker thread pool
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // For simplicity, the shard index is passed through the Quartz job data map for now
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("訂單消息消費者定時任務開始執行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("訂單消息消費者定時任務執行完畢,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],處理訂單消息,內容:{}", shardingIndex, JSON.toJSONString(message));
                    // Simulate processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

The CommandLineRunner that starts the scheduled jobs and writes the test data is implemented as follows:

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // Prepare test data
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30 minutes ago
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Start the application; the output is as follows:

2019-08-28 00:13:20.648  INFO 50248 --- [           main] c.t.s.s.NoneJdbcSpringApplication        : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 訂單消息消費者定時任務開始執行,shardingIndex:[0]...
2019-08-28 00:13:20.781  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 訂單消息消費者定時任務開始執行,shardingIndex:[1]...
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],處理訂單消息,內容:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],處理訂單消息,內容:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],處理訂單消息,內容:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],處理訂單消息,內容:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... a large amount of output omitted
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 訂單消息消費者定時任務執行完畢,shardingIndex:[0]...
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 訂單消息消費者定時任務執行完畢,shardingIndex:[1]...
// ... a large amount of output omitted

Multiple Redis instance sharding

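Sharding on a single Redis instance does have a problem: a Redis instance always processes client commands on a single thread, even when the client issues those commands from multiple threads. The diagram is as follows: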

[Figure: r-d-t-2nd-2]

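In this situation, although sharding reduces the complexity of the commands in the Lua script, the single-threaded command processing model of Redis can become another potential performance bottleneck. Sharding across multiple Redis instances is therefore worth considering.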

[Figure: r-d-t-2nd-3]
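Why extra client threads do not add parallelism on one instance can be shown with a rough analogy. In the sketch below a single-thread executor stands in for the command loop of one Redis instance. This is only a model, with the hypothetical class name SingleThreadedServerModel, and it does not talk to Redis at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model: many client threads, one command-processing thread. */
final class SingleThreadedServerModel {

    /** Returns the highest number of commands that were ever executing at the same time. */
    static int peakConcurrency(int clients, int commandsPerClient) {
        ExecutorService commandLoop = Executors.newSingleThreadExecutor(); // the "server"
        ExecutorService clientPool = Executors.newFixedThreadPool(clients);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> clientRuns = new ArrayList<>();
            for (int c = 0; c < clients; c++) {
                clientRuns.add(clientPool.submit(() -> {
                    for (int i = 0; i < commandsPerClient; i++) {
                        commandLoop.execute(() -> {
                            int now = running.incrementAndGet();
                            peak.accumulateAndGet(now, Math::max);
                            running.decrementAndGet();
                        });
                    }
                }));
            }
            for (Future<?> run : clientRuns) {
                run.get(); // wait until every client has queued its commands
            }
            commandLoop.shutdown(); // already queued commands still run to completion
            if (!commandLoop.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("command loop did not drain in time");
            }
            return peak.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            clientPool.shutdownNow();
            commandLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(4, 100));
    }
}
```

However many client threads submit work, the peak stays at 1, because every command queues behind the one processing thread. Adding a second instance is the analogue of adding a second command loop.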

For simplicity, two standalone Redis instances are used in the coding example. The code is as follows:

// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // This is the Redis instance running on a virtual machine
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

// Order message
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// Order delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

// Delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

// Consumer
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
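    // Business worker thread pool. Quartz creates a new Job instance for every execution, so this
    // instance field builds a new pool each time; acceptable in a demo, but share the pool in production.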
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("訂單消息消費者定時任務開始執行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // The CountDownLatch here can be removed as long as the thread pool has enough capacity
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("訂單消息消費者定時任務執行完畢,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],處理訂單消息,內容:{}", shardingIndex, JSON.toJSONString(message));
                    // Simulate 50 ms of processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

// Configuration
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 min ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            // try-with-resources returns the connection to the pool
            try (Jedis jedis = jedisProvider.provide(i)) {
                jedis.hmset("ORDER_DETAIL_QUEUE", h);
                jedis.zadd("ORDER_QUEUE", z);
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Add a new main entry point and start it; the console output is as follows:

// ... a large amount of output omitted
2019-09-01 14:08:27.664  INFO 13056 --- [           main] c.t.multi.NoneJdbcSpringApplication      : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 訂單消息消費者定時任務開始執行,shardingIndex:[1]...
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 訂單消息消費者定時任務開始執行,shardingIndex:[0]...
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],處理訂單消息,內容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],處理訂單消息,內容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],處理訂單消息,內容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],處理訂單消息,內容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... a large amount of output omitted
2019-09-01 14:08:28.239  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 訂單消息消費者定時任務執行完畢,shardingIndex:[1]...
2019-09-01 14:08:28.240  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 訂單消息消費者定時任務執行完畢,shardingIndex:[0]...
// ... a large amount of output omitted

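Production deployments should avoid a single point of failure in the Redis service. A common choice is Sentinel combined with a tree-shaped master-replica topology (see the book 《Redis開發與運維》, "Redis Development and Operations"). The deployment diagram for two sets of Redis Sentinel is as follows: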

[Figure: r-d-t-2nd-4]

Which metrics to monitor

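We need to know, in near real time, how much data is backed up in the delay queue collections in Redis, roughly how long each dequeue takes, and similar metrics. Only then can we tell whether the delay queue module is running normally and whether it has a performance bottleneck. The exact metrics should be chosen as needed; to keep the example simple, only two are monitored here: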

  • The number of elements backed up in the Sorted Set.
  • The time taken by each call to dequeue.lua.
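The second metric comes down to timing one call and keeping the latest value where a gauge can read it. Below is a minimal sketch using only the JDK. The class DequeueTimer is a hypothetical name, and the sleep merely stands in for the evalsha round trip.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical helper: times a call and keeps the most recent duration for a gauge to read. */
final class DequeueTimer {

    private final AtomicLong lastCostMicros = new AtomicLong();

    /** Runs the call and records its duration in microseconds, even if the call throws. */
    <T> T time(Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            lastCostMicros.set(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
        }
    }

    long lastCostMicros() {
        return lastCostMicros.get();
    }

    public static void main(String[] args) {
        DequeueTimer timer = new DequeueTimer();
        String result = timer.time(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(5); // stands in for the Redis round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
        System.out.println(result + " in " + timer.lastCostMicros() + " us");
    }
}
```

A gauge of this kind only exposes the most recent value, so spikes between two scrapes are invisible; a timer or histogram metric would be the usual alternative if latency distribution matters.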

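The approach is for the application to report data in real time, relying on a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana. If you are not familiar with this stack, the author's two prerequisite articles on it are worth reading first.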

Monitoring

Add the dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>

A Gauge meter is used here to collect the monitoring data. Add the monitoring class OrderDelayQueueMonitor:

// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    r, AtomicLong::get);
            remain.put(i, r);
        }
        // Report the number of entries remaining in the sorted set every five seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}

The original RedisOrderDelayQueue#dequeue() is modified as follows:

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    
    // ... unchanged code omitted
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... unchanged code omitted

    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // Record the dequeue duration, in microseconds
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    } 

    // ... unchanged code omitted

}

The remaining configuration is described briefly here.

application.yaml must expose the prometheus endpoint:

server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'

In the Prometheus server configuration, keep the scrape interval as short as practical; it is set to 5 seconds here:

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['localhost:9091']

The basic Grafana settings are as follows:

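| Panel title | Query | Legend format |
|---|---|---|
| Dequeue duration | order_delay_queue_lua_cost | Shard-{{index}} |
| Order delay queue backlog | order_delay_queue_remain | Shard-{{index}} |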

Finally, set Grafana to refresh every 5 seconds. The result looks like this:

[Figure: r-d-t-2nd-5]

The metrics here should in most cases be customized as needed. Frankly, monitoring work is often the most complex and tedious part.

Summary

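This article walked through, in some detail, how sharding and monitoring were implemented for Redis-based delayed tasks. The core code is for reference only, and some details, such as the use of Prometheus and Grafana, are not expanded on here for reasons of space. Frankly, choosing middleware and an architecture for a real scenario is not simple, and the initial implementation is often not the hardest part; the harder problems come later, in optimization and monitoring.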

(End of article. c-3-d 20190901. I was unwell, so this was delayed a little.)

Original links

  • Github Page:http://throwable.club/2019/09/01/redis-delay-task-second/
  • Coding Page:http://throwable.coding.me/2019/09/01/redis-delay-task-second/