metaq原理簡介

1. 前言

本文檔旨在描述RocketMQ的多個關鍵特性的實現原理,並對消息中間件遇到的各類問題進行總結,闡述RocketMQ如何解決這些問題。文中主要引用了JMS規範與CORBA Notification規範,規範爲咱們設計系統指明瞭方向,可是仍有很多問題規範沒有說起,對於消息中間件又相當重要。RocketMQ並不遵循任何規範,可是參考了各類規範與同類產品的設計思想。前端

產品發展歷史

大約經歷了三個主要版本迭代
1、Metaq(Metamorphosis)1.x
由開源社區killme2008維護,開源社區很是活躍。
https://github.com/killme2008/Metamorphosis
2、Metaq 2.x
於2012年10月份上線,在淘寶內部被普遍使用。
3、RocketMQ 3.x
基於公司內部開源共建原則,RocketMQ項目只維護核心功能,且去除了全部其餘運行時依賴,核心功能最簡化。 每一個BU的個性化需求都在RocketMQ項目之上進行深度定製。RocketMQ向其餘BU提供的僅僅是Jar包,例如要定製一個Broker,那麼只須要依賴rocketmq-broker這個jar包便可,可經過API進行交互,若是定製client,則依賴rocketmq-client這個jar包,對其提供的api進行再封裝。開源社區地址:
https://github.com/alibaba/RocketMQ在RocketMQ項目基礎上衍生的項目以下java

  • com.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求
    爲淘寶應用提供消息服務
  • com.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求
    爲支付寶應用提供消息服務
  • com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B個性化需求
    爲B2B應用提供消息服務

3. 專業術語

  • Producer
    消息生產者,負責產生消息,通常由業務系統負責產生消息。
  • Consumer
    消息消費者,負責消費消息,通常是後臺系統負責異步消費。
  • Push Consumer
    Consumer的一種,應用一般向Consumer對象註冊一個Listener接口,一旦收到消息,Consumer對象馬上回調Listener接口方法。
  • Pull Consumer
    Consumer的一種,應用一般主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制。
  • ProducerGroup
    一類Producer的集合名稱,這類Producer一般發送一類消息,且發送邏輯一致。
  • Consumer Group
    一類Consumer的集合名稱,這類Consumer一般消費一類消息,且消費邏輯一致。
  • Broker
    消息中轉角色,負責存儲消息,轉發消息,通常也稱爲Server。 在JMS規範中稱爲Provider。
  • 廣播消費
    一條消息被多個Consumer消費,即便這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每一個Consumer都消費一次,廣播消費中的Consumer Group概念能夠認爲在消息劃分方面無心義。
    在CORBA Notification規範中,消費方式都屬於廣播消費。
    在JMS規範中,至關於JMS publish/subscribe model
  • 集羣消費
    一個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(多是3個進程,或者3臺機器),那麼每一個實例只消費其中的3條消息。
    在CORBA Notification規範中,無此消費方式。
    在JMS規範中,JMS point-to-point model與之相似,可是RocketMQ的集羣消費功能大等於PTP模型。由於RocketMQ單個Consumer Group內的消費者相似於PTP,可是一個Topic/Queue能夠被多個Consumer Group消費。
  • 順序消息
    消費消息的順序要同發送消息的順序一致,在RocketMQ中,主要指的是局部順序,即一類消息爲知足順序性,必須Producer單線程順序發送,且發送到同一個隊列(這就是原理),這樣Consumer就能夠按照Producer發送的順序去消費消息。
  • 普通順序消息
    順序消息的一種,正常狀況下能夠保證徹底的順序消息,可是一旦發生通訊異常, Broker重啓,因爲隊列總數發生變化,哈希取模後定位的隊列會變化,產生短暫的消息順序不一致。 若是業務能容忍在集羣異常狀況(如某個Broker宕機或者重啓)下,消息短暫的亂序,使用普通順序方式比較合適。
  • 嚴格順序消息
    順序消息的一種, 不管正常異常狀況都能保證順序,可是犧牲了分佈式Failover特性,即Broker集羣中只要有一臺機器不可用,則整個集羣都不可用,服務可用性大大下降。
    若是服務器部署爲同步雙寫模式,此缺陷可經過備機自動切換爲主避免,不過仍然會存在幾分鐘的服務不可用。(依賴同步雙寫,主備自動切換,自動切換功能目前還未實現)
    目前已知的應用只有數據庫binlog同步強依賴嚴格順序消息,其餘應用絕大部分均可以容忍短暫亂序,推薦使用普通的順序消息。
  • Message Queue
    在RocketMQ中,全部消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指 隊列中的每一個存儲單元都是定長,訪問其中的存儲單元使用Offset來訪問,offset爲java long類型,64位,理論上在100年內不會溢出,因此認爲是長度無限, 另外隊列中 只保存最近幾天的數據, 以前的數據會按照過時時間來刪除。
    也能夠認爲Message Queue是一個長度無限的數組,offset就是下標。

