The previous article built a simple first version of delayed tasks using Redis Sorted Sets and the Quartz scheduling framework, but it left two relatively important problems unsolved: sharding and monitoring. This article fills in those two areas. Prerequisite article: Implementing Delayed Tasks with Redis (Part 1).
Here is the query script dequeue.lua again:
```lua
-- partially based on jesque's Lua scripts
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- the TYPE command returns a table like {'ok':'zset'}; use next to take one iteration step
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack converts a table into varargs
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
```
The script uses four commands, ZREVRANGEBYSCORE, ZREM, HMGET and HDEL (the cost of the TYPE command is negligible):
| Command | Time complexity | Parameters |
|---|---|---|
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of members in the sorted set, M is the number of members returned |
| ZREM | O(M*log(N)) | N is the total number of members in the sorted set, M is the number of members successfully removed |
| HMGET | O(L) | L is the number of fields successfully returned |
| HDEL | O(L) | L is the number of fields to delete |
Next, analyse this against a concrete scenario and parameters. Suppose that in production the sorted set receives 10,000 members per hour (that is, 10,000 orders are placed per hour). Since the query deletes the matching data from both the Sorted Set and the Hash, roughly 5,000 entries are resident in the two collections at any moment within the 30-minute delay window, so N = 5000 in the table above. Assume an initial query LIMIT of 100, i.e. M = 100, and let T be the simplified cost of one basic Redis operation unit. The cost of working through the 5,000 entries then breaks down as follows:
| Round | Set cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
|---|---|---|---|---|---|
| 1 | 5000 | (log(5000) + 100) * T | 100 * log(5000) * T | 100T | 100T |
| 2 | 4900 | (log(4900) + 100) * T | 100 * log(4900) * T | 100T | 100T |
| 3 | 4800 | (log(4800) + 100) * T | 100 * log(4800) * T | 100T | 100T |
| ... | ... | ... | ... | ... | ... |
In theory, of the four commands the script uses, ZREM is the most expensive: its complexity is O(M*log(N)), while ZREVRANGEBYSCORE is O(log(N)+M). Since both terms grow with log(N), keeping the set cardinality N under control helps reduce the running time of the Lua script.
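To make the comparison concrete, the two dominant terms can be plugged in directly. The sketch below (hypothetical helper names, base-2 logarithms, costs in abstract T units) shows that at N = 5000 and M = 100 the ZREM term dominates, and that shrinking N lowers it:

```java
public class ZsetCostSketch {

    // O(log(N) + M): cost of ZREVRANGEBYSCORE in abstract T units
    static double zrevrangebyscoreCost(long n, long m) {
        return log2(n) + m;
    }

    // O(M * log(N)): cost of ZREM in abstract T units
    static double zremCost(long n, long m) {
        return m * log2(n);
    }

    private static double log2(long n) {
        return Math.log(n) / Math.log(2);
    }

    public static void main(String[] args) {
        long n = 5000, m = 100;
        // at these sizes the M * log(N) term of ZREM dominates log(N) + M
        System.out.println(zremCost(n, m) > zrevrangebyscoreCost(n, m));
        // halving the resident cardinality N reduces the dominant term
        System.out.println(zremCost(n / 2, m) < zremCost(n, m));
    }
}
```

This is exactly the motivation for sharding: each shard sees a smaller N, so the dominant ZREM term shrinks per call.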
The time complexity of dequeue.lua has been analysed above. Two sharding schemes are prepared:

1. Within a single Redis instance, shard the data of the Sorted Set and Hash collections.
2. Across multiple Redis instances (which can be Sentinel or Cluster deployments), apply the sharding of scheme 1 on each instance.

For simplicity, the sharding count (shardingCount) in the examples below is 2; in production the sharding count should be sized to the actual workload. Sharding is done by taking the long user ID field userId modulo the sharding count, and the test data assumes userId is uniformly distributed.
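The routing rule just described, userId modulo the sharding count with the shard index embedded in the key names, can be sketched as follows; the helper names are illustrative, while the key prefixes are the ones used later in the article:

```java
public class ShardRouter {

    // 2 in this article's examples; size to the real workload in production
    static final long SHARDING_COUNT = 2L;

    // route a user to a shard by taking userId modulo the sharding count
    static long shardIndex(long userId) {
        return userId % SHARDING_COUNT;
    }

    // the Sorted Set key for this user's shard
    static String zsetKey(long userId) {
        return "ORDER_QUEUE_" + shardIndex(userId);
    }

    // the Hash key for this user's shard
    static String hashKey(long userId) {
        return "ORDER_DETAIL_QUEUE_" + shardIndex(userId);
    }

    public static void main(String[] args) {
        System.out.println(zsetKey(7L));  // ORDER_QUEUE_1
        System.out.println(hashKey(10L)); // ORDER_DETAIL_QUEUE_0
    }
}
```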
Common entity:
```java
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}
```
Delayed queue interface:
```java
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}
```
Sharding within a single Redis instance is relatively simple, as the following diagram shows:
The queue implementation is as follows (some parameters are hard-coded; this is for reference only and should not be copied into production as-is):
```java
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";

    /**
     * Sharding count
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}
```
The consumer's scheduled job is implemented as follows:
```java
@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /**
     * Business thread pool
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // for simplicity, the sharding index is carried in Quartz's job execution context
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("Order message consumer job started, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message: {}", shardingIndex, JSON.toJSONString(message));
                    // simulate processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {

            } finally {
                latch.countDown();
            }
        }
    }
}
```
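The hand-off inside execute(), submitting the dequeued batch to a worker thread and blocking on a CountDownLatch until the batch is done, can be exercised in isolation; a minimal sketch with hypothetical names:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchHandOffDemo {

    // submit a batch to the worker pool and block the calling (scheduler) thread
    // until the whole batch has been processed, mirroring execute() above
    static int processBatch(ExecutorService pool, List<String> batch) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();
        pool.execute(() -> {
            try {
                for (String message : batch) {
                    processed.incrementAndGet(); // stand-in for real message handling
                }
            } finally {
                latch.countDown(); // always release the waiting scheduler thread
            }
        });
        latch.await();
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(processBatch(pool, List.of("a", "b", "c"))); // prints 3
        pool.shutdown();
    }
}
```

Blocking the scheduler thread keeps each Quartz fire serialized per shard; if the pool is large enough and overlap is acceptable, the latch can be dropped.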
The CommandLineRunner implementation that writes test data and starts the scheduled jobs is as follows:
```java
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // prepare test data
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30 minutes ago
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
```
Start the application; the output looks like this:
```
2019-08-28 00:13:20.648  INFO 50248 --- [           main] c.t.s.s.NoneJdbcSpringApplication        : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job started, shardingIndex:[0]...
2019-08-28 00:13:20.781  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job started, shardingIndex:[1]...
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1], processing order message: {"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0], processing order message: {"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1], processing order message: {"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0], processing order message: {"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... large amount of output omitted
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job finished, shardingIndex:[0]...
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : Order message consumer job finished, shardingIndex:[1]...
// ... large amount of output omitted
```
Sharding within a single Redis instance actually has a problem: the Redis instance always processes client commands on a single thread, even when the client issues commands from multiple threads, as the following diagram shows:
In this situation, although sharding lowers the complexity of the commands executed by the Lua script, Redis's single-threaded command-processing model may still become another latent performance bottleneck. It is therefore worth considering sharding across multiple Redis instances.
For simplicity, the coding example here uses two standalone Redis instances. The code is as follows:
```java
// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();

    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // this is the Redis instance on the virtual machine
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

// Order message
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// Order delayed queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

// Delayed queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // score = enqueue timestamp
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load the Lua scripts on every shard
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

// Consumer
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // business thread pool
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("Order message consumer job started, shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // the latch here can be removed if the thread pool has sufficient spare capacity
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("Order message consumer job finished, shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}], processing order message: {}", shardingIndex, JSON.toJSONString(message));
                    // simulate 50 ms of processing time
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {

            } finally {
                latch.countDown();
            }
        }
    }
}

// Configuration
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 minutes ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            try (Jedis jedis = jedisProvider.provide(i)) {
                jedis.hmset("ORDER_DETAIL_QUEUE", h);
                jedis.zadd("ORDER_QUEUE", z);
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
```
Add a new bootstrap function and start it; the console output is as follows:
```
// ... large amount of output omitted
2019-09-01 14:08:27.664  INFO 13056 --- [           main] c.t.multi.NoneJdbcSpringApplication      : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : Order message consumer job started, shardingIndex:[1]...
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : Order message consumer job started, shardingIndex:[0]...
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1], processing order message: {"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0], processing order message: {"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0], processing order message: {"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1], processing order message: {"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... large amount of output omitted
2019-09-01 14:08:28.239  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : Order message consumer job finished, shardingIndex:[1]...
2019-09-01 14:08:28.240  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : Order message consumer job finished, shardingIndex:[0]...
// ... large amount of output omitted
```
In production, a Redis single point of failure should be avoided; a common approach is Sentinel with a tree-like master-replica topology (see the book Redis Development and Operations). The deployment of the two Redis Sentinel setups is illustrated below:
We need reasonably real-time visibility into metrics such as how much data is backed up in the delayed-queue collections in Redis and roughly how long each dequeue takes, so that we can tell whether the delayed-queue module is running normally and whether there are performance bottlenecks. The concrete metrics should be defined per need; for the sake of the example, only two are monitored here:

- The number of members backed up in the Sorted Set.
- The execution time of dequeue.lua.

The application reports the data in near real time, relying on a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana. If you are not familiar with this stack, see the two prerequisite articles:
Add the dependencies:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>
```
The Gauge meter type is chosen for collecting the monitoring data. Add a monitoring class OrderDelayQueueMonitor:
```java
// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost",
                    Collections.singleton(Tag.of("index", String.valueOf(i))), l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain",
                    Collections.singleton(Tag.of("index", String.valueOf(i))), r, AtomicLong::get);
            remain.put(i, r);
        }
        // report the backlog remaining in the collections every five seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}
```
Modify the original RedisOrderDelayQueue#dequeue():
```java
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    // ... unchanged code omitted

    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... unchanged code omitted

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // record the dequeue cost, in microseconds
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    // ... unchanged code omitted
}
```
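The nanoTime-to-microseconds measurement wrapped around evalsha can be pulled out into a small helper and exercised on its own; a sketch with hypothetical names:

```java
import java.util.concurrent.TimeUnit;

public class LuaCostTimer {

    // time a call and return its latency in microseconds,
    // the same technique the modified dequeue() uses around evalsha
    static long timeMicros(Runnable call) {
        long start = System.nanoTime();
        call.run();
        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long cost = timeMicros(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(5); // stand-in for the Lua script round trip
            } catch (InterruptedException ignore) {
            }
        });
        System.out.println(cost); // roughly 5000 microseconds or slightly above
    }
}
```

System.nanoTime() is used rather than currentTimeMillis() because it is monotonic and has sub-millisecond resolution, which matters for calls this short.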
A quick note on the remaining configuration. application.yaml needs to expose the prometheus endpoint:
```yaml
server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'
```
The Prometheus server should scrape at as short an interval as practical; 5 seconds for now:
```yaml
# my global config
global:
  scrape_interval: 5s      # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'  # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets: ['localhost:9091']
```
The basic Grafana panel settings are:
| Panel | Metric | Legend |
|---|---|---|
| Dequeue cost | `order_delay_queue_lua_cost` | `分片編號-{{index}}` |
| Order delayed queue backlog | `order_delay_queue_remain` | `分片編號-{{index}}` |
Finally, configure Grafana to refresh every 5 seconds; the result looks like this:
The monitoring metrics here should, more often than not, be tailored to actual needs; to be honest, monitoring is often the most complex and tedious part of the job.
This article has walked through the sharding and monitoring of Redis-based delayed tasks in relative detail. The core code is for reference only, and some specifics, such as certain uses of Prometheus and Grafana, are not expanded here for reasons of space. Frankly, selecting middleware and architecture for a real scenario is not a simple exercise, and the initial implementation is often not the hardest part: the bigger challenges come later, in optimisation and monitoring.
(The end. c-3-d 20190901. I was feeling unwell, so this one was a little delayed.)