有贊延遲隊列設計

延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那麼,是在什麼場景下我才須要這樣的隊列呢?nginx

背景

咱們先看看如下業務場景:git

  • 當訂單一直處於未支付狀態時,如何及時的關閉訂單,並退還庫存?
  • 如何按期檢查處於退款狀態的訂單是否已經退款成功?
  • 新建立店鋪,N天內沒有上傳商品,系統如何知道該信息,併發送激活短信?等等

爲了解決以上問題,最簡單直接的辦法就是定時去掃表。每一個業務都要維護一個本身的掃表邏輯。 當業務愈來愈多時,咱們會發現掃表部分的邏輯會很是相似。咱們能夠考慮將這部分邏輯從具體的業務邏輯裏面抽出來,變成一個公共的部分。
那麼開源界是否已有現成的方案呢?答案是確定的。Beanstalkd(http://kr.github.io/beanstalkd/), 它基本上已經知足以上需求。可是,在刪除消息的時候不是特別方便,須要更多的成本。並且,它是基於C語言開發的,當時咱們團隊主流是PHP和Java,無法作二次開發。因而咱們借鑑了它的設計思路,用Java從新實現了一個延遲隊列。github

設計目標

  • 消息傳輸可靠性:消息進入到延遲隊列後,保證至少被消費一次。
  • Client支持豐富:因爲業務上的需求,至少支持PHP和Python。
  • 高可用性:至少得支持多實例部署。掛掉一個實例後,還有後備實例繼續提供服務。
  • 實時性:容許存在必定的時間偏差。
  • 支持消息刪除:業務使用方,能夠隨時刪除指定消息。

總體結構

整個延遲隊列由4個部分組成:redis

  • Job Pool用來存放全部Job的元信息。
  • Delay Bucket是一組以時間爲維度的有序隊列,用來存放全部須要延遲的/已經被reserve的Job(這裏只存放Job Id)。
  • Timer負責實時掃描各個Bucket,並將delay時間大於等於當前時間的Job放入到對應的Ready Queue。
  • Ready Queue存放處於Ready狀態的Job(這裏只存放Job Id),以供消費程序消費。

以下圖表述:Delay Queue數據庫

設計要點

基本概念

  • Job:須要異步處理的任務,是延遲隊列裏的基本單元。與具體的Topic關聯在一塊兒。
  • Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱。

消息結構

每一個Job必須包含一下幾個屬性:json

  • Topic:Job類型。能夠理解成具體的業務名稱。
  • Id:Job的惟一標識。用來檢索和刪除指定的Job信息。
  • Delay:Job須要延遲的時間。單位:秒。(服務端會將其轉換爲絕對時間)
  • TTR(time-to-run):Job執行超時時間。單位:秒。
  • Body:Job的內容,供消費者作具體的業務處理,以json格式存儲。

具體結構以下圖表示:Job StructTTR的設計目的是爲了保證消息傳輸的可靠性。網絡

消息狀態轉換

每一個Job只會處於某一個狀態下:數據結構

  • ready:可執行狀態,等待消費。
  • delay:不可執行狀態,等待時鐘週期。
  • reserved:已被消費者讀取,但還未獲得消費者的響應(delete、finish)。
  • deleted:已被消費完成或者已被刪除。

下面是四個狀態的轉換示意圖:Job State Flow架構

消息存儲

在選擇存儲介質以前,先來肯定下具體的數據結構:併發

  • Job Poll存放的Job元信息,只須要K/V形式的結構便可。key爲job id,value爲job struct。
  • Delay Bucket是一個有序隊列。
  • Ready Queue是一個普通list或者隊列都行。

可以同時知足以上需求的,非redis莫屬了。
bucket的數據結構就是redis的zset,將其分爲多個bucket是爲了提升掃描速度,下降消息延遲。

通訊協議

爲了知足多語言Client的支持,咱們選擇Http通訊方式,經過文本協議(json)來實現與Client端的交互。 目前支持如下協議:

  • 添加:{‘command’:’add’, ’topic’:’xxx’, ‘id’: ‘xxx’, ‘delay’: 30, ’TTR’: 60, ‘body’:‘xxx'}
  • 獲取:{‘command’:’pop’, ’topic’:’xxx'}
  • 完成:{‘command’:’finish’, ‘id’:’xxx'}
  • 刪除:{‘command’:’delete’, ‘id’:’xxx'}

body也是一個json串。 
Response結構:{’success’:true/false, ‘error’:’error reason’, ‘id’:’xxx’, ‘value’:’job body'} 
強調一下:job id是由業務使用方決定的,必定要保證全局惟一性。這裏建議採用topic+業務惟一id的組合。

舉例說明一個Job的生命週期

  • 用戶對某個商品下單,系統建立訂單成功,同時往延遲隊列裏put一個job。job結構爲:{‘topic':'orderclose’, ‘id':'ordercloseorderNoXXX’, ‘delay’:1800 ,’TTR':60 , ‘body':’XXXXXXX’}
  • 延遲隊列收到該job後,先往job pool中存入job信息,而後根據delay計算出絕對執行時間,並以輪詢(round-robbin)的方式將job id放入某個bucket。
  • timer每時每刻都在輪詢各個bucket,當1800秒(30分鐘)事後,檢查到上面的job的執行時間到了,取得job id從job pool中獲取元信息。若是這時該job處於deleted狀態,則pass,繼續作輪詢;若是job處於非deleted狀態,首先再次確認元信息中delay是否大於等於當前時間,若是知足則根據topic將job id放入對應的ready queue,而後從bucket中移除;若是不知足則從新計算delay時間,再次放入bucket,並將以前的job id從bucket中移除。
  • 消費端輪詢對應的topic的ready queue(這裏仍然要判斷該job的合理性),獲取job後作本身的業務邏輯。與此同時,服務端將已經被消費端獲取的job按照其設定的TTR,從新計算執行時間,並將其放入bucket。
  • 消費端處理完業務後向服務端響應finish,服務端根據job id刪除對應的元信息。

現有物理拓撲

deploy目前採用的是集中存儲機制,在多實例部署時Timer程序可能會併發執行,致使job被重複放入ready queue。爲了解決這個問題,咱們使用了redis的setnx命令實現了簡單的分佈式鎖,以保證每一個bucket每次只有一個timer thread來掃描。

設計不足的地方

timer是經過獨立線程的無限循環來實現,在沒有ready job的時候會對CPU形成必定的浪費。 
消費端在reserve job的時候,採用的是http短輪詢的方式,且每次只能取的一個job。若是ready job較多的時候會加大網絡I/O的消耗。
數據存儲使用的redis,消息在持久化上受限於redis的特性。
scale-out的時候依賴第三方(nginx)。

將來架構方向

基於wait/notify方式的Timer實現。提供TCP長連的API,實現push或者long-polling的消息reserve方法。擁有本身的存儲方案(內嵌數據庫、自定義數據結構寫文件),確保消息的持久化。實現本身的name-server。考慮提供週期性任務的直接支持。

相關文章
相關標籤/搜索