4. 消息中間件須要解決哪些問題?

本節闡述消息中間件一般須要解決哪些問題,在解決這些問題當中會遇到什麼困難,RocketMQ是否能夠解決,規範中如何定義這些問題。linux

4.1 Publish/Subscribe

發佈訂閱是消息中間件的最基本功能,也是相對於傳統RPC通訊而言。在此再也不詳述。git

4.2 Message Priority

規範中描述的優先級是指在一個消息隊列中,每條消息都有不一樣的優先級,通常用整數來描述,優先級高的消息先投遞,若是消息徹底在一個內存隊列中,那麼在投遞前能夠按照優先級排序,令優先級高的先投遞。
因爲RocketMQ全部消息都是持久化的,因此若是按照優先級來排序,開銷會很是大,所以RocketMQ沒有特地支持消息優先級,可是能夠經過變通的方式實現相似功能, 即單獨配置一個優先級高的隊列,和一個普通優先級的隊列,將不一樣優先級發送到不一樣隊列便可。
對於優先級問題,能夠概括爲2類
1)只要達到優先級目的便可,不是嚴格意義上的優先級,一般將優先級劃分爲高、中、低,或者再多幾個級別。每一個優先級能夠用不一樣的topic表示,發消息時,指定不一樣的topic來表示優先級,這種方式能夠解決絕大部分的優先級問題,可是對業務的優先級精確性作了妥協。
2)嚴格的優先級,優先級用整數表示,例如0 ~ 65535,這種優先級問題通常使用不一樣topic解決就很是不合適。若是要讓MQ解決此問題,會對MQ的性能形成很是大的影響。這裏要確保一點,業務上是否確實須要這種嚴格的優先級,若是將優先級壓縮成幾個,對業務的影響有多大?github

4.3 Message Order

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條消息,分別是訂單建立,訂單付款,訂單完成。消費時,要按照這個順序消費纔能有意義。可是同時訂單之間是能夠並行消費的。
RocketMQ能夠嚴格的保證消息有序。原理是?web

4.4 Message Filter

  • Broker端消息過濾
    在Broker中,按照Consumer的要求作過濾,優勢是減小了對於Consumer無用消息的網絡傳輸。缺點是增長了Broker的負擔,實現相對複雜。
    (1).淘寶Notify支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎能夠知足最苛刻的過濾需求。
    (2). 淘寶RocketMQ支持按照簡單的Message Tag過濾,也支持按照Message Header、body進行過濾。
    (3).CORBA Notification規範中也支持靈活的語法表達式過濾。
  • Consumer端消息過濾
    這種過濾方式可由應用徹底自定義實現,可是缺點是不少無用的消息要傳輸到Consumer端。

4.5 Message Persistence

消息中間件一般採用的幾種持久化方式:
(1).持久化到數據庫,例如Mysql。
(2).持久化到KV存儲,例如levelDB、伯克利DB等KV存儲系統。
(3). 文件記錄形式持久化,例如Kafka,RocketMQ
(4).對內存數據作一個持久化鏡像,例如beanstalkd,VisiNotify
(1)、(2)、(3)三種持久化方式都具備將內存隊列Buffer進行擴展的能力,(4)只是一個內存的鏡像,做用是當Broker掛掉重啓後仍然能將以前內存的數據恢復出來。
JMS與CORBA Notification規範沒有明確說明如何持久化,可是持久化部分的性能直接決定了整個消息中間件的性能。
RocketMQ參考了Kafka的持久化方式,充分利用Linux文件系統內存cache來提升性能。算法

4.6 Message Reliablity

