延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那麼,是在什麼場景下我才須要這樣的隊列呢?nginx
背景
咱們先看看如下業務場景:git
- 當訂單一直處於未支付狀態時,如何及時的關閉訂單,並退還庫存?
- 如何按期檢查處於退款狀態的訂單是否已經退款成功?
- 新建立店鋪,N天內沒有上傳商品,系統如何知道該信息,併發送激活短信?等等
爲了解決以上問題,最簡單直接的辦法就是定時去掃表。每一個業務都要維護一個本身的掃表邏輯。 當業務愈來愈多時,咱們會發現掃表部分的邏輯會很是相似。咱們能夠考慮將這部分邏輯從具體的業務邏輯裏面抽出來,變成一個公共的部分。
那麼開源界是否已有現成的方案呢?答案是確定的。Beanstalkd(http://kr.github.io/beanstalkd/), 它基本上已經知足以上需求。可是,在刪除消息的時候不是特別方便,須要更多的成本。並且,它是基於C語言開發的,當時咱們團隊主流是PHP和Java,無法作二次開發。因而咱們借鑑了它的設計思路,用Java從新實現了一個延遲隊列。github
設計目標
- 消息傳輸可靠性:消息進入到延遲隊列後,保證至少被消費一次。
- Client支持豐富:因爲業務上的需求,至少支持PHP和Python。
- 高可用性:至少得支持多實例部署。掛掉一個實例後,還有後備實例繼續提供服務。
- 實時性:容許存在必定的時間偏差。
- 支持消息刪除:業務使用方,能夠隨時刪除指定消息。
總體結構
整個延遲隊列由4個部分組成:redis
- Job Pool用來存放全部Job的元信息。
- Delay Bucket是一組以時間爲維度的有序隊列,用來存放全部須要延遲的/已經被reserve的Job(這裏只存放Job Id)。
- Timer負責實時掃描各個Bucket,並將delay時間大於等於當前時間的Job放入到對應的Ready Queue。
- Ready Queue存放處於Ready狀態的Job(這裏只存放Job Id),以供消費程序消費。
以下圖表述:數據庫
設計要點
基本概念
- Job:須要異步處理的任務,是延遲隊列裏的基本單元。與具體的Topic關聯在一塊兒。
- Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱。
消息結構
每一個Job必須包含一下幾個屬性:json
- Topic:Job類型。能夠理解成具體的業務名稱。
- Id:Job的惟一標識。用來檢索和刪除指定的Job信息。
- Delay:Job須要延遲的時間。單位:秒。(服務端會將其轉換爲絕對時間)
- TTR(time-to-run):Job執行超時時間。單位:秒。
- Body:Job的內容,供消費者作具體的業務處理,以json格式存儲。
具體結構以下圖表示:TTR的設計目的是爲了保證消息傳輸的可靠性。網絡
消息狀態轉換
每一個Job只會處於某一個狀態下:數據結構
- ready:可執行狀態,等待消費。
- delay:不可執行狀態,等待時鐘週期。
- reserved:已被消費者讀取,但還未獲得消費者的響應(delete、finish)。
- deleted:已被消費完成或者已被刪除。
下面是四個狀態的轉換示意圖:架構
消息存儲
在選擇存儲介質以前,先來肯定下具體的數據結構:併發
- Job Poll存放的Job元信息,只須要K/V形式的結構便可。key爲job id,value爲job struct。
- Delay Bucket是一個有序隊列。
- Ready Queue是一個普通list或者隊列都行。
可以同時知足以上需求的,非redis莫屬了。
bucket的數據結構就是redis的zset,將其分爲多個bucket是爲了提升掃描速度,下降消息延遲。
通訊協議
爲了知足多語言Client的支持,咱們選擇Http通訊方式,經過文本協議(json)來實現與Client端的交互。 目前支持如下協議:
- 添加:{‘command’:’add’, ’topic’:’xxx’, ‘id’: ‘xxx’, ‘delay’: 30, ’TTR’: 60, ‘body’:‘xxx'}
- 獲取:{‘command’:’pop’, ’topic’:’xxx'}
- 完成:{‘command’:’finish’, ‘id’:’xxx'}
- 刪除:{‘command’:’delete’, ‘id’:’xxx'}
body也是一個json串。
Response結構:{’success’:true/false, ‘error’:’error reason’, ‘id’:’xxx’, ‘value’:’job body'}
強調一下:job id是由業務使用方決定的,必定要保證全局惟一性。這裏建議採用topic+業務惟一id的組合。
舉例說明一個Job的生命週期
- 用戶對某個商品下單,系統建立訂單成功,同時往延遲隊列裏put一個job。job結構爲:{‘topic':'orderclose’, ‘id':'ordercloseorderNoXXX’, ‘delay’:1800 ,’TTR':60 , ‘body':’XXXXXXX’}
- 延遲隊列收到該job後,先往job pool中存入job信息,而後根據delay計算出絕對執行時間,並以輪詢(round-robbin)的方式將job id放入某個bucket。
- timer每時每刻都在輪詢各個bucket,當1800秒(30分鐘)事後,檢查到上面的job的執行時間到了,取得job id從job pool中獲取元信息。若是這時該job處於deleted狀態,則pass,繼續作輪詢;若是job處於非deleted狀態,首先再次確認元信息中delay是否大於等於當前時間,若是知足則根據topic將job id放入對應的ready queue,而後從bucket中移除;若是不知足則從新計算delay時間,再次放入bucket,並將以前的job id從bucket中移除。
- 消費端輪詢對應的topic的ready queue(這裏仍然要判斷該job的合理性),獲取job後作本身的業務邏輯。與此同時,服務端將已經被消費端獲取的job按照其設定的TTR,從新計算執行時間,並將其放入bucket。
- 消費端處理完業務後向服務端響應finish,服務端根據job id刪除對應的元信息。
現有物理拓撲
目前採用的是集中存儲機制,在多實例部署時Timer程序可能會併發執行,致使job被重複放入ready queue。爲了解決這個問題,咱們使用了redis的setnx命令實現了簡單的分佈式鎖,以保證每一個bucket每次只有一個timer thread來掃描。
設計不足的地方
timer是經過獨立線程的無限循環來實現,在沒有ready job的時候會對CPU形成必定的浪費。
消費端在reserve job的時候,採用的是http短輪詢的方式,且每次只能取的一個job。若是ready job較多的時候會加大網絡I/O的消耗。
數據存儲使用的redis,消息在持久化上受限於redis的特性。
scale-out的時候依賴第三方(nginx)。
將來架構方向
基於wait/notify方式的Timer實現。提供TCP長連的API,實現push或者long-polling的消息reserve方法。擁有本身的存儲方案(內嵌數據庫、自定義數據結構寫文件),確保消息的持久化。實現本身的name-server。考慮提供週期性任務的直接支持。