本文檔旨在描述RocketMQ的多個關鍵特性的實現原理,並對消息中間件遇到的各類問題進行總結,闡述RocketMQ如何解決這些問題。文中主要引用了JMS規範與CORBA Notification規範,規範爲咱們設計系統指明瞭方向,可是仍有很多問題規範沒有說起,對於消息中間件又相當重要。RocketMQ並不遵循任何規範,可是參考了各類規範與同類產品的設計思想。前端
大約經歷了三個主要版本迭代
1、Metaq(Metamorphosis)1.x
由開源社區killme2008維護,開源社區很是活躍。
https://github.com/killme2008/Metamorphosis
2、Metaq 2.x
於2012年10月份上線,在淘寶內部被普遍使用。
3、RocketMQ 3.x
基於公司內部開源共建原則,RocketMQ項目只維護核心功能,且去除了全部其餘運行時依賴,核心功能最簡化。 每一個BU的個性化需求都在RocketMQ項目之上進行深度定製。RocketMQ向其餘BU提供的僅僅是Jar包,例如要定製一個Broker,那麼只須要依賴rocketmq-broker這個jar包便可,可經過API進行交互,若是定製client,則依賴rocketmq-client這個jar包,對其提供的api進行再封裝。開源社區地址:
https://github.com/alibaba/RocketMQ在RocketMQ項目基礎上衍生的項目以下java
本節闡述消息中間件一般須要解決哪些問題,在解決這些問題當中會遇到什麼困難,RocketMQ是否能夠解決,規範中如何定義這些問題。linux
發佈訂閱是消息中間件的最基本功能,也是相對於傳統RPC通訊而言。在此再也不詳述。git
規範中描述的優先級是指在一個消息隊列中,每條消息都有不一樣的優先級,通常用整數來描述,優先級高的消息先投遞,若是消息徹底在一個內存隊列中,那麼在投遞前能夠按照優先級排序,令優先級高的先投遞。
因爲RocketMQ全部消息都是持久化的,因此若是按照優先級來排序,開銷會很是大,所以RocketMQ沒有特地支持消息優先級,可是能夠經過變通的方式實現相似功能, 即單獨配置一個優先級高的隊列,和一個普通優先級的隊列,將不一樣優先級發送到不一樣隊列便可。
對於優先級問題,能夠概括爲2類
1)只要達到優先級目的便可,不是嚴格意義上的優先級,一般將優先級劃分爲高、中、低,或者再多幾個級別。每一個優先級能夠用不一樣的topic表示,發消息時,指定不一樣的topic來表示優先級,這種方式能夠解決絕大部分的優先級問題,可是對業務的優先級精確性作了妥協。
2)嚴格的優先級,優先級用整數表示,例如0 ~ 65535,這種優先級問題通常使用不一樣topic解決就很是不合適。若是要讓MQ解決此問題,會對MQ的性能形成很是大的影響。這裏要確保一點,業務上是否確實須要這種嚴格的優先級,若是將優先級壓縮成幾個,對業務的影響有多大?github
消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條消息,分別是訂單建立,訂單付款,訂單完成。消費時,要按照這個順序消費纔能有意義。可是同時訂單之間是能夠並行消費的。
RocketMQ能夠嚴格的保證消息有序。原理是?web
消息中間件一般採用的幾種持久化方式:
(1).持久化到數據庫,例如Mysql。
(2).持久化到KV存儲,例如levelDB、伯克利DB等KV存儲系統。
(3). 文件記錄形式持久化,例如Kafka,RocketMQ
(4).對內存數據作一個持久化鏡像,例如beanstalkd,VisiNotify
(1)、(2)、(3)三種持久化方式都具備將內存隊列Buffer進行擴展的能力,(4)只是一個內存的鏡像,做用是當Broker掛掉重啓後仍然能將以前內存的數據恢復出來。
JMS與CORBA Notification規範沒有明確說明如何持久化,可是持久化部分的性能直接決定了整個消息中間件的性能。
RocketMQ參考了Kafka的持久化方式,充分利用Linux文件系統內存cache來提升性能。算法
影響消息可靠性的幾種狀況:
(1).Broker正常關閉
(2).Broker異常Crash
(3).OS Crash
(4).機器掉電,可是能當即恢復供電狀況。
(5).機器沒法開機(多是cpu、主板、內存等關鍵設備損壞)
(6).磁盤設備損壞。
(1)、(2)、(3)、(4)四種狀況都屬於硬件資源可當即恢復狀況,RocketMQ在這四種狀況下能保證消息不丟,或者丟失少許數據(依賴刷盤方式是同步仍是異步)。
(5)、(6)屬於單點故障,且沒法恢復,一旦發生,在此單點上的消息所有丟失。RocketMQ在這兩種狀況下,經過異步複製,可保證99%的消息不丟,可是仍然會有極少許的消息可能丟失。經過同步雙寫技術能夠徹底避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。
RocketMQ從3.0版本開始支持同步雙寫。sql
在消息不堆積狀況下,消息到達Broker後,能馬上到達Consumer。
RocketMQ使用長輪詢Pull方式,可保證消息很是實時,消息實時性不低於Push。數據庫
是指每一個消息必須投遞一次
RocketMQ Consumer 先 pull 消息到本地,消費完成後,才向服務器返回 ack,若是沒有消費必定不會 ack 消息, 因此 RocketMQ 能夠很好的支持此特性。編程
(1). 發送消息階段,不容許發送重複的消息。
(2). 消費消息階段,不容許消費重複的消息。
只有以上兩個條件都知足狀況下,才能認爲消息是「Exactly Only Once」,而要實現以上兩點,在分佈式系統環 境下,不可避免要產生巨大的開銷。因此 RocketMQ 爲了追求高性能,並不保證此特性,要求在業務上進行去重, 也就是說消費消息要作到冪等性。RocketMQ 雖然不能嚴格保證不重複,可是正常狀況下不多會出現重複發送、消費狀況,只有網絡異常,Consumer 啓停等異常狀況下會出現消息重複。
此問題的本質緣由是網絡調用存在不肯定性,即既不成功也不失敗的第三種狀態,因此才產生了消息重複性問題。
Broker 的 Buffer 一般指的是 Broker 中一個隊列的內存 Buffer 大小,這類 Buffer 一般大小有限,若是 Buffer 滿 了之後怎麼辦?
下面是 CORBA Notification 規範中處理方式:
(1). RejectNewEvents
拒絕新來的消息,向 Producer 返回 RejectNewEvents 錯誤碼。
(2). 按照特定策略丟棄已有消息
a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
b) FifoOrder - The first event received will be the first discarded.
c) LifoOrder - The last event received will be the first discarded.
d) PriorityOrder - Events should be discarded in priority order, such that lower priority
RocketMQ 沒有內存 Buffer 概念,RocketMQ 的隊列都是持久化磁盤,數據按期清除。
對於此問題的解決思路,RocketMQ 同其餘 MQ 有很是顯著的區別,RocketMQ 的內存 Buffer 抽象成一個無限長度的隊列,無論有多少數據進來都能裝得下,這個無限是有前提的, Broker 會按期刪除過時的數據,例如 Broker 只保存 3 天的消息,那麼這個 Buffer 雖然長度無限,可是 3 天前的數據會被從隊尾刪除。
回溯消費是指 Consumer 已經消費成功的消息,因爲業務上需求須要從新消費, 要支持此功能,Broker 在向 Consumer 投遞成功消息後,消息仍然須要保留。而且從新消費通常是按照時間維度,例如因爲 Consumer 系統故障, 恢復後須要從新消費 1 小時前的數據,那麼 Broker 要提供一種機制,能夠按照時間維度來回退消費進度。
RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,能夠向前回溯,也能夠向後回溯。
消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峯,保證後端系統的穩定性, 這就要求消息中間件具備必定的消息堆積能力,消息堆積分如下兩種狀況:
(1). 消息堆積在內存 Buffer,一旦超過內存 Buffer,能夠根據必定的丟棄策略來丟棄消息,如 CORBA Notification 規範中描述。適合能容忍丟棄消息的業務,這種狀況消息的堆積能力主要在於內存 Buffer 大小,並且消息 堆積後,性能降低不會太大,由於內存中數據多少對於對外提供的訪問能力影響有限。
(2). 消息堆積到持久化存儲系統中,例如 DB,KV 存儲,文件記錄形式。
當消息不能在內存 Cache 命中時,要不可避免的訪問磁盤,會產生大量讀 IO,讀 IO 的吞吐量直接決定了 消息堆積後的訪問能力。
評估消息堆積能力主要有如下四點:
(1). 消息能堆積多少條,多少字節?即消息的堆積容量。
(2). 消息堆積後,發消息的吞吐量大小,是否會受堆積影響?
(3). 消息堆積後,正常消費的 Consumer 是否會受影響?
(4). 消息堆積後,訪問堆積在磁盤的消息時,吞吐量有多大?
已知的幾個分佈式事務規範,如 XA,JTA 等。其中 XA 規範被各大數據庫廠商普遍支持,如 Oracle,Mysql 等。 其中 XA 的 TM 實現佼佼者如 Oracle Tuxedo,在金融、電信等領域被普遍應用。
分佈式事務涉及到兩階段提交問題,在數據存儲方面的方面必然須要 KV 存儲的支持,由於第二階段的提交回滾須要修改消息狀態,必定涉及到根據 Key 去查找 Message 的動做。RocketMQ 在第二階段繞過了根據 Key 去查找 Message 的問題, 採用第一階段發送 Prepared 消息時,拿到了消息的 Offset,第二階段經過 Offset 去訪問消息, 並修改狀態,Offset 就是數據的地址。
RocketMQ 這種實現事務方式,沒有經過 KV 存儲作,而是經過 Offset 方式,存在一個顯著缺陷,即經過 Offset 更改數據,會令系統的髒頁過多,須要特別關注。
定時消息是指消息發到 Broker 後,不能馬上被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能 被消費。
若是要支持任意的時間精度,在 Broker 層面,必需要作消息排序, 若是再涉及到持久化,那麼消息排序要不可避免的產生巨大性能開銷。
RocketMQ 支持定時消息,可是不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。
Consumer 消費消息失敗後,要提供一種重試機制,令消息再消費一次。Consumer 消費消息失敗一般能夠認爲 有如下幾種狀況
因爲消息自己的緣由,例如反序列化失敗,消息數據自己沒法處理(例如話費充值,當前消息的手機號被註銷,沒法充值)等。 這種錯誤一般須要跳過這條消息,再消費其餘消息,而這條失敗的消息即便馬上重試消費,99%也不成功, 因此最好提供一種定時重試機制,即過 10s 秒後再重試。
因爲依賴的下游應用服務不可用,例如 db 鏈接不可用,外系統網絡不可達等。 遇到這種錯誤,即便跳過當前失敗的消息,消費其餘消息一樣也會報錯。這種狀況建議應用 sleep 30s,再消費下一條消息,這樣能夠減輕 Broker 重試消息的壓力。
圖5-1
圖5-2
RocketMQ 網絡部署特色
Producer Group
用來表示一個發送消息應用,一個 Producer Group 下包含多個 Producer 實例,能夠是多臺機器,也能夠 是一臺機器的多個進程,或者一個進程的多個 Producer 對象。 一個 Producer Group 能夠發送多個 Topic 消息,Producer Group 做用以下:
1.標識一類 Producer
2.能夠經過運維工具查詢這個發送消息應用下有多個 Producer 實例
3.發送分佈式事務消息時,若是 Producer 中途意外宕機,Broker 會主動回調 Producer Group 內的任意一臺機器來確認事務狀態。
Consumer Group
用來表示一個消費消息應用,一個 Consumer Group 下包含多個 Consumer 實例,能夠是多臺機器,也可 以是多個進程,或者是一個進程的多個 Consumer 對象。一個 Consumer Group 下的多個 Consumer 以均攤 方式消費消息,若是設置爲廣播方式,那麼這個 Consumer Group 下的每一個實例都消費全量數據。
Consumer 消費消息過程,使用了零拷貝,零拷貝包含如下兩種方式
使用 mmap + write 方式
優勢:即便頻繁調用,使用小塊文件傳輸,效率也很高
缺點:不能很好的利用 DMA 方式,會比 sendfile 多消耗 CPU,內存安全性控制複雜,須要避免 JVM Crash 問題。
使用 sendfile 方式
優勢:能夠利用 DMA 方式,消耗 CPU 較少,大塊文件傳輸效率高,無內存安全新問題。
缺點:小塊文件效率低於 mmap 方式,只能是 BIO 方式傳輸,不能使用 NIO。
RocketMQ 選擇了第一種方式,mmap+write 方式,由於有小塊數據傳輸的需求,效果會比 sendfile 更好。
關於 Zero Copy 的更詳細介紹,請參考如下文章
http://www.linuxjournal.com/article/6345
RocketMQ 選擇 Linux Ext4 文件系統,緣由以下:
Ext4 文件系統刪除 1G 大小的文件一般耗時小於 50ms,而 Ext3 文件系統耗時約 1s 左右,且刪除文件時,磁盤 IO 壓力極大,會致使 IO 寫入超時。
文件系統層面須要作如下調優措施
文件系統 IO 調度算法須要調整爲 deadline,由於 deadline 算法在隨機讀狀況下,能夠合併讀請求爲順序跳躍 方式,從而提升讀 IO 吞吐量。
Ext4 文件系統有如下 Bug,請注意
http://blog.donghao.org/2013/03/20/修復ext4日誌(jbd2)bug/
圖6-1
圖7-1
(1). 全部數據單獨存儲到一個 Commit Log,徹底順序寫,隨機讀。
(2). 對最終用戶展示的隊列實際只存儲消息在 Commit Log 的位置信息,而且串行方式刷盤。
這樣作的好處以下:
(1). 隊列輕量化,單個隊列數據量很是少。
(2). 對磁盤的訪問串行化,避免磁盤竟爭,不會由於隊列增長致使 IOWAIT 增高。
每一個方案都有缺點,它的缺點以下:
(1). 寫雖然徹底是順序寫,可是讀卻變成了徹底的隨機讀。
(2). 讀一條消息,會先讀 Consume Queue,再讀 Commit Log,增長了開銷。
(3). 要保證 Commit Log 與 Consume Queue 徹底的一致,增長了編程的複雜度。
以上缺點如何克服:
(1). 隨機讀,儘量讓讀命中 PAGECACHE,減小 IO 讀操做,因此內存越大越好。若是系統中堆積的消息過多, 讀數據要訪問磁盤會不會因爲隨機讀致使系統性能急劇降低,答案是否認的。
a) 訪問 PAGECACHE 時,即便只訪問 1k 的消息,系統也會提早預讀出更多數據,在下次讀時,就可能命 中內存。
b) 隨機訪問 Commit Log 磁盤數據,系統 IO 調度算法設置爲 NOOP 方式,會在必定程度上將徹底的隨機 讀變成順序跳躍方式,而順序跳躍方式讀較徹底的隨機讀性能會高 5 倍以上,可參見如下針對各類 IO 方式的性能數據。
http://stblog.baidu-tech.com/?p=851
另外 4k 的消息在徹底隨機訪問狀況下,仍然能夠達到 8K 次每秒以上的讀性能。
(2). 因爲 Consume Queue 存儲數據量極少,並且是順序讀,在 PAGECACHE 預讀做用下,Consume Queue 的讀性能幾乎與內存一致,即便堆積狀況下。 因此可認爲 Consume Queue 徹底不會阻礙讀性能。
(3). Commit Log 中存儲了全部的元信息,包含消息體,相似於 Mysql、Oracle 的 redolog,因此只要有 Commit Log 在,Consume Queue 即便數據丟失,仍然能夠恢復出來。
RocketMQ 的全部消息都是持久化的,先寫入系統 PAGECACHE,而後刷盤, 能夠保證內存與磁盤都有一份數據, 訪問時,直接從內存讀取。
圖7-2-1
在有 RAID 卡,SAS 15000 轉磁盤測試順序寫文件,速度能夠達到 300M 每秒左右,而線上的網卡通常都爲千兆 網卡,寫磁盤速度明顯快於數據網絡入口速度, 那麼是否能夠作到寫完內存就向用戶返回,由後臺線程刷盤呢?
(1). 因爲磁盤速度大於網卡速度,那麼刷盤的進度確定能夠跟上消息的寫入速度。
(2). 萬一因爲此時系統壓力過大,可能堆積消息,除了寫入 IO,還有讀取 IO,萬一出現磁盤讀取落後狀況, 會不會致使系統內存溢出,答案是否認的,緣由以下:
a) 寫入消息到 PAGECACHE 時,若是內存不足,則嘗試丟棄乾淨的 PAGE,騰出內存供新消息使用,策略 是 LRU 方式。
b) 若是乾淨頁不足,此時寫入 PAGECACHE 會被阻塞,系統嘗試刷盤部分數據,大約每次嘗試 32 個 PAGE,來找出更多幹淨 PAGE。
綜上,內存溢出的狀況不會出現。
圖7-2-2
同步刷盤與異步刷盤的惟一區別是異步刷盤寫完 PAGECACHE 直接返回,而同步刷盤須要等待刷盤完成才返回, 同步刷盤流程以下:
(1). 寫入 PAGECACHE 後,線程等待,通知刷盤線程刷盤。
(2). 刷盤線程刷盤後,喚醒前端等待線程,多是一批線程。
(3). 前端等待線程向用戶返回成功。
圖7-2
MsgId 總共 16 字節,包含消息存儲主機地址,消息 Commit Log offset。 從 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,而後按照存儲格式所在位置消息 buffer 解析成一個完整的消息。
圖7-3
根據查詢的 key 的 hashcode%slotNum 獲得具體的槽的位置(slotNum 是一個索引文件裏面包含的最大槽的數目, 例如圖中所示 slotNum=5000000)。
根據 slotValue(slot 位置對應的值)查找到索引項列表的最後一項(倒序排列,slotValue 老是指向最新的一個索引項)。
遍歷索引項列表返回查詢時間範圍內的結果集(默認一次最大返回的 32 條記錄)
Hash 衝突;尋找 key 的 slot 位置時至關於執行了兩次散列函數,一次 key 的 hash,一次 key 的 hash 值取模, 所以這裏存在兩次衝突的狀況;第一種,key 的 hash 值不一樣但模數相同(不一樣key的hash值不一樣但模數相同),此時查詢的時候會在比較一次 key 的 hash 值(每一個索引項保存了 key 的 hash 值),過濾掉 hash 值不相等的項。第二種,hash 值相等但 key 不等(不一樣key的hash值相同,模數固然相同), 出於性能的考慮衝突的檢測放到客戶端處理(key 的原始值是存儲在消息文件中的,避免對數據文件的解析), 客戶端比較一次消息體的 key 是否相同。
存儲;爲了節省空間索引項中存儲的時間是時間差值(存儲時間-開始時間,開始時間存儲在索引文件頭中), 整個索引文件是定長的,結構也是固定的。索引文件存儲結構參見圖 7.4.3-3 。
RocketMQ 的消息過濾方式有別於其餘消息中間件,是在訂閱時,再作過濾,先來看下 Consume Queue 的存儲結構。
圖7-4
(1). 在Broker端進行Message Tag比對,先遍歷 Consume Queue,若是存儲的 Message Tag 與訂閱的 Message Tag 不符合,則跳過,繼續比對下一個,符合則傳輸給 Consumer。 注意:Message Tag 是字符串形式,Consume Queue 中存儲的是其對應的 hashcode,比對時也是比對 hashcode。
(2). Consumer 收到過濾後的消息後,一樣也要執行在 Broker 端的操做,可是比對的是真實的 Message Tag 字符串,而不是 Hashcode。
爲何過濾要這樣作?
(1). Message Tag 存儲 Hashcode,是爲了在 Consume Queue 定長方式存儲,節約空間。
(2). 過濾過程當中不會訪問 Commit Log 數據,能夠保證堆積狀況下也能高效過濾。
(3). 即便存在 Hash 衝突,也能夠在 Consumer 端進行修正,保證萬無一失。
RocketMQ的Consumer都是從Broker拉消息來消費,可是爲了能作到實時收消息,RocketMQ 使用長輪詢方式,能夠保證消息實時性同 Push 方式一致。 這種長輪詢方式相似於 Web QQ 收發消息機制。請參考如下信息瞭解 更多
http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
圖7-6-1
在RocketMQ中,主要指的是局部順序,即一類消息爲知足順序性,必須Producer單線程順序發送,且發送到同一個隊列(這就是原理), 這樣Consumer就能夠按照Producer發送的順序去消費消息。
發送順序消息沒法利用集羣 FailOver 特性
消費順序消息的並行度依賴於隊列數量
隊列熱點問題,個別隊列因爲哈希不均致使消息過多,消費速度跟不上,產生消息堆積問題
遇到消息失敗的消息,沒法跳過,當前隊列消費暫停
圖7-7-1
圖7-5
如圖所示,5 個隊列能夠部署在一臺機器上,也能夠分別部署在 5 臺不一樣的機器上, 發送消息經過輪詢隊列的方式 發送,每一個隊列接收平均的消息量。經過增長機器,能夠水平擴展隊列容量。
另外也能夠自定義方式選擇發往哪一個隊列。
圖7-6
如圖所示,若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。
這樣便可達到平均消費的目的,能夠水平擴展 Consumer 來提升消費能力。 可是 Consumer 數量要小於等於隊列數 量,若是 Consumer 超過隊列數量,那麼多餘的 Consumer 將不能消費消息(不是隊列內也能夠並行?)。
表1-1
圖7-10
單隊列並行消費採用滑動窗口方式並行消費, 如圖所示,3~7 的消息在一個滑動窗口區間,能夠有多個線程並行消 費,可是每次提交的 Offset 都是最小 Offset,例如 3
異步複製的實現思路很是簡單,Slave 啓動一個線程,不斷從 Master 拉取 Commit Log 中的數據,而後在異步 build 出 Consume Queue 數據結構。整個實現過程基本同 Mysql 主從同步相似。
圖7-7
(1). Producer 發送消息,消息從 socket 進入 java 堆。
(2). Producer 發送消息,消息從 java 堆轉入 PAGACACHE,物理內存。
(3). Producer 發送消息,由異步線程刷盤,消息從 PAGECACHE 刷入磁盤。
(4). Consumer 拉消息(正常消費),消息直接從 PAGECACHE(數據在物理內存)轉入 socket,到達 consumer, 不通過 java 堆。這種消費場景最多,線上 96G 物理內存,按照 1K 消息算,能夠在物理內存緩存 1 億條消 息。
(5). Consumer 拉消息(異常消費),消息直接從 PAGECACHE(數據在虛擬內存)轉入 socket。
(6). Consumer 拉消息(異常消費),因爲 Socket 訪問了虛擬內存,產生缺頁中斷,此時會產生磁盤 IO,從磁
盤 Load 消息到 PAGECACHE,而後直接從 socket 發出去。
(7). 同 5 一致。
(8). 同 6 一致。
前面提到衡量消息中間件堆積能力的幾個指標,現將 RocketMQ 的堆積能力整理以下
表7-1
在有 Slave 狀況下,Master 一旦發現 Consumer 訪問堆積在磁盤的數據時,會向 Consumer 下達一個重定向指 令,令 Consumer 從 Slave 拉取數據,這樣正常的發消息與正常消費的 Consumer 都不會由於消息堆積受影響,由於 系統將堆積場景與非堆積場景分割在了兩個不一樣的節點處理。這裏會產生另外一個問題,Slave 會不會寫性能降低, 答案是否認的。由於 Slave 的消息寫入只追求吞吐量,不追求實時性,只要總體的吞吐量高就能夠,而 Slave 每次 都是從 Master 拉取一批數據,如 1M,這種批量順序寫入方式即便堆積狀況,總體吞吐量影響相對較小,只是寫入 RT 會變長。
/** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
如以上代碼所示,簡單消息過濾經過指定多個 Tag 來過濾消息,過濾動做在服務器進行。實現原理參照第 7.4 節
圖8-2
Broker 所在的機器會啓動多個 FilterServer 過濾進程
Consumer 啓動後,會向 FilterServer 上傳一個過濾的 Java 類
Consumer 從 FilterServer 拉消息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到消息後,按照 Consumer 上傳的 Java 過濾程序作過濾,過濾完成後返回給 Consumer。
總結:
使用 CPU 資源來換取網卡流量資源
FilterServer 與 Broker 部署在同一臺機器,數據經過本地迴環通訊,不走網卡
一臺 Broker 部署多個 FilterServer,充分利用 CPU 資源,由於單個 Jvm 難以全面利用高配的物理機 Cpu 資源
由於過濾代碼使用 Java 語言來編寫,應用幾乎能夠作任意形式的服務器端消息過濾,例如經過 Message Header 進行過濾,甚至能夠按照 Message Body 進行過濾。
使用 Java 語言進行做爲過濾表達式是一個雙刃劍,方便了應用的過濾操做,可是帶來了服務器端的安全風險。 須要應用來保證過濾代碼安全,例如在過濾程序裏儘量不作申請大內存,建立線程等操做。避免 Broker 服 務器發生資源泄漏。
使用方式參見 Github 例子
RocketMQ 通訊組件使用了 Netty-4.0.9.Final,在之上作了簡單的協議封裝。
Name Server 是專爲 RocketMQ 設計的輕量級名稱服務,代碼小於 1000 行,具備簡單、可集羣橫向擴展、無狀 態等特色。將要支持的主備自動切換功能會強依賴 Name Server。
參考:
http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
https://www.jianshu.com/p/453c6e7ff81c
http://valleylord.github.io/post/201607-mq-rocketmq/