影響消息可靠性的幾種狀況:
(1).Broker正常關閉
(2).Broker異常Crash
(3).OS Crash
(4).機器掉電,可是能當即恢復供電狀況。
(5).機器沒法開機(多是cpu、主板、內存等關鍵設備損壞)
(6).磁盤設備損壞。
(1)、(2)、(3)、(4)四種狀況都屬於硬件資源可當即恢復狀況,RocketMQ在這四種狀況下能保證消息不丟,或者丟失少許數據(依賴刷盤方式是同步仍是異步)。
(5)、(6)屬於單點故障,且沒法恢復,一旦發生,在此單點上的消息所有丟失。RocketMQ在這兩種狀況下,經過異步複製,可保證99%的消息不丟,可是仍然會有極少許的消息可能丟失。經過同步雙寫技術能夠徹底避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。
RocketMQ從3.0版本開始支持同步雙寫。sql

4.7 Low Latency Messaging

在消息不堆積狀況下,消息到達Broker後,能馬上到達Consumer。
RocketMQ使用長輪詢Pull方式,可保證消息很是實時,消息實時性不低於Push。數據庫

4.8 At least Once

是指每一個消息必須投遞一次
RocketMQ Consumer 先 pull 消息到本地,消費完成後,才向服務器返回 ack,若是沒有消費必定不會 ack 消息, 因此 RocketMQ 能夠很好的支持此特性。編程

4.9 Exactly Only Once

(1). 發送消息階段,不容許發送重複的消息。
(2). 消費消息階段,不容許消費重複的消息。

只有以上兩個條件都知足狀況下,才能認爲消息是「Exactly Only Once」,而要實現以上兩點,在分佈式系統環 境下,不可避免要產生巨大的開銷。因此 RocketMQ 爲了追求高性能,並不保證此特性,要求在業務上進行去重, 也就是說消費消息要作到冪等性。RocketMQ 雖然不能嚴格保證不重複,可是正常狀況下不多會出現重複發送、消費狀況,只有網絡異常,Consumer 啓停等異常狀況下會出現消息重複。

此問題的本質緣由是網絡調用存在不肯定性,即既不成功也不失敗的第三種狀態,因此才產生了消息重複性問題。

4.10 Broker 的 Buffer 滿了怎麼辦?

Broker 的 Buffer 一般指的是 Broker 中一個隊列的內存 Buffer 大小,這類 Buffer 一般大小有限,若是 Buffer 滿 了之後怎麼辦?

下面是 CORBA Notification 規範中處理方式:
(1). RejectNewEvents
拒絕新來的消息,向 Producer 返回 RejectNewEvents 錯誤碼。

(2). 按照特定策略丟棄已有消息
a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
b) FifoOrder - The first event received will be the first discarded.
c) LifoOrder - The last event received will be the first discarded.
d) PriorityOrder - Events should be discarded in priority order, such that lower priority

RocketMQ 沒有內存 Buffer 概念,RocketMQ 的隊列都是持久化磁盤,數據按期清除。

對於此問題的解決思路,RocketMQ 同其餘 MQ 有很是顯著的區別,RocketMQ 的內存 Buffer 抽象成一個無限長度的隊列,無論有多少數據進來都能裝得下,這個無限是有前提的, Broker 會按期刪除過時的數據,例如 Broker 只保存 3 天的消息,那麼這個 Buffer 雖然長度無限,可是 3 天前的數據會被從隊尾刪除。

4.11 回溯消費

回溯消費是指 Consumer 已經消費成功的消息,因爲業務上需求須要從新消費, 要支持此功能,Broker 在向 Consumer 投遞成功消息後,消息仍然須要保留。而且從新消費通常是按照時間維度,例如因爲 Consumer 系統故障, 恢復後須要從新消費 1 小時前的數據,那麼 Broker 要提供一種機制,能夠按照時間維度來回退消費進度。

RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,能夠向前回溯,也能夠向後回溯。

4.12 消息堆積

消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峯,保證後端系統的穩定性, 這就要求消息中間件具備必定的消息堆積能力,消息堆積分如下兩種狀況:

(1). 消息堆積在內存 Buffer,一旦超過內存 Buffer,能夠根據必定的丟棄策略來丟棄消息,如 CORBA Notification 規範中描述。適合能容忍丟棄消息的業務,這種狀況消息的堆積能力主要在於內存 Buffer 大小,並且消息 堆積後,性能降低不會太大,由於內存中數據多少對於對外提供的訪問能力影響有限。

