所謂延時隊列就是延時的消息隊列,下面說一下一些業務場景前端
實踐場景web
訂單支付失敗,每隔一段時間提醒用戶redis
用戶併發量的狀況,能夠延時2分鐘給用戶發短信算法
先來看看Redis實現普通的消息隊列服務器
咱們知道,對於專業的消息隊列中間件,如Kafka和RabbitMQ,消費者在消費消息以前要進行一系列的繁瑣過程。微信
如RabbitMQ發消息以前要建立 Exchange,再建立 Queue,還要將 Queue 和 Exchange 經過某種規則綁定起來,發消息的時候要指定 routingkey,還要控制頭部信息數據結構
可是絕大 多數狀況下,雖然咱們的消息隊列只有一組消費者,但仍是須要經歷上面一些過程。多線程
有了 Redis,對於那些只有一組消費者的消息隊列,使用 Redis 就能夠很是輕鬆的搞定。Redis 的消息隊列不是專業的消息隊列,它沒有很是多的高級特性, 沒有 ack 保證,若是對消息的可靠性有着極致的追求,那麼它就不適合使用併發
異步消息隊列基本實現
Redis 的 list(列表) 數據結構經常使用來做爲異步消息隊列使用,使用 rpush/lpush 操做入隊列, 使用 lpop 和 rpop 來出隊列dom
> rpush queue 月伴飛魚1 月伴飛魚2 月伴飛魚3
(integer) 3
> lpop queue
"月伴飛魚1"
> llen queue
(integer) 2
問題1:若是隊列空了
客戶端是經過隊列的 pop 操做來獲取消息,而後進行處理。處理完了再接着獲取消息, 再進行處理。如此循環往復,這即是做爲隊列消費者的客戶端的生命週期。
但是若是隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接着再 pop, 又沒有數據。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也 會被拉高,若是這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。
一般咱們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就能夠了。不但客戶端 的 CPU 能降下來,Redis 的 QPS 也降下來了
問題2:隊列延遲
用上面睡眠的辦法能夠解決問題。同時若是隻有 1 個消費者,那麼這個延遲就是 1s。若是有多個消費者,這個延遲會有所降低,因 爲每一個消費者的睡覺時間是岔開來的。
有沒有什麼辦法能顯著下降延遲呢?
那就是 blpop/brpop。
這兩個指令的前綴字符 b 表明的是 blocking,也就是阻塞讀。
阻塞讀在隊列沒有數據的時候,會當即進入休眠狀態,一旦數據到來,則馬上醒過來。消 息的延遲幾乎爲零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解決了上面的問題。
問題3:空閒鏈接自動斷開
其實他還有個問題須要解決—— 空閒鏈接的問題。
若是線程一直阻塞在哪裏,Redis 的客戶端鏈接就成了閒置鏈接,閒置太久,服務器通常 會主動斷開鏈接,減小閒置資源佔用。這個時候 blpop/brpop 會拋出異常來。
因此編寫客戶端消費者的時候要當心,注意捕獲異常,還要重試。
分佈式鎖衝突處理
假如客戶端在處理請求時加分佈式鎖沒加成功怎麼辦。
通常有 3 種策略來處理加鎖失敗:
一、直接拋出異常,通知用戶稍後重試;
二、sleep 一會再重試;
三、將請求轉移至延時隊列,過一會再試;
直接拋出特定類型的異常
這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框後,會先閱讀對話框的內 容,再點擊重試,這樣就能夠起到人工延時的效果。若是考慮到用戶體驗,能夠由前端的代碼 替代用戶本身來進行延時重試控制。它本質上是對當前請求的放棄,由用戶決定是否從新發起 新的請求。
sleep
sleep 會阻塞當前的消息處理線程,會致使隊列的後續消息處理出現延遲。若是碰撞的比 較頻繁或者隊列裏消息比較多,sleep 可能並不合適。若是由於個別死鎖的 key 致使加鎖不成 功,線程會完全堵死,致使後續消息永遠得不到及時處理。
延時隊列
這種方式比較適合異步消息處理,將當前衝突的請求扔到另外一個隊列延後處理以避開衝突。
延時隊列的實現
咱們可使用 zset這個命令,用設置好的時間戳做爲score進行排序,使用 zadd score1 value1 ....
命令就能夠一直往內存中生產消息。再利用 zrangebysocre 查詢符合條件的全部待處理的任務,經過循環執行隊列任務便可。也能夠經過 zrangebyscore key min max withscores limit 0 1
查詢最先的一條任務,來進行消費
private Jedis jedis;
public void redisDelayQueueTest() {
String key = "delay_queue";
// 實際開發建議使用業務 ID 和隨機生成的惟一 ID 做爲 value, 隨機生成的惟一 ID 能夠保證消息的惟一性, 業務 ID 能夠避免 value 攜帶的信息過多
String orderId1 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1);
String orderId12 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2);
new Thread() {
@Override
public void run() {
while (true) {
Set<String> resultList;
// 只獲取第一條數據, 只獲取不會移除數據
resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1);
if (resultList.size() == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
} else {
// 移除數據獲取到的數據
if (jedis.zrem(key, resultList.get(0)) > 0) {
String orderId = resultList.get(0);
log.info("orderId = {}", resultList.get(0));
this.handleMsg(orderId);
}
}
}
}
}.start();
}
public void handleMsg(T msg) {
System.out.println(msg);
}
上面的實現, 在多線程邏輯上也是沒有問題的, 假設有兩個線程 T1, T2和其餘更多線程, 處理邏輯以下, 保證了多線程狀況下只有一個線程處理了對應的消息:
1.T1, T2 和其餘更多線程調用 zrangebyscore 獲取到了一條消息 A
2.T1 準備開始刪除消息 A, 因爲是原子操做, T2 和其餘更多線程等待 T1 執行 zrem 刪除消息 A 後再執行 zrem 刪除消息 A
3.T1 刪除了消息 A, 返回刪除成功標記 1, 並對消息 A 進行處理
4.T2 其餘更多線程開始 zrem 刪除消息 A, 因爲消息 A 已經被刪除, 因此全部的刪除均失敗, 放棄了對消息 A 的處理
同時,咱們要注意必定要對 handle_msg
進行異常捕獲,避免由於個別任務處理問題致使循環異常退 出
進一步優化
上面的算法中同一個任務可能會被多個進程取到以後再使用 zrem 進行爭搶,那些沒搶到 的進程都是白取了一次任務,這是浪費。能夠考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到服務器端進行原子化操做,這樣多個進程之間爭搶任務時就不 會出現這種浪費了
使用調用Lua腳本進一步優化
Lua 腳本, 若是有超時的消息, 就刪除, 並返回這條消息, 不然返回空字符串:
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +
"if #resultArray > 0 then\n" +
" if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
" return resultArray[1]\n" +
" else\n" +
" return ''\n" +
" end\n" +
"else\n" +
" return ''\n" +
"end";
jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Redis延時隊列優點
Redis用來進行實現延時隊列是具備這些優點的:
1.Redis zset支持高性能的 score 排序。
2.Redis是在內存上進行操做的,速度很是快。
3.Redis能夠搭建集羣,當消息不少時候,咱們能夠用集羣來提升消息處理的速度,提升可用性。
4.Redis具備持久化機制,當出現故障的時候,能夠經過AOF和RDB方式來對數據進行恢復,保證了數據的可靠性
Redis延時隊列劣勢
使用 Redis 實現的延時消息隊列也存在數據持久化, 消息可靠性的問題
沒有重試機制 - 處理消息出現異常沒有重試機制, 這些須要本身去實現, 包括重試次數的實現等
沒有 ACK 機制 - 例如在獲取消息並已經刪除了消息狀況下, 正在處理消息的時候客戶端崩潰了, 這條正在處理的這些消息就會丟失, MQ 是須要明確的返回一個值給 MQ 纔會認爲這個消息是被正確的消費了
若是對消息可靠性要求較高, 推薦使用 MQ 來實現
Redission實現延時隊列
基於Redis的Redisson分佈式延遲隊列結構的RDelayedQueue Java對象在實現了RQueue接口的基礎上提供了向隊列按要求延遲添加項目的功能。該功能能夠用來實現消息傳送延遲按幾何增加或幾何衰減的發送策略
RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒鐘之後將消息發送到指定隊列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分鐘之後將消息發送到指定隊列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在該對象再也不須要的狀況下,應該主動銷燬。僅在相關的Redisson對象也須要關閉的時候能夠不用主動銷燬。
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
是否是很方便...............
若是以爲不錯,點個贊再走吧
參考
Redis Lua scripts debugger
Redis 深度歷險:核心原理與應用實踐
掃碼二維碼
獲取更多精彩
月伴飛魚
本文分享自微信公衆號 - 月伴飛魚(gh_c4183eee9eb9)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。