一.瞭解RocketMQ?html
rocketMQ是阿里開源的一款十分優秀的消息隊列,rocketMQ具備不少其餘消息隊列不具備的特性,更重要的是rocketMQ是用java開發的學習成本較低,而且經歷了雙11的數據洪峯的考驗。rocketMQ已經加入了apache,成爲apache的頂級項目,最近阿里的另外一款開源項目dubbo也從新開始維護。java
阿里在RocketMQ 項目基礎上衍生的項目以下:git
com.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求爲淘寶應用提供消息服務。github
om.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求爲支付寶應用提供消息服務 數據庫
com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B 個性化需求爲 B2B 應用提供消息服務apache
目前RocketMQ的代碼託管在github上:
老的地址:https://github.com/alibaba/RocketMQ
新的地址:https://github.com/apache/incubator-rocketmq安全
中間件比較:服務器
https://rocketmq.apache.org/docs/motivation/網絡
https://help.aliyun.com/document_detail/52577.html?spm=5176.7946988.881668.1.754942betpCaPZ數據結構
二.RocketMQ 是什麼?
1.是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色。
2.Producer、Consumer隊列均可以分佈式。
3.Producer向一些隊列輪流發送消息,隊列集合稱爲 Topic,Consumer 若是作廣播消費,則一個consumer實例消費這個Topic 對應的全部隊列,若是作集羣消費,則多個Consumer 實例平均消費這個topic對應的隊列集合。(默認是集羣消費)
4.可以保證嚴格的消息順序(由於性能緣由,不能保證消息不重複,由於總有網絡不可達的狀況發生,需業務端保證)。
5.提供豐富的消息拉取模式
6.高效的訂閱者水平擴展能力
7.實時的消息訂閱機制
8.億級消息堆積能力
9.較少的依賴
三.RocketMQ的基本概念
它是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。(2.X版本以前rocketMQ使用zookeeper作topic路由管理)。Name Server 是專爲 RocketMQ設計的輕量級名稱服務,代碼小於1000行,具備簡單、可集羣橫吐擴展、無狀態等特色。將要支持的主備自動切換功能會強依賴 Name Server。
Broker 部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave的對應關係經過指定相同的BrokerName,不一樣的BrokerId來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與Name Server 集羣中的全部節點創建長鏈接,定時註冊Topic信息到全部Name Server。
Producer 與Name Server集羣中的其中一個節點(隨機選擇,但不一樣於上一次)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。
Consumer與Name Server集羣中的其中一個節點(隨機選擇,但不一樣於上一次)創建長鏈接,按期從Name Server 取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定(目前版本沒有找到可配置的地方,能夠在原碼裏修改)。
5.Producer Group
用來表示一個収送消息應用,一個Producer Group下包含多個Producer實例,能夠是多臺機器,也能夠是一臺機器的多個迕程,或者一個進程的多個Producer對象。一個Producer Group能夠發送多個Topic消息,Producer Group做用以下:
1.標識一類 Producer
2.能夠經過運維工具查詢這個發送消息應用下有多個Producer實例
3.發送分佈式事務消息時,若是 Producer中途意外宕機,Broker會主動回調 Producer Group內的任意一臺機器來確認事務狀態。
用來表示一個消費消息應用,一個Consumer Group下包含多個Consumer實例,能夠是多臺機器,也能夠是多個進程,或者是一個進程的多個Consumer對象。一個Consumer Group下的多個Consumer以均攤方式消費消息,若是設置爲廣播方式,那麼這個 Consumer Group下的每一個實例都消費全量數據。
三.生產消息
producer配置
發送消息注意事項
1.一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags在broker作消息過濾。 message.setTags("TagA");
2.每一個消息在業務局面的惟一標識碼,要設置到keys字段,方便未來定位消息丟失問題。服務器會爲每一個消息建立索引(哈希索引),應用能夠經過topic,key來查詢這條消息內 容,以及消息被誰消費。因爲是哈希索引,請務必保證key儘量惟一,這樣能夠避免潛在的哈希衝突。 //訂單Id String orderId = "20034568923546"; message.setKeys(orderId);
3.消息發送成功或者失敗,要打印消息日誌,務必要打印sendresult和key字段。
4.send消息方法,只要不拋異常,就表明發送成功。可是發送成功會有多個狀態,在sendResult裏定義。
SEND_OK 消息發送成功
FLUSH_DISK_TIMEOUT 消息發送成功,可是服務器刷盤超時,消息已經迕入服務器隊列,只有此時服務器宕機,消息纔會丟失
FLUSH_SLAVE_TIMEOUT 消息發送成功,可是服務器同步到 Slave時超時,消息已經迕入服務器隊列,只有此時服務器宕機,消息纔會丟失
SLAVE_NOT_AVAILABLE 消息發送成功,可是此時 slave 不可用,消息已經迕入服務器隊列,只有此時服務器宕機,消息纔會丟失
對於精確發送順序消息的應用,因爲順序消息的侷限性,可能會涉及到主備自動切換問題,因此若是sendresult中的status字段不等於SEND_OK,就應該嘗試重試。對於其餘應用,則沒有必要返樣。
5. 對於消息不可丟失應用,務必要有消息重發機制,例如若是消息發送失敗,存儲到數據庫,能有定時程序嘗試重發,或者人工觸發重發。
消息發送失敗如何處理
Producer的send方法自己支持內部重試,重試邏輯以下:
1.至多重試 3 次。
2.若是發送失敗,則輪轉到下一個 Broker。
3.這個方法的總耗時時間不超過 sendMsgTimeout設置的值,默認10s。因此,若是自己向broker發送消息產生超時異常,就不會再作重試。
以上策略仍然不能保證消息必定發送成功,爲保證消息必定成功,建議應用這樣作:
若是調用send同步方法發送失敗,則嘗試將消息存儲到db,由後臺線程定時重試,保證消息必定到達Broker。
上述 db 重試方式爲何沒有集成到 MQ客戶端內部作,而是要求應用本身去完成,阿里主要是基於如下幾點考慮:
1.MQ的客戶端設計爲無狀態模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內存、網絡。
2.若是MQ客戶端內部集成一個KV存儲模塊,那麼數據只有同步落盤才能較可靠,而同步落盤自己性能開銷較大,因此一般會採用異步落盤,又因爲應用關閉過程不受MQ運維人員控制,可能常常會發生kill -9 這樣暴力方式關閉,形成數據沒有及時落盤而丟失。
3.Producer所在機器的可靠性較低,通常爲虛擬機,不適合存儲重要數據。
綜上,建議重試過程交由應用來控制。
四.消費消息
消費過程要作到冪等(即消費端去重)
RocketMQ 沒法避免消息重複,因此若是業務對消費重複很是敏感,務必要在業務局面去重,有如下幾種去重方式:
1.將消息的惟一鍵,能夠是 msgId,也能夠是消息內容中的惟一標識字段,例如訂單Id等,消費以前判斷是否在 Db或Tair(全局KV存儲)中存在,若是不存在則插入入,並消費,不然跳過。(實際過程要考慮原子性問題,判斷是否存在能夠嘗試插入,若是報主鍵衝突,則插入失敗,直接跳過) msgId必定是全句惟一標識符,可是可能會存在一樣的消息有兩個不一樣 msgId的狀況(有多種緣由),這種狀況可能會使業務上重複消費,建議最好使用消息內容中的惟一標識字段去重。
2.使用業務局面的狀態機去重 。
RocketMQ的Consumer都是從Broker拉消息來消費,可是爲了能作到實時收消息,RocketMQ使用長輪詢方式,能夠保證消息實時性同Push方式一致。'
push consumer配置
pull consumer配置
message數據結構
針對producer
在Producer端,使用com.alibaba.rocketmq.common.message.Message這個數據結構,因爲Broker會爲Message增長數據結構,因此消息到達Consumer後,會在Message基礎之上增長多個字段,Consumer看到的是com.alibaba.rocketmq.common.message.MessageExt返個數據結構,MessageExt繼承於Message。
批量方式消費
某些業務流程若是支持批量方式消費,則能夠很大程度上提升消費吞吏量,例如訂單扣款類應用,一次處理一個訂單耗時1秒鐘,一次處理10個訂單可能也只耗時2秒鐘,這樣便可大幅度提升消費的吞吐量,經過設置consumer的consumeMessageBatchMaxSize這個參數,默認是1,即一次只消費一條消息,例如設置爲N,那麼每次消費的 消息數小於等於N。
五.RocketMQ 存儲特色
RocketMQ 參考了Kafka的持久化方式,充分利用Linux文件系統內存cache來提升性能。
Consumer消費消息過程,使用了零拷貝,零拷貝包含如下兩種方式 :
1.使用 mmap + write 方式 優勢:即便頻繁調用,使用小塊文件傳輸,效率也很高缺點:不能很好的利用DMA方式,會比sendfile多消耗CPU,內存安全性控制複雜,須要避免JVM Crash問題。
2.使用 sendfile 方式優勢:能夠利用DMA方式,消耗CPU較少,大塊文件傳輸效率高,無內存安全新問題。 缺點:小塊文件效率低於mmap方式,只能是BIO方式傳輸,不能使用NIO。
RocketMQ 選擇了第一種方式,mmap+write方式,由於有小塊數據傳輸的需求,效果會比sendfile更好。
數據存儲結構
RocketMQ的全部消息都是持久化的,先寫入系統PAGECACHE,而後刷盤,能夠保證內存不磁盤都有一份數據,訪問時,直接從內存讀取。
集羣管理夠工具
http://192.168.102.35:8080/rocketmq-console/cluster/list.do
參考文檔:
rocketmq開發指南-v3.2.4
RocketMQ運維指令
--------------------- 本文來自 SIMILAR_ZHANG 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/qq_32711825/article/details/78579864?utm_source=copy