(2). 消息堆積到持久化存儲系統中,例如 DB,KV 存儲,文件記錄形式。

當消息不能在內存 Cache 命中時,要不可避免的訪問磁盤,會產生大量讀 IO,讀 IO 的吞吐量直接決定了 消息堆積後的訪問能力。

評估消息堆積能力主要有如下四點:
(1). 消息能堆積多少條,多少字節?即消息的堆積容量。
(2). 消息堆積後,發消息的吞吐量大小,是否會受堆積影響?
(3). 消息堆積後,正常消費的 Consumer 是否會受影響?
(4). 消息堆積後,訪問堆積在磁盤的消息時,吞吐量有多大?

4.13 分佈式事務

已知的幾個分佈式事務規範,如 XA,JTA 等。其中 XA 規範被各大數據庫廠商普遍支持,如 Oracle,Mysql 等。 其中 XA 的 TM 實現佼佼者如 Oracle Tuxedo,在金融、電信等領域被普遍應用。

分佈式事務涉及到兩階段提交問題,在數據存儲方面的方面必然須要 KV 存儲的支持,由於第二階段的提交回滾須要修改消息狀態,必定涉及到根據 Key 去查找 Message 的動做。RocketMQ 在第二階段繞過了根據 Key 去查找 Message 的問題, 採用第一階段發送 Prepared 消息時,拿到了消息的 Offset,第二階段經過 Offset 去訪問消息, 並修改狀態,Offset 就是數據的地址。

RocketMQ 這種實現事務方式,沒有經過 KV 存儲作,而是經過 Offset 方式,存在一個顯著缺陷,即經過 Offset 更改數據,會令系統的髒頁過多,須要特別關注。

4.14 定時消息

定時消息是指消息發到 Broker 後,不能馬上被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能 被消費。

若是要支持任意的時間精度,在 Broker 層面,必需要作消息排序, 若是再涉及到持久化,那麼消息排序要不可避免的產生巨大性能開銷。

RocketMQ 支持定時消息,可是不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。

4.15 消息重試

Consumer 消費消息失敗後,要提供一種重試機制,令消息再消費一次。Consumer 消費消息失敗一般能夠認爲 有如下幾種狀況

  1. 因爲消息自己的緣由,例如反序列化失敗,消息數據自己沒法處理(例如話費充值,當前消息的手機號被註銷,沒法充值)等。 這種錯誤一般須要跳過這條消息,再消費其餘消息,而這條失敗的消息即便馬上重試消費,99%也不成功, 因此最好提供一種定時重試機制,即過 10s 秒後再重試。

  2. 因爲依賴的下游應用服務不可用,例如 db 鏈接不可用,外系統網絡不可達等。 遇到這種錯誤,即便跳過當前失敗的消息,消費其餘消息一樣也會報錯。這種狀況建議應用 sleep 30s,再消費下一條消息,這樣能夠減輕 Broker 重試消息的壓力。

5 RocketMQ Overview

5.1 RocketMQ 是什麼?

圖5-1

  • 是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色。
  • Producer、Consumer、隊列均可以分佈式。
  • Producer 向一些隊列輪流發送消息,隊列集合稱爲 Topic, Consumer 若是作廣播消費,則一個 consumer 實例消費這個 Topic 對應的全部隊列,若是作集羣消費,則多個 Consumer 實例平均消費這個 topic 對應的隊列集合。
  • 可以保證嚴格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴展能力
  • 實時的消息訂閱機制
  • 億級消息堆積能力
  • 較少的依賴

5.2 RocketMQ 物理部署結構

圖5-2
RocketMQ 網絡部署特色

  • Name Server 是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。
  • Broker 部署相對複雜,Broker 分爲 Master 與 Slave,一個 Master 能夠對應多個 Slave,可是一個 Slave 只能 對應一個 Master,Master 與 Slave 的對應關係經過指定相同的 BrokerName,不一樣的 BrokerId 來定義,BrokerId爲 0 表示 Master,非 0 表示 Slave。Master 也能夠部署多個。每一個 Broker 與 Name Server 集羣中的全部節 點創建長鏈接,定時註冊 Topic 信息到全部 Name Server。
  • Producer 與 Name Server 集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從 Name Server 取 Topic 路由信息,並向提供 Topic 服務的 Master 創建長鏈接,且定時向 Master 發送心跳。 Producer 徹底無狀態,可 集羣部署。
  • Consumer 與 Name Server 集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從 Name Server 取 Topic 路由信息,並向提供 Topic 服務的 Master、Slave 創建長鏈接,且定時向 Master、Slave 發送心跳。 Consumer 既能夠從 Master 訂閱消息,也能夠從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。

