若是有人再問你怎麼實現分佈式延時消息,這篇文章丟給他

1.背景

上篇文章介紹了RocketMQ總體架構和原理有興趣的能夠閱讀一下,在這篇文章中的延時消息部分,我寫道開源版的RocketMQ只提供了18個層級的消息隊列延時,這個功能在開源版中顯得特別雞肋,可是在阿里雲中的RocketMQ卻提供了支持40天以內任意秒級延時隊列,果真有些功能你只能充錢才能擁有。固然你或許想換一個開源的消息隊列,在開源社區中消息隊列延時消息不少都沒有被支持好比:RabbitMQ,Kafka等,都只能經過一些特殊方法才能完成延時的功能。爲何這麼多都沒有實現這個功能呢?是由於技術難度比較複雜嗎?接下來咱們分析一下如何才能實現一個延時消息。html

2.本地延時

在實現分佈式消息隊列的延時消息以前,咱們想一想咱們平時是如何在本身的應用程序上實現一些延時功能的?在Java中能夠經過下面的方式來完成咱們延時功能:mysql

  • ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,咱們提交任務的時候,會將任務首先提交到DelayedWorkQueue一個優先級隊列中,按照過時時間進行排序,這個優先級隊列也就是咱們堆結構,每次提交任務排序的複雜度是O(logN)。而後取任務的時候就會從堆頂取出咱們的任務,也就是咱們延遲時間最小的任務。ScheduledThreadPoolExecutor有個好處是執行延時任務能夠支持多線程並行執行,由於他繼承的是ThreadPoolExecutor。redis

  • Timer:Timer也是利用優先級隊列結構作的,可是其沒有繼承線程池,相對來講比較獨立,不支持多線程,只能使用單獨的一個線程。算法

3.分佈式消息隊列延時

咱們實現本地延時比較簡單,直接使用Java中現成的便可,那咱們分佈式消息隊列的實現有哪些難點呢?sql

有不少同窗首先會想到咱們實現分佈式消息隊列的延時任務,可不能夠直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,固然這是能夠的,前提是你的消息量很小,可是咱們分佈式消息隊列每每都是企業級別的中間件,數據量都是很是的大,那麼咱們純內存的方案確定是行不通的。因此咱們就有了下面這幾個方案來解決咱們這個問題。數據庫

3.1 數據庫

數據庫通常來講是咱們很容易想到的一個辦法,咱們一般能夠創建下面這樣一個表:數組

