在業務發展過程當中,會出現一些須要延時處理的場景,好比:java
a.訂單下單以後超過30分鐘用戶未支付,須要取消訂單
b.訂單一些評論,若是48h用戶未對商家評論,系統會自動產生一條默認評論
c.點我達訂單下單後,超過必定時間訂單未派出,須要超時取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是徹底沒問題,可是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,致使數據庫的慢查或者查詢超時。因此在處理這類需求時候,採用了延時隊列來完成。數據庫
延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:
1.Java中java.util.concurrent.DelayQueue
優勢:JDK自身實現,使用方便,量小適用
缺點:隊列消息處於jvm內存,不支持分佈式運行和消息持久化
2.Rocketmq延時隊列
優勢:消息持久化,分佈式
缺點:不支持任意時間精度,只支持特定level的延時消息
3.Rabbitmq延時隊列(TTL+DLX實現)
優勢:消息持久化,分佈式
缺點:延時相同的消息必須扔在同一個隊列jvm
根據自身業務和公司狀況,若是實現一個本身的延時隊列服務須要考慮一下幾點:分佈式
* 消息存儲
* 過時延時消息實時獲取
* 高可用性優化
* 消息可靠性,消息持久化,消息至少被消費一次
* 實時性:存在必定的時間偏差(定時任務間隔)
* 支持指定消息remove
* 高可用性線程
- Messages Pool全部的延時消息存放,結構爲KV結構,key爲消息ID,value爲一個具體的message(這裏選擇Redis Hash結構主要是由於hash結構能存儲較大的數據量,數據較多時候會進行漸進式rehash擴容,而且對於HSET和HGET命令來講時間複雜度都是O(1))
- Delayed Queue是16個有序隊列(隊列支持水平擴展),結構爲ZSET,value爲messages pool中消息ID,score爲過時時間(分爲多個隊列是爲了提升掃描的速度)
- Timed Task定時任務,負責掃描處理每一個隊列過時消息blog
每一個延時消息必須包括如下參數:排序
* tags:消息過時以後發送mq的tags
* keys:消息過時以後發送mq的keys
* body:消息過時以後發送mq的body,提供給消費這作具體的消息處理
* delayTime:延時發送時間(默認,delayTime、expectDate有一個便可)
* expectDate:指望發送時間隊列
注:上圖一、二、3或者二、3是一個事務操做
取出過時消息過程是經過一個外部定時任務每隔1min分鐘去查詢隊列中過時的消息,而後發送mq && remove事務
1.0上有一個可改進的地方就是隊列中過時的消息是經過定時任務觸發查詢。全部有了2.0
2.0版本在1.0上作了一個優化,廢棄掉了1min定時任務觸發過時消息發送,採用了java Lock await/singlal方式實現過時消息的實時發送低延時
- pull job:這裏分別爲每個隊列建立了一個pull job thread,功能很簡單,就是負責去隊列中拉取過時的消息數據(這裏保證一個隊列有且只有一個pull job)
- worker:pull job拉取到的過時消息會交給一個worker thread去處理,這樣的好處是處理過時的消息實時性更高(pull job沒必要等去除過時消息所有處理完成在繼續去拉取新的過時數據)
- zookeeper coordinate:經過zk的操做來完成對隊列的從新分配工做,daemon thread監聽zk節點的建立和刪除
主要流程:
服務啓動會註冊zk,獲取分配處理的queues,啓動後臺線程監聽zk
爲每一個分配queue建立一個pull job
pull job首先會去queue中查詢是否有過時消息:
Y:將取出消息交給worker處理
N:查詢queue中最後一個成員(zset結構默認按score遞增排序),若是爲空,則await;不爲空則await(成員score-System.currentTimeMillis())
因爲過時消息發送成功纔會從隊列中remove,因此pull job會記錄上一次查詢隊列的一個offset,每次獲取到過時消息會將offset向前偏移,過時消息交給worker處理,當worker因爲某些異常緣由處理失敗會重置pull job中offset,這樣能夠避免消息發送一次失敗以後沒辦法在繼續處理(除了新節點add || remove時候)當部署服務有新增,延時隊列服務會從新計算獲得當前處理隊列,並將以前建立pull job cancel,爲新處理隊列從新建立pull job。刪除同理。</ol>