5.3 RocketMQ 邏輯部署結構

  • Producer Group
    用來表示一個發送消息應用,一個 Producer Group 下包含多個 Producer 實例,能夠是多臺機器,也能夠 是一臺機器的多個進程,或者一個進程的多個 Producer 對象。 一個 Producer Group 能夠發送多個 Topic 消息,Producer Group 做用以下:
    1.標識一類 Producer
    2.能夠經過運維工具查詢這個發送消息應用下有多個 Producer 實例
    3.發送分佈式事務消息時,若是 Producer 中途意外宕機,Broker 會主動回調 Producer Group 內的任意一臺機器來確認事務狀態。

  • Consumer Group
    用來表示一個消費消息應用,一個 Consumer Group 下包含多個 Consumer 實例,能夠是多臺機器,也可 以是多個進程,或者是一個進程的多個 Consumer 對象。一個 Consumer Group 下的多個 Consumer 以均攤 方式消費消息,若是設置爲廣播方式,那麼這個 Consumer Group 下的每一個實例都消費全量數據。

6 RocketMQ 存儲特色

6.1 零拷貝原理

Consumer 消費消息過程,使用了零拷貝,零拷貝包含如下兩種方式

  1. 使用 mmap + write 方式
    優勢:即便頻繁調用,使用小塊文件傳輸,效率也很高
    缺點:不能很好的利用 DMA 方式,會比 sendfile 多消耗 CPU,內存安全性控制複雜,須要避免 JVM Crash 問題。

  2. 使用 sendfile 方式
    優勢:能夠利用 DMA 方式,消耗 CPU 較少,大塊文件傳輸效率高,無內存安全新問題。
    缺點:小塊文件效率低於 mmap 方式,只能是 BIO 方式傳輸,不能使用 NIO。

RocketMQ 選擇了第一種方式,mmap+write 方式,由於有小塊數據傳輸的需求,效果會比 sendfile 更好。

關於 Zero Copy 的更詳細介紹,請參考如下文章

http://www.linuxjournal.com/article/6345

6.2 文件系統

RocketMQ 選擇 Linux Ext4 文件系統,緣由以下:

Ext4 文件系統刪除 1G 大小的文件一般耗時小於 50ms,而 Ext3 文件系統耗時約 1s 左右,且刪除文件時,磁盤 IO 壓力極大,會致使 IO 寫入超時。

文件系統層面須要作如下調優措施

文件系統 IO 調度算法須要調整爲 deadline,由於 deadline 算法在隨機讀狀況下,能夠合併讀請求爲順序跳躍 方式,從而提升讀 IO 吞吐量。

Ext4 文件系統有如下 Bug,請注意

http://blog.donghao.org/2013/03/20/修復ext4日誌(jbd2)bug/

6.3 數據存儲結構

圖6-1

6.4 存儲目錄結構

6.5 數據可靠性

7 RocketMQ 關鍵特性

7.1 單機支持 1 萬以上持久化隊列

圖7-1
(1). 全部數據單獨存儲到一個 Commit Log,徹底順序寫,隨機讀。

(2). 對最終用戶展示的隊列實際只存儲消息在 Commit Log 的位置信息,而且串行方式刷盤。

這樣作的好處以下:

(1). 隊列輕量化,單個隊列數據量很是少。

(2). 對磁盤的訪問串行化,避免磁盤竟爭,不會由於隊列增長致使 IOWAIT 增高。

每一個方案都有缺點,它的缺點以下:

(1). 寫雖然徹底是順序寫,可是讀卻變成了徹底的隨機讀。

(2). 讀一條消息,會先讀 Consume Queue,再讀 Commit Log,增長了開銷。

(3). 要保證 Commit Log 與 Consume Queue 徹底的一致,增長了編程的複雜度。