CREATE TABLE `delay_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `excute_time` bigint(16) DEFAULT NULL COMMENT '執行時間,ms級別',
  `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息體',
  PRIMARY KEY (`id`),
  KEY `time_index` (`excute_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

這個表中咱們使用excute_time表明咱們真實的執行時間,而且對其創建索引,而後在咱們的消息服務中,啓動一個定時任務,定時從數據庫中掃描已經能夠執行的消息,而後開始執行,具體流程以下面所示:緩存

使用數據庫的方法是一個比較原始的方法,在沒有延時消息這個概念以前,要作一個訂單多少分鐘過時的這種功能,一般使用這個方法去完成。而這個方法一般也比較侷限於咱們單個業務,若是想擴展爲咱們企業級的一箇中間件的話是不行的,由於mysql因爲BTree的特性,會隨着維護二級索引的開銷愈來愈大,致使寫入會愈來愈慢,因此這個方案一般不會被考慮。數據結構

RocksDB/LevelDB

咱們以前介紹RocketMQ在開源版本中只實現了18個Level的延時消息,可是有不少公司基於RocketMQ作了本身的一套支持任意時間的延時消息,在美團內部封裝了RocketMQ使用LevelDB作了對延時消息的封裝,在滴滴開源的DDMQ中,使用了RocksDB對RocketMQ的延時消息部分進行了封裝。多線程

其原理基本和Mysql相似,以下圖所示:

  • Step1: DDMQ發送消息的時候會有一個代理層,用於將消息作分發,由於其內部有多種消息隊列,kafka,rocketMQ等等,若是是延時消息會將消息發送到RockesDB的存儲。
  • Step2: 經過定時任務輪訓掃描將數據轉發投遞至RocketMQ集羣。
  • Step3: 消費者進行消費。

爲何一樣是數據庫RocksDB會比Mysql更加合適呢?由於RocksDB的特性是LSM樹,其使用場景適用於大量寫入,和消息隊列的場景更加契合,因此這個也是滴滴和美團選擇其做爲延時消息封裝的存儲介質。

3.2 時間輪+磁盤存儲

再說時間輪以前,讓咱們再次回到咱們的實現本地延時的時候使用的ScheduledThreadPoolExecutor還有Timer,他們都是使用的優先級隊列完成的,優先級隊列本質上也就是堆結構,堆結構的插入的時間複雜度是O(LogN),若是將來咱們的內存能夠作到無限,咱們使用使用優先級隊列去作延時消息的存儲,可是隨着消息的增多,咱們的插入消息的效率也會愈來愈低,那麼怎麼才能讓咱們的插入消息的效率不隨着消息的增多而變低呢?答案就是時間輪。

什麼是時間輪呢?其實咱們能夠簡單的將其看作是一個多維數組。在不少框架中都使用了時間輪來作一些定時的任務,用來替代咱們的Timer,好比我以前講過的有關本地緩存Caffeine一篇文章,在Caffeine中是一個二層時間輪,也就是二維數組,其一維的數據表示較大的時間維度好比,秒,分,時,天等,其二維的數據表示該時間維度較小的時間維度,好比秒內的某個區間段。當定位到一個TimeWhile[i][j]以後,其數據結構實際上是一個鏈表,記錄着咱們的Node。在Caffeine利用時間輪記錄咱們在某個時間過時的數據,而後去處理。

因爲時間輪是一個數組的結構,那麼其插入複雜度是O(1)。咱們解決了效率以後,可是咱們的內存依舊不是無限的,咱們時間輪如何使用呢?答案固然就是磁盤,在去哪兒開源的QMQ中已經實現了時間輪+磁盤存儲,這裏爲了方便描述我將其轉化爲RocketMQ中的結構來進行講解,實現圖以下:

  • Step 1: 生產者投遞延時消息到CommitLog,這個時候使用了偷換Topic的那招,來達到後面的效果。
  • Step 2: 後臺有一個Reput的任務定時拉取,延時Topic相關的Message。
  • Step 3: 判斷這個Message是否在當前時間輪範圍中,若是不在則來到Step4,若是在的話就直接將消息投遞進入時間輪。
  • Step 4: 找到當前消息所屬的scheduleLog,而後寫入進去,去哪兒默認劃分是一個小時爲一段,這裏能夠根據業務自行調整。
  • Step 5:時間輪會定時預加載下個時間段的scheduleLog到內存。
  • Step 6: 到點的消息會還原topic再次投遞到CommitLog,若是投遞成功這裏會記錄dispatchLog。記錄的緣由是由於時間輪是內存的,你不知道已經執行到哪一個位置了,若是執行到最後最後1s鐘的時候掛了,這段時間輪以前的全部數據又得從新加載,這裏是用來過濾已經投遞過的消息。

時間輪+磁盤存儲我我的以爲比上面的RocksDB要更加正統一點,不依賴其餘的中間件就能夠完成,可用性天然也就更高,固然阿里雲的RocketMQ具體怎麼實現的這個兩種方案都有可能。

3.3 redis

在社區中也有不少公司使用的Redis作的延時消息,在Redis中有一個數據結構是Zest,也就是有序集合,他能夠實現相似咱們的優先級隊列的功能,一樣的他也是堆結構,因此插入算法複雜度依然是O(logN),可是因爲Redis足夠快,因此這一塊能夠忽略。(這塊沒有作對比的基準測試,只是猜想)。有同窗會問,redis不是純內存的k,v嗎,一樣的應該也會受到內存限制啊,爲何還會選擇他呢?

其實在這個場景中,Redis是很容易水平擴展的當一個Redis內存不夠,這裏可使用兩個甚至更多,來知足咱們的須要,redis延時消息的原理圖(原圖出自:http://www.javashuo.com/article/p-pxsavpsj-ec.html)以下:

  • Delayed Messages Pool: Redis Hash結構,key爲消息ID,value爲具體的message,固然這裏也能夠用磁盤或者數據庫代替。這裏主要存儲咱們全部消息的內容。
  • Delayed Queue: ZSET數據結構,value爲消息ID,score爲執行時間,這裏Delayed Queue能夠水平擴展從而增長咱們能夠支持的數據量。
  • Worker Thread Pool: 其中有多個Worker,能夠部署在多個機器上造成一個集羣,集羣中的全部Worker經過ZK進行協調,分配Delayed Queue。

咱們怎麼才能知道Delayed Queue中的消息到期了呢?這裏有兩種方法:

  • 每一個Worker定時掃描,ZSET的最小執行時間,若是到了就取出,這個方法在消息少的時候特別浪費資源,在消息量多的時候,因爲輪訓不及時致使延時的時間不許確。
  • 由於第一個方法問題比較多,因此這裏借鑑了Timer中的一些思想,經過wait-notify能夠達到一個比較好的延時效果,而且資源也不會浪費,第一次的時候仍是獲取ZSET中最小的時間,而後wait(執行時間-當前時間),這樣就不須要浪費資源到達時間時會自動響應,若是當前ZSET有新的消息進入,而且比咱們等待的消息還要小,那麼直接notify喚醒,從新獲取這個更小的消息,而後又wait,如此循環。

總結

本文介紹了三種方式實現分佈式延時消息,但願能在你實現本身的延遲消息的時候提供一點思路。總的來講可能前兩種方法來講適用面更加廣一點,畢竟在RocketMQ這些大型的消息隊列中間件,還有一些其餘的集成功能,好比順序消息,事務消息等,延時消息可能更加傾向因而分佈式消息隊列中的一個功能,而不是做爲一個獨立的組件存在。固然其中還有一些細節並無一一介紹,具體細節能夠去參考QMQ和DDMQ的源碼。

若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O:

相關文章
相關標籤/搜索