如何利用Redis實現延時處理

背景

最近須要作一個延時處理的功能,主要是從kafka中消費消息後根據消息中的某個延時字段來進行延時處理,在實際的實現過程當中有一些須要注意的地方,記錄以下。java

實現過程

說到java中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,可是相比之下Timer能夠排除,主要緣由有如下幾點:redis

  • Timer使用的是絕對時間,系統時間的改變會對Timer產生必定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,因此不會有這個問題。
  • Timer使用單線程來處理任務,長時間運行的任務會致使其餘任務的延時處理,而ScheduledThreadPoolExecutor能夠自定義線程數量。
  • Timer沒有對運行時異常進行處理,一旦某個任務觸發運行時異常,會致使整個Timer崩潰,而ScheduledThreadPoolExecutor對運行時異常作了捕獲(能夠在 afterExecute() 回調方法中進行處理),因此更加安全。
  1. ScheduledThreadPoolExecutor 決定了用ScheduledThreadPoolExecutor來進行實現,接下來就是代碼編寫啦(大致流程代碼)。 主要的延時實現以下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
ThreadPoolExecutor.AbortPolicy());
//從消息中取出延遲時間及相關信息的代碼略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            //具體操做邏輯
        }},0,delayTime, TimeUnit.SECONDS);
複製代碼

其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關日誌打印便於後續的問題分析,這裏就很少作介紹了。拒絕策略也是採用默認的拒絕策略。 而後測試了一下,知足目標需求的功能,能夠作到延遲指定時間後執行,至此彷佛功能就被完成了。 你們可能疑問,這也太簡單了有什麼好說的,可是這種方式實現簡單是簡單可是存在一個潛在的問題,問題在哪呢,讓咱們看一下ScheduledThreadPoolExecutor的源碼:api

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, 
    TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
複製代碼

ScheduledThreadPoolExecutor 因爲它自身的延時和週期的特性,默認使用了DelayWorkQueue,而並不像咱們平時使用的SingleThreadExecutor等構造是可使用本身定義的LinkedBlockingQueue而且設置隊列大小,問題就出在這裏。DelayWrokQueue是一個無界隊列,而咱們的目標數據源是kafka,也就是一個高併發高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而致使OOM,在使用多線程時咱們是確定要考慮到OOM的可能性的,由於OOM帶來的後果每每比較嚴重,系統OOM臨時的解決辦法通常只能是重啓,可能會致使用戶數據丟失等不可能挽回的問題,因此從編碼設計階段要採用儘量穩妥的手段來避免這些問題。安全

  1. 採用redis和線程結合,這一次換了思路,採用redis來幫助咱們作緩衝,從而避免消息過多OOM的問題。 相關redis zset api:
//添加元素
ZADD key score member [[score member] [score member] …]
//根據分值及限制數量查詢
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//從zset中刪除指定成員
ZREM key member [member …]
複製代碼

咱們採用redis基礎數據結構的zset結構,採用score來存儲咱們目標發送時間的數值,總體處理流程以下:bash

  • 第一步數據存儲:9:10分從kafka接收了一條a的訂單消息,要求30分鐘後進行發貨通知,那咱們就將當前時間加上30分鐘而後轉爲時間戳做爲a的score,key爲a的訂單號存入redis中。代碼以下:
public void onMessage(String topic, String message) {
        String orderId;
		int delayTime = 0;
        try {
            Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
            }.getType());
            if (msgMap.isEmpty()) {
                return;
            }
            LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	    orderId = msgMap.get("orderId");
            if(StringUtils.isNotEmpty(orderId)){
                delayTime = Integer.parseInt(msgMap.get("delayTime"));
                Calendar calendar = Calendar.getInstance();
                //計算出預計發送時間
                calendar.add(Calendar.MINUTE, delayTime);
                long sendTime = calendar.getTimeInMillis();
                RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
                LOGGER.info("orderId:{}---放入redis中等待發送---sendTime:{}", ---orderId:{}, sendTime);
            }
        } catch (Exception e) {
            LOGGER.info("onMessage 延時發送異常:{}", e);
        }
    }

複製代碼
  • 第二步數據處理:另起一個線程具體調度時間根據業務需求來定,我這裏3分鐘執行一次,內部邏輯:從redis中取出必定量的zset數據,如何取呢,使用zset的zrangeByScore方法,根據數據的score進行排序,固然能夠帶上時間段,這裏從0到如今,來進行消費,須要注意的一點是,在取出數據後咱們須要用zrem方法將取出的數據從zset中刪除,防止其餘線程重複消費數據。在此以後進行接下來的發貨通知等相關邏輯。代碼以下:
public void run(){
        //獲取批量大小
        int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
        try {
            //批量獲取離發送時間最近的orderNum條數據
	    Calendar calendar = Calendar.getInstance();
	    long now = calendar.getTimeInMillis();
	    //獲取無限早到如今的事件key(防止上次批量數量小於放入數量,存在歷史數據未消費狀況)
	    Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	    LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
            if (CollectionUtils.isNotEmpty(orders)){
                //刪除key 防止重複發送
                for (String orderId : orderIds) {
                    RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
                }
	        //接下來執行發送等業務邏輯                 
            }
        } catch (Exception e) {
            LOGGER.warn("task.run exception:{}", e);
        }
    }
複製代碼

至此完成了依賴redis和線程完成了延時發送的功能。數據結構

結語

那麼對上面兩種不一樣的實現方式進行一下優缺點比較:多線程

  • 第一種方式實現簡單,不依賴外部組件,可以快速的實現目標功能,但缺點也很明顯,須要在特定的場景下使用,若是是我這種消息量大的狀況下使用極可能是有問題,固然在數據源消息很少的狀況下不失爲好的選擇。併發

  • 第二種方式實現稍微複雜一點,可是可以適應消息量大的場景,採用redis的zset做爲了「中間件」的效果,而且幫助咱們進行延時的功能實現可以較好的適應高併發場景,缺點在於在編寫的過程當中須要考慮實際的因素較多,例如線程的執行週期時間,發送可能會有必定時間的延遲,批量數據大小的設置等等。ide

綜上是本人此次延時功能的實現過程的兩種實現方式的總結,具體採用哪一種方式還需你們根據實際狀況選擇,但願能給你們帶來幫助。ps:因爲本人的技術能力有限,文章中可能出現技術描述不許確或者錯誤的狀況懇請各位大佬指出,我立馬進行改正,避免誤導你們,謝謝!高併發

相關文章
相關標籤/搜索