以上缺點如何克服:

(1). 隨機讀,儘量讓讀命中 PAGECACHE,減小 IO 讀操做,因此內存越大越好。若是系統中堆積的消息過多, 讀數據要訪問磁盤會不會因爲隨機讀致使系統性能急劇降低,答案是否認的。

a) 訪問 PAGECACHE 時,即便只訪問 1k 的消息,系統也會提早預讀出更多數據,在下次讀時,就可能命 中內存。

b) 隨機訪問 Commit Log 磁盤數據,系統 IO 調度算法設置爲 NOOP 方式,會在必定程度上將徹底的隨機 讀變成順序跳躍方式,而順序跳躍方式讀較徹底的隨機讀性能會高 5 倍以上,可參見如下針對各類 IO 方式的性能數據。

http://stblog.baidu-tech.com/?p=851

另外 4k 的消息在徹底隨機訪問狀況下,仍然能夠達到 8K 次每秒以上的讀性能。

(2). 因爲 Consume Queue 存儲數據量極少,並且是順序讀,在 PAGECACHE 預讀做用下,Consume Queue 的讀性能幾乎與內存一致,即便堆積狀況下。 因此可認爲 Consume Queue 徹底不會阻礙讀性能。

(3). Commit Log 中存儲了全部的元信息,包含消息體,相似於 Mysql、Oracle 的 redolog,因此只要有 Commit Log 在,Consume Queue 即便數據丟失,仍然能夠恢復出來。

7.2 刷盤策略

RocketMQ 的全部消息都是持久化的,先寫入系統 PAGECACHE,而後刷盤, 能夠保證內存與磁盤都有一份數據, 訪問時,直接從內存讀取。

7.2.1 異步刷盤

圖7-2-1
在有 RAID 卡,SAS 15000 轉磁盤測試順序寫文件,速度能夠達到 300M 每秒左右,而線上的網卡通常都爲千兆 網卡,寫磁盤速度明顯快於數據網絡入口速度, 那麼是否能夠作到寫完內存就向用戶返回,由後臺線程刷盤呢?

(1). 因爲磁盤速度大於網卡速度,那麼刷盤的進度確定能夠跟上消息的寫入速度。

(2). 萬一因爲此時系統壓力過大,可能堆積消息,除了寫入 IO,還有讀取 IO,萬一出現磁盤讀取落後狀況, 會不會致使系統內存溢出,答案是否認的,緣由以下:

a) 寫入消息到 PAGECACHE 時,若是內存不足,則嘗試丟棄乾淨的 PAGE,騰出內存供新消息使用,策略 是 LRU 方式。

b) 若是乾淨頁不足,此時寫入 PAGECACHE 會被阻塞,系統嘗試刷盤部分數據,大約每次嘗試 32 個 PAGE,來找出更多幹淨 PAGE。

綜上,內存溢出的狀況不會出現。

7.2.2 同步刷盤

圖7-2-2
同步刷盤與異步刷盤的惟一區別是異步刷盤寫完 PAGECACHE 直接返回,而同步刷盤須要等待刷盤完成才返回, 同步刷盤流程以下:

(1). 寫入 PAGECACHE 後,線程等待,通知刷盤線程刷盤。

(2). 刷盤線程刷盤後,喚醒前端等待線程,多是一批線程。

(3). 前端等待線程向用戶返回成功。

7.3 消息查詢

7.3.1 按照 Message Id 查詢消息

圖7-2
MsgId 總共 16 字節,包含消息存儲主機地址,消息 Commit Log offset。 從 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,而後按照存儲格式所在位置消息 buffer 解析成一個完整的消息。

7.3.2 按照 Message Key 查詢消息

圖7-3

  1. 根據查詢的 key 的 hashcode%slotNum 獲得具體的槽的位置(slotNum 是一個索引文件裏面包含的最大槽的數目, 例如圖中所示 slotNum=5000000)。

  2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最後一項(倒序排列,slotValue 老是指向最新的一個索引項)。

  3. 遍歷索引項列表返回查詢時間範圍內的結果集(默認一次最大返回的 32 條記錄)

  4. Hash 衝突;尋找 key 的 slot 位置時至關於執行了兩次散列函數,一次 key 的 hash,一次 key 的 hash 值取模, 所以這裏存在兩次衝突的狀況;第一種,key 的 hash 值不一樣但模數相同(不一樣key的hash值不一樣但模數相同),此時查詢的時候會在比較一次 key 的 hash 值(每一個索引項保存了 key 的 hash 值),過濾掉 hash 值不相等的項。第二種,hash 值相等但 key 不等(不一樣key的hash值相同,模數固然相同), 出於性能的考慮衝突的檢測放到客戶端處理(key 的原始值是存儲在消息文件中的,避免對數據文件的解析), 客戶端比較一次消息體的 key 是否相同。

  5. 存儲;爲了節省空間索引項中存儲的時間是時間差值(存儲時間-開始時間,開始時間存儲在索引文件頭中), 整個索引文件是定長的,結構也是固定的。索引文件存儲結構參見圖 7.4.3-3 。

7.4 服務器消息過濾

RocketMQ 的消息過濾方式有別於其餘消息中間件,是在訂閱時,再作過濾,先來看下 Consume Queue 的存儲結構。
圖7-4

(1). 在Broker端進行Message Tag比對,先遍歷 Consume Queue,若是存儲的 Message Tag 與訂閱的 Message Tag 不符合,則跳過,繼續比對下一個,符合則傳輸給 Consumer。 注意:Message Tag 是字符串形式,Consume Queue 中存儲的是其對應的 hashcode,比對時也是比對 hashcode。

(2). Consumer 收到過濾後的消息後,一樣也要執行在 Broker 端的操做,可是比對的是真實的 Message Tag 字符串,而不是 Hashcode。

爲何過濾要這樣作?

(1). Message Tag 存儲 Hashcode,是爲了在 Consume Queue 定長方式存儲,節約空間。
(2). 過濾過程當中不會訪問 Commit Log 數據,能夠保證堆積狀況下也能高效過濾。
(3). 即便存在 Hash 衝突,也能夠在 Consumer 端進行修正,保證萬無一失。

7.5 長輪詢 Pull

RocketMQ的Consumer都是從Broker拉消息來消費,可是爲了能作到實時收消息,RocketMQ 使用長輪詢方式,能夠保證消息實時性同 Push 方式一致。 這種長輪詢方式相似於 Web QQ 收發消息機制。請參考如下信息瞭解 更多

http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

7.6 順序消息

7.6.1 順序消息原理

圖7-6-1
在RocketMQ中,主要指的是局部順序,即一類消息爲知足順序性,必須Producer單線程順序發送,且發送到同一個隊列(這就是原理), 這樣Consumer就能夠按照Producer發送的順序去消費消息。

7.6.2 順序消息缺陷

  • 發送順序消息沒法利用集羣 FailOver 特性

  • 消費順序消息的並行度依賴於隊列數量

  • 隊列熱點問題,個別隊列因爲哈希不均致使消息過多,消費速度跟不上,產生消息堆積問題

  • 遇到消息失敗的消息,沒法跳過,當前隊列消費暫停

7.7 事務消息

圖7-7-1

7.8 發送消息負載均衡

圖7-5

如圖所示,5 個隊列能夠部署在一臺機器上,也能夠分別部署在 5 臺不一樣的機器上, 發送消息經過輪詢隊列的方式 發送,每一個隊列接收平均的消息量。經過增長機器,能夠水平擴展隊列容量。

另外也能夠自定義方式選擇發往哪一個隊列。

7.9 訂閱消息負載均衡

圖7-6

如圖所示,若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。

這樣便可達到平均消費的目的,能夠水平擴展 Consumer 來提升消費能力。 可是 Consumer 數量要小於等於隊列數 量,若是 Consumer 超過隊列數量,那麼多餘的 Consumer 將不能消費消息(不是隊列內也能夠並行?)。
表1-1

7.10 單隊列並行消費

圖7-10
單隊列並行消費採用滑動窗口方式並行消費, 如圖所示,3~7 的消息在一個滑動窗口區間,能夠有多個線程並行消 費,可是每次提交的 Offset 都是最小 Offset,例如 3

7.11 發送定時消息

7.12 消息消費失敗,定時重試

7.13 HA,同步雙寫/異步複製

異步複製的實現思路很是簡單,Slave 啓動一個線程,不斷從 Master 拉取 Commit Log 中的數據,而後在異步 build 出 Consume Queue 數據結構。整個實現過程基本同 Mysql 主從同步相似。

7.14 單個 JVM 進程也能利用機器超大內存

圖7-7

(1). Producer 發送消息,消息從 socket 進入 java 堆。

(2). Producer 發送消息,消息從 java 堆轉入 PAGACACHE,物理內存。

(3). Producer 發送消息,由異步線程刷盤,消息從 PAGECACHE 刷入磁盤。

(4). Consumer 拉消息(正常消費),消息直接從 PAGECACHE(數據在物理內存)轉入 socket,到達 consumer, 不通過 java 堆。這種消費場景最多,線上 96G 物理內存,按照 1K 消息算,能夠在物理內存緩存 1 億條消 息。

(5). Consumer 拉消息(異常消費),消息直接從 PAGECACHE(數據在虛擬內存)轉入 socket。

(6). Consumer 拉消息(異常消費),因爲 Socket 訪問了虛擬內存,產生缺頁中斷,此時會產生磁盤 IO,從磁

盤 Load 消息到 PAGECACHE,而後直接從 socket 發出去。

(7). 同 5 一致。

(8). 同 6 一致。

7.15 消息堆積問題解決辦法

前面提到衡量消息中間件堆積能力的幾個指標,現將 RocketMQ 的堆積能力整理以下
表7-1

在有 Slave 狀況下,Master 一旦發現 Consumer 訪問堆積在磁盤的數據時,會向 Consumer 下達一個重定向指 令,令 Consumer 從 Slave 拉取數據,這樣正常的發消息與正常消費的 Consumer 都不會由於消息堆積受影響,由於 系統將堆積場景與非堆積場景分割在了兩個不一樣的節點處理。這裏會產生另外一個問題,Slave 會不會寫性能降低, 答案是否認的。由於 Slave 的消息寫入只追求吞吐量,不追求實時性,只要總體的吞吐量高就能夠,而 Slave 每次 都是從 Master 拉取一批數據,如 1M,這種批量順序寫入方式即便堆積狀況,總體吞吐量影響相對較小,只是寫入 RT 會變長。

8 RocketMQ 消息過濾

/** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

如以上代碼所示,簡單消息過濾經過指定多個 Tag 來過濾消息,過濾動做在服務器進行。實現原理參照第 7.4 節

8.2 高級消息過濾

圖8-2

  1. Broker 所在的機器會啓動多個 FilterServer 過濾進程

  2. Consumer 啓動後,會向 FilterServer 上傳一個過濾的 Java 類

  3. Consumer 從 FilterServer 拉消息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到消息後,按照 Consumer 上傳的 Java 過濾程序作過濾,過濾完成後返回給 Consumer。

總結:

  1. 使用 CPU 資源來換取網卡流量資源

  2. FilterServer 與 Broker 部署在同一臺機器,數據經過本地迴環通訊,不走網卡

  3. 一臺 Broker 部署多個 FilterServer,充分利用 CPU 資源,由於單個 Jvm 難以全面利用高配的物理機 Cpu 資源

  4. 由於過濾代碼使用 Java 語言來編寫,應用幾乎能夠作任意形式的服務器端消息過濾,例如經過 Message Header 進行過濾,甚至能夠按照 Message Body 進行過濾。

  5. 使用 Java 語言進行做爲過濾表達式是一個雙刃劍,方便了應用的過濾操做,可是帶來了服務器端的安全風險。 須要應用來保證過濾代碼安全,例如在過濾程序裏儘量不作申請大內存,建立線程等操做。避免 Broker 服 務器發生資源泄漏。

使用方式參見 Github 例子

https://github.com/alibaba/RocketMQ/blob/develop/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java

9 RocketMQ 通訊組件

RocketMQ 通訊組件使用了 Netty-4.0.9.Final,在之上作了簡單的協議封裝。

10 RocketMQ 服務發現(Name Server)

Name Server 是專爲 RocketMQ 設計的輕量級名稱服務,代碼小於 1000 行,具備簡單、可集羣橫向擴展、無狀 態等特色。將要支持的主備自動切換功能會強依賴 Name Server。

參考:
http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
https://www.jianshu.com/p/453c6e7ff81c
http://valleylord.github.io/post/201607-mq-rocketmq/

相關文章
相關標籤/搜索