基於Redis實現延時消息推送

背景

有一個客戶有給雲音箱推送臨時廣播的需求。應用場景:商場、店鋪節日大促活動通知等。html

要求

1.支持批量推送;
2.可按指定時間播報;
3.新設置的臨時廣播覆蓋舊設置的;
4.支持取消還沒有播報的臨時廣播;

方案選型

延時消息的實現方案有不少(本文不作介紹,自行查找),考慮到現有項目中沒有集成可用於延時隊列的消息中間件等因素,決定採用Redis實現延時消息。redis

方案設計

1.藉助SortedSet的score特性,將消息指定播報的時間timestamp減基線時間N做爲socre進行排序(N爲2020-01-01 00:00:00對應的秒數),member=DelayMessge({"msg":"廣播內容", "timestamp":"指定播報的時間", "appId":"雲音箱設備所屬的應用ID"}),key=DelayMessageSet。json

2.考慮到延時消息(DelayMessge)與設備(sn)是一對多的關係,sn須要單獨存儲,使用數據結構list;list的key=timestamp_appID_message,value=sn;(key由DelayMessage計算而來,保證惟一性);緩存

3.考慮到須要支持覆蓋(要求3)和取消操做(要求4),本方案增長一個redis緩存,存儲hash對象;
key=DelayMessageHash_(appID), filed=sn value={"score": "對應SortedSet中延時消息的分數", ..., "arg":"拓展字段"}。
注:「要求3」和「要求4」是客戶後期提的需求,但客戶制定的接口傳參中未設置消息id這個概念,消息和sn又是一對多的關係。覆蓋或取消操做時,咱們後臺服務沒法經過客戶的接口傳參找到以前推送的臨時廣播。因此採用hash存儲,覆蓋操做時,將hash表中對應sn的score從新hset便可;同理,取消操做時,將hash表中對應的sn刪除便可。數據結構

4.定時器Timer,每隔N分鐘檢查一次DelayMessageSet,若是有N分鐘內須要處理的延時消息DelayMessage,則添加延時任務,一條DelayMessage對應一個延時任務執行。app

5.延時任務執行時,計算list的key,調用lpop取出sn,再根據sn調用hget取到score,檢查對應score是否一致。分佈式

方案實施

1、生產延時消息

接收到客戶發起的延時推送廣播請求後,生產延時消息,若是timestamp - currentTIme > 定時器時間間隔,則直接將添加延時任務。ide

/**
     * 延時推送
     */
    private void pushDelay(int currentTime, DelayMessage delayMessage) {
        //此處無論
        DelayTaskProducer producer = new DelayTaskProducer(delayMessage);
        producer.product();

        long delay = delayMessage.getScore() + BASELINE_TIME - currentTime;
        int reloadInterval = SystemProperty.getIntProperty("config_delay_message_interval");
        if(reloadInterval == 0) {
            //默認十分鐘
            reloadInterval = 600;
        }
        if (delay <= reloadInterval){
            logger.debug("消息延時時間小於定時檢查時間,直接添加延時任務");
            try {
                String element = JSONObject.toJSONString(delayMessage);
                PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), delay, TimeUnit.SECONDS);
            } catch (Exception e) {
                logger.error("直接添加延時任務出錯", e);
            }
        }
/**
  * 生產延時消息
  */
  public void product() {
        try {
            long score = delayMessage.getScore();
            int companyId = delayMessage.getCompanyId();
            String data = JSONObject.toJSONString(delayMessage);
            //將DelayMessage存入SortedSet
            RedisManager.zadd("DelayMessageSet", score, data);
            //listkey:(timestamp)_(companyId)_(message)
            String listKey = delayMessage.getRedisKey();
            //hashKey:DelayMessageHash_(companyId)
            String hashKey = "DelayMessageHash_" + companyId;
            //存儲json,後期可加入拓展字段
            JSONObject valueJson = new JSONObject();
            //設置有效時間
            long expiredTime = score + DelayTaskConstant.BASELINE_TIME - System.currentTimeMillis() / 1000 + 60;
            //map<sn, sn所屬機構>
            Map<String, String> map = delayMessage.getDeviceMap();
            for (String sn: map.keySet()) {
                 //調用rpush,從列表右邊開始放入元素
                RedisManager.addListItem(redisKey, sn, (int) expiredTime);
                valueJson.put("score", score);
                //將設備號對應的score存入hash表中,便於延時消息覆蓋及取消等操做。
                RedisManager.hset(hashKey, sn, valueJson.toJSONString());
            }
        }catch (Exception e) {
            logger.error("延時消息生產失敗:", e);
        }
  }

