上篇文章介紹了RocketMQ總體架構和原理有興趣的能夠閱讀一下,在這篇文章中的延時消息部分,我寫道開源版的RocketMQ只提供了18個層級的消息隊列延時,這個功能在開源版中顯得特別雞肋,可是在阿里雲中的RocketMQ卻提供了支持40天以內任意秒級延時隊列,果真有些功能你只能充錢才能擁有。固然你或許想換一個開源的消息隊列,在開源社區中消息隊列延時消息不少都沒有被支持好比:RabbitMQ,Kafka等,都只能經過一些特殊方法才能完成延時的功能。爲何這麼多都沒有實現這個功能呢?是由於技術難度比較複雜嗎?接下來咱們分析一下如何才能實現一個延時消息。html
在實現分佈式消息隊列的延時消息以前,咱們想一想咱們平時是如何在本身的應用程序上實現一些延時功能的?在Java中能夠經過下面的方式來完成咱們延時功能:mysql
ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,咱們提交任務的時候,會將任務首先提交到DelayedWorkQueue一個優先級隊列中,按照過時時間進行排序,這個優先級隊列也就是咱們堆結構,每次提交任務排序的複雜度是O(logN)。而後取任務的時候就會從堆頂取出咱們的任務,也就是咱們延遲時間最小的任務。ScheduledThreadPoolExecutor有個好處是執行延時任務能夠支持多線程並行執行,由於他繼承的是ThreadPoolExecutor。redis
Timer:Timer也是利用優先級隊列結構作的,可是其沒有繼承線程池,相對來講比較獨立,不支持多線程,只能使用單獨的一個線程。算法
咱們實現本地延時比較簡單,直接使用Java中現成的便可,那咱們分佈式消息隊列的實現有哪些難點呢?sql
有不少同窗首先會想到咱們實現分佈式消息隊列的延時任務,可不能夠直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,固然這是能夠的,前提是你的消息量很小,可是咱們分佈式消息隊列每每都是企業級別的中間件,數據量都是很是的大,那麼咱們純內存的方案確定是行不通的。因此咱們就有了下面這幾個方案來解決咱們這個問題。數據庫
數據庫通常來講是咱們很容易想到的一個辦法,咱們一般能夠創建下面這樣一個表:數組
CREATE TABLE `delay_message` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `excute_time` bigint(16) DEFAULT NULL COMMENT '執行時間,ms級別', `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息體', PRIMARY KEY (`id`), KEY `time_index` (`excute_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
這個表中咱們使用excute_time表明咱們真實的執行時間,而且對其創建索引,而後在咱們的消息服務中,啓動一個定時任務,定時從數據庫中掃描已經能夠執行的消息,而後開始執行,具體流程以下面所示:緩存
使用數據庫的方法是一個比較原始的方法,在沒有延時消息這個概念以前,要作一個訂單多少分鐘過時的這種功能,一般使用這個方法去完成。而這個方法一般也比較侷限於咱們單個業務,若是想擴展爲咱們企業級的一箇中間件的話是不行的,由於mysql因爲BTree的特性,會隨着維護二級索引的開銷愈來愈大,致使寫入會愈來愈慢,因此這個方案一般不會被考慮。數據結構
咱們以前介紹RocketMQ在開源版本中只實現了18個Level的延時消息,可是有不少公司基於RocketMQ作了本身的一套支持任意時間的延時消息,在美團內部封裝了RocketMQ使用LevelDB作了對延時消息的封裝,在滴滴開源的DDMQ中,使用了RocksDB對RocketMQ的延時消息部分進行了封裝。多線程
其原理基本和Mysql相似,以下圖所示:
爲何一樣是數據庫RocksDB會比Mysql更加合適呢?由於RocksDB的特性是LSM樹,其使用場景適用於大量寫入,和消息隊列的場景更加契合,因此這個也是滴滴和美團選擇其做爲延時消息封裝的存儲介質。
再說時間輪以前,讓咱們再次回到咱們的實現本地延時的時候使用的ScheduledThreadPoolExecutor還有Timer,他們都是使用的優先級隊列完成的,優先級隊列本質上也就是堆結構,堆結構的插入的時間複雜度是O(LogN),若是將來咱們的內存能夠作到無限,咱們使用使用優先級隊列去作延時消息的存儲,可是隨着消息的增多,咱們的插入消息的效率也會愈來愈低,那麼怎麼才能讓咱們的插入消息的效率不隨着消息的增多而變低呢?答案就是時間輪。
什麼是時間輪呢?其實咱們能夠簡單的將其看作是一個多維數組。在不少框架中都使用了時間輪來作一些定時的任務,用來替代咱們的Timer,好比我以前講過的有關本地緩存Caffeine一篇文章,在Caffeine中是一個二層時間輪,也就是二維數組,其一維的數據表示較大的時間維度好比,秒,分,時,天等,其二維的數據表示該時間維度較小的時間維度,好比秒內的某個區間段。當定位到一個TimeWhile[i][j]以後,其數據結構實際上是一個鏈表,記錄着咱們的Node。在Caffeine利用時間輪記錄咱們在某個時間過時的數據,而後去處理。
因爲時間輪是一個數組的結構,那麼其插入複雜度是O(1)。咱們解決了效率以後,可是咱們的內存依舊不是無限的,咱們時間輪如何使用呢?答案固然就是磁盤,在去哪兒開源的QMQ中已經實現了時間輪+磁盤存儲,這裏爲了方便描述我將其轉化爲RocketMQ中的結構來進行講解,實現圖以下:
時間輪+磁盤存儲我我的以爲比上面的RocksDB要更加正統一點,不依賴其餘的中間件就能夠完成,可用性天然也就更高,固然阿里雲的RocketMQ具體怎麼實現的這個兩種方案都有可能。
在社區中也有不少公司使用的Redis作的延時消息,在Redis中有一個數據結構是Zest,也就是有序集合,他能夠實現相似咱們的優先級隊列的功能,一樣的他也是堆結構,因此插入算法複雜度依然是O(logN),可是因爲Redis足夠快,因此這一塊能夠忽略。(這塊沒有作對比的基準測試,只是猜想)。有同窗會問,redis不是純內存的k,v嗎,一樣的應該也會受到內存限制啊,爲何還會選擇他呢?
其實在這個場景中,Redis是很容易水平擴展的當一個Redis內存不夠,這裏可使用兩個甚至更多,來知足咱們的須要,redis延時消息的原理圖(原圖出自:http://www.javashuo.com/article/p-pxsavpsj-ec.html)以下:
咱們怎麼才能知道Delayed Queue中的消息到期了呢?這裏有兩種方法:
本文介紹了三種方式實現分佈式延時消息,但願能在你實現本身的延遲消息的時候提供一點思路。總的來講可能前兩種方法來講適用面更加廣一點,畢竟在RocketMQ這些大型的消息隊列中間件,還有一些其餘的集成功能,好比順序消息,事務消息等,延時消息可能更加傾向因而分佈式消息隊列中的一個功能,而不是做爲一個獨立的組件存在。固然其中還有一些細節並無一一介紹,具體細節能夠去參考QMQ和DDMQ的源碼。
若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O: