有一個客戶有給雲音箱推送臨時廣播的需求。應用場景:商場、店鋪節日大促活動通知等。html
1.支持批量推送; 2.可按指定時間播報; 3.新設置的臨時廣播覆蓋舊設置的; 4.支持取消還沒有播報的臨時廣播;
延時消息的實現方案有不少(本文不作介紹,自行查找),考慮到現有項目中沒有集成可用於延時隊列的消息中間件等因素,決定採用Redis實現延時消息。redis
1.藉助SortedSet的score特性,將消息指定播報的時間timestamp減基線時間N做爲socre進行排序(N爲2020-01-01 00:00:00對應的秒數),member=DelayMessge({"msg":"廣播內容", "timestamp":"指定播報的時間", "appId":"雲音箱設備所屬的應用ID"}),key=DelayMessageSet。json
2.考慮到延時消息(DelayMessge)與設備(sn)是一對多的關係,sn須要單獨存儲,使用數據結構list;list的key=timestamp_appID_message,value=sn;(key由DelayMessage計算而來,保證惟一性);緩存
3.考慮到須要支持覆蓋(要求3)和取消操做(要求4),本方案增長一個redis緩存,存儲hash對象;
key=DelayMessageHash_(appID), filed=sn value={"score": "對應SortedSet中延時消息的分數", ..., "arg":"拓展字段"}。
注:「要求3」和「要求4」是客戶後期提的需求,但客戶制定的接口傳參中未設置消息id這個概念,消息和sn又是一對多的關係。覆蓋或取消操做時,咱們後臺服務沒法經過客戶的接口傳參找到以前推送的臨時廣播。因此採用hash存儲,覆蓋操做時,將hash表中對應sn的score從新hset便可;同理,取消操做時,將hash表中對應的sn刪除便可。數據結構
4.定時器Timer,每隔N分鐘檢查一次DelayMessageSet,若是有N分鐘內須要處理的延時消息DelayMessage,則添加延時任務,一條DelayMessage對應一個延時任務執行。app
5.延時任務執行時,計算list的key,調用lpop取出sn,再根據sn調用hget取到score,檢查對應score是否一致。分佈式
接收到客戶發起的延時推送廣播請求後,生產延時消息,若是timestamp - currentTIme > 定時器時間間隔,則直接將添加延時任務。ide
/** * 延時推送 */ private void pushDelay(int currentTime, DelayMessage delayMessage) { //此處無論 DelayTaskProducer producer = new DelayTaskProducer(delayMessage); producer.product(); long delay = delayMessage.getScore() + BASELINE_TIME - currentTime; int reloadInterval = SystemProperty.getIntProperty("config_delay_message_interval"); if(reloadInterval == 0) { //默認十分鐘 reloadInterval = 600; } if (delay <= reloadInterval){ logger.debug("消息延時時間小於定時檢查時間,直接添加延時任務"); try { String element = JSONObject.toJSONString(delayMessage); PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), delay, TimeUnit.SECONDS); } catch (Exception e) { logger.error("直接添加延時任務出錯", e); } }
/** * 生產延時消息 */ public void product() { try { long score = delayMessage.getScore(); int companyId = delayMessage.getCompanyId(); String data = JSONObject.toJSONString(delayMessage); //將DelayMessage存入SortedSet RedisManager.zadd("DelayMessageSet", score, data); //listkey:(timestamp)_(companyId)_(message) String listKey = delayMessage.getRedisKey(); //hashKey:DelayMessageHash_(companyId) String hashKey = "DelayMessageHash_" + companyId; //存儲json,後期可加入拓展字段 JSONObject valueJson = new JSONObject(); //設置有效時間 long expiredTime = score + DelayTaskConstant.BASELINE_TIME - System.currentTimeMillis() / 1000 + 60; //map<sn, sn所屬機構> Map<String, String> map = delayMessage.getDeviceMap(); for (String sn: map.keySet()) { //調用rpush,從列表右邊開始放入元素 RedisManager.addListItem(redisKey, sn, (int) expiredTime); valueJson.put("score", score); //將設備號對應的score存入hash表中,便於延時消息覆蓋及取消等操做。 RedisManager.hset(hashKey, sn, valueJson.toJSONString()); } }catch (Exception e) { logger.error("延時消息生產失敗:", e); } }
包含過時消息清除優化
public class DelayMessageTimer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(DelayMessageTimer.class); private int interval; public DelayMessageTimer(int interval) { this.interval = interval; } @Override public void run() { try { logger.debug("開始延時消息檢查"); int currentTime = (int) (System.currentTimeMillis() / 1000); int startTime = currentTime - DelayTaskConstant.BASELINE_TIME; int endTime = startTime + interval; logger.debug("range:{}to:{}", startTime, endTime); //取出範圍內的數據zrangeByScoreWithScores Set<Tuple> tupleSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", startTime, endTime); if (tupleSet == null || tupleSet.isEmpty()) { logger.debug("當前沒有{}秒內須要執行的延時消息", interval); return; } logger.debug("取出{}秒內待處理延遲消息{}個", interval, tupleSet.size()); tupleSet.forEach(tuple -> { String element = tuple.getElement(); double delay = tuple.getScore() + DelayTaskConstant.BASELINE_TIME - currentTime ; logger.debug("{}秒後消費延時消息{}", delay, element); //執行延時任務 PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), (long) delay, TimeUnit.SECONDS); }); //檢查過時的消息,須要刪除 Set<Tuple> expiredSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", 0d, startTime); if (expiredSet == null || expiredSet.isEmpty()) { logger.debug("沒有過時的延時消息"); } else { logger.debug("取出{}個過時消息,執行刪除element操做", expiredSet.size()); expiredSet.forEach(tuple -> RedisManager.zrem("DelayMessageSet", tuple.getElement())); } logger.debug("延時消息檢查結束"); } catch (Exception e) { logger.error("檢查延時消息出現錯誤", e); } }
延時消息消費時,須要判斷hash中sn對應的score與DelayMessage對應的score的一致性。this
public class DelayTaskConsumer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(DelayTaskConsumer.class); private String element; private DelayMessage delayMessage; public DelayTaskConsumer(String element) { this.element = element; } @Override public void run() { logger.debug("開始消費延時消息"); delayMessage = JSONObject.toJavaObject(JSON.parseObject(element), DelayMessage.class); //須要查詢設備 String key = delayMessage.getRedisKey(); String hashKey = "DelayMessageHash_" + delayMessage.getCompanyId(); while (true) { //移出並獲取列表的第一個元素,當列表不存在或者爲空時,返回Null //延時消息生產時,已進行設備號去重,此處從列表左邊一個一個的取出。 String sn= RedisManager.lpop(key); if (null == sn|| "nil".equals(sn)) { break; } //從hash表中取出filed爲sn的數據 String value = RedisManager.hget(hashKey, sn); if (null == value || "nil".equals(value)) { //表名執行過取消操做,跳過當前設備 continue; } //取出value,判斷score是否一致 JSONObject valueJson = JSON.parseObject(value); long score = valueJson.getLong("score"); if (delayMessage.getScore() != score) { //表名執行過覆蓋操做,跳過當前設備 logger.debug("score:{}不一致,跳過當前設備:{}", score, sn); continue; } //刪除filed RedisManager.hdel(hashKey, sn); //執行推送 pushAndCreateMessage(sn); } //將消費完的消息從redis中剔除 RedisManager.zrem("DelayMessageSet", element); logger.debug("消費延時消息完成"); } }
因爲客戶要求的特殊性,本文在實現延時消息時,同時使用了sortedset(用於排序、範圍查取)、list(利用lpop特性,避免分佈式場景中的重複消費)、hash(用於過濾設備)三種數據結構。雖然說方便覆蓋(hset設置新的消息對應的score)和取消(hdel對應的filed)未消費的延時消息,但數據存儲顯得冗餘。本方案徹底依賴Redis,暫未作消息持久化存貯。若是你們對方案優化有啥建議,還望不吝賜教。