2、定時器

包含過時消息清除優化

public class DelayMessageTimer implements Runnable{
    private static final Logger logger = LoggerFactory.getLogger(DelayMessageTimer.class);
    private int interval;
    public DelayMessageTimer(int interval) {
        this.interval = interval;
    }
    @Override
    public void run() {
        try {
            logger.debug("開始延時消息檢查");
            int currentTime = (int) (System.currentTimeMillis() / 1000);
            int startTime = currentTime - DelayTaskConstant.BASELINE_TIME;
            int endTime = startTime + interval;
            logger.debug("range:{}to:{}", startTime, endTime);
            //取出範圍內的數據zrangeByScoreWithScores
            Set<Tuple> tupleSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", startTime, endTime);
            if (tupleSet == null || tupleSet.isEmpty()) {
                logger.debug("當前沒有{}秒內須要執行的延時消息", interval);
                return;
            }
            logger.debug("取出{}秒內待處理延遲消息{}個", interval, tupleSet.size());
            tupleSet.forEach(tuple -> {
                String element = tuple.getElement();
                double delay = tuple.getScore() + DelayTaskConstant.BASELINE_TIME - currentTime ;
                logger.debug("{}秒後消費延時消息{}", delay, element);
                //執行延時任務
                PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), (long) delay, TimeUnit.SECONDS);
            });
            //檢查過時的消息,須要刪除
            Set<Tuple> expiredSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", 0d, startTime);
            if (expiredSet == null || expiredSet.isEmpty()) {
                logger.debug("沒有過時的延時消息");
            } else {
                logger.debug("取出{}個過時消息,執行刪除element操做", expiredSet.size());
                expiredSet.forEach(tuple -> RedisManager.zrem("DelayMessageSet", tuple.getElement()));
            }
            logger.debug("延時消息檢查結束");
        } catch (Exception e) {
            logger.error("檢查延時消息出現錯誤", e);
        }
    }

3、消費延時消息

延時消息消費時,須要判斷hash中sn對應的score與DelayMessage對應的score的一致性。this

public class DelayTaskConsumer implements Runnable{
    private static final Logger logger = LoggerFactory.getLogger(DelayTaskConsumer.class);
    private String element;
    private DelayMessage delayMessage;
    public DelayTaskConsumer(String element) {
        this.element = element;
    }
    @Override
    public void run() {
        logger.debug("開始消費延時消息");
        delayMessage = JSONObject.toJavaObject(JSON.parseObject(element), DelayMessage.class);
        //須要查詢設備
        String key = delayMessage.getRedisKey();
        String hashKey = "DelayMessageHash_" + delayMessage.getCompanyId();
        while (true) {
            //移出並獲取列表的第一個元素,當列表不存在或者爲空時,返回Null
            //延時消息生產時,已進行設備號去重,此處從列表左邊一個一個的取出。
            String sn= RedisManager.lpop(key);
            if (null == sn|| "nil".equals(sn)) {
                break;
            }
            //從hash表中取出filed爲sn的數據
            String value = RedisManager.hget(hashKey, sn);
            if (null == value || "nil".equals(value)) {
                //表名執行過取消操做,跳過當前設備
                continue;
            }
            //取出value,判斷score是否一致
            JSONObject valueJson = JSON.parseObject(value);
            long score = valueJson.getLong("score");
            if (delayMessage.getScore() != score) {
                //表名執行過覆蓋操做,跳過當前設備
                logger.debug("score:{}不一致,跳過當前設備:{}", score, sn);
                continue;
            }
            //刪除filed
            RedisManager.hdel(hashKey, sn);
            //執行推送
            pushAndCreateMessage(sn);
        }
        //將消費完的消息從redis中剔除
         RedisManager.zrem("DelayMessageSet", element);
        logger.debug("消費延時消息完成");
    }
}

延時消息方案流程圖

基於Redis的延時消息實現方案

總結

因爲客戶要求的特殊性,本文在實現延時消息時,同時使用了sortedset(用於排序、範圍查取)、list(利用lpop特性,避免分佈式場景中的重複消費)、hash(用於過濾設備)三種數據結構。雖然說方便覆蓋(hset設置新的消息對應的score)和取消(hdel對應的filed)未消費的延時消息,但數據存儲顯得冗餘。本方案徹底依賴Redis,暫未作消息持久化存貯。若是你們對方案優化有啥建議,還望不吝賜教。

參考

Redis真的那麼好用嗎?
如何用 Redis 實現延時任務?

相關文章
相關標籤/搜索