RocketMQ 經過本身的方式解決了消息順序性的問題:
RocketMQ經過輪詢全部隊列的方式來肯定消息被髮送到哪個隊列(負載均衡策略)。根據不一樣業務,能夠將業務ID做爲計算隊列,讓業務ID相同的消息前後發送到同一個隊列中,在獲取到路由信息之後,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的確定是同一個隊列。java
在網絡不穩定的狀況下,RocketMQ會出現消息重複的問題:redis
發送時消息重複算法
- 當一條消息已被成功發送到服務端並完成持久化,此時出現了網絡閃斷或者客戶端宕機,致使服務端對客戶端應答失敗。 若是此時生產者意識到消息發送失敗並嘗試再次發送消息,消費者後續會收到兩條內容相同而且 Message ID 也相同的消息。
投遞時消息重複緩存
- 消息消費的場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。 爲了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端將在網絡恢復後再次嘗試投遞以前已被處理過的消息,消費者後續會收到兩條內容相同而且 Message ID 也相同的消息。
負載均衡時消息重複服務器
- 當消息隊列 RocketMQ 的 Broker 或客戶端重啓、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。
形成消息重複的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。 因此解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:若是消費端收到兩條同樣的消息,應該怎樣處理?網絡
第1條很好理解,只要保持冪等性,無論來多少條重複消息,最後處理的結果都同樣。
第2條原理就是利用一張日誌表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日誌表中,那麼就再也不處理這條消息。
第1條解決方案,很明顯應該在消費端實現,不屬於消息系統要實現的功能。
第2條能夠消息系統實現,也能夠業務端實現。正常狀況下出現重複消息的機率其實很小,若是由消息系統來實現的話,確定會對消息系統的吞吐量和高可用有影響,因此最好仍是由業務端本身處理消息重複的問題,這也是RocketMQ不解決消息重複的問題的緣由。
負載均衡
RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。異步
那業務端如何去重呢?原理很簡單,步驟以下:分佈式
事務消息:消息隊列 RocketMQ 提供相似 X/Open XA 的分佈事務功能,經過消息隊列 RocketMQ 事務消息能達到分佈式事務的最終一致。 半消息:暫不能投遞的消息,發送方已經將消息成功發送到了消息隊列 RocketMQ 服務端,可是服務端未收到生產者對該消息的二次確認,此時該消息被標記成「暫不能投遞」狀態,處於該種狀態下的消息即半消息。 消息回查:因爲網絡閃斷、生產者應用重啓等緣由,致使某條事務消息的二次確認丟失,消息隊列 RocketMQ 服務端經過掃描發現某條消息長期處於「半消息」時,須要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。函數
事務消息的適用場景示例: 經過購物車進行下單的流程中,用戶入口在購物車系統,交易下單入口在交易系統,兩個系統之間的數據須要保持最終一致,這時能夠經過事務消息進行處理。交易系統下單以後,發送一條交易下單的消息到消息隊列 RocketMQ,購物車系統訂閱消息隊列 RocketMQ 的交易下單消息,作相應的業務處理,更新購物車數據。
交互流程
消息隊列 RocketMQ 事務消息交互流程以下所示: 事務消息
說明:事務消息發送對應步驟 一、二、三、4,事務消息回查對應步驟 五、六、7。
Message message = new Message();
// 在消息屬性中添加第一次消息回查的最快時間,單位秒。例如,如下設置實際第一次回查時間爲 120 秒 ~ 125 秒之間
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只肯定事務消息的第一次回查的最快時間,實際回查時間向後浮動0~5秒;如第一次回查後事務仍未提交,後續每隔5秒回查一次。
複製代碼
同步發送是指消息發送方發出數據後,會在收到接收方發回響應以後才發下一個數據包的通信方式。
異步發送是指發送方發出數據後,不等接收方發回響應,接着發送下個數據包的通信方式。 MQ 的異步發送,須要用戶實現異步發送回調接口(SendCallback)。消息發送方在發送了一條消息後,不須要等待服務器響應便可返回,進行第二條消息發送。發送方經過回調接口接收服務器響應,並對響應結果進行處理。
單向(Oneway)發送特色爲發送方只負責發送消息,不等待服務器迴應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時很是短,通常在微秒級別。
RocketMQ採起長輪詢+PULL模式保證消息的持久性
消息隊列 RocketMQ 支持如下兩種訂閱方式:
集羣訂閱:同一個 Group ID 所標識的全部 Consumer 平均分攤消費消息。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那麼在集羣消費模式下每一個實例平均分攤,只消費其中的 3 條消息。
// 集羣訂閱方式設置(不設置的狀況下,默認爲集羣訂閱方式)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
複製代碼
廣播訂閱:同一個 Group ID 所標識的全部 Consumer 都會各自消費某條消息一次。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那麼在廣播消費模式下每一個實例都會各自消費 9 條消息。
// 廣播訂閱方式設置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
複製代碼
- 定時消息:Producer 將消息發送到消息隊列 RocketMQ 服務端,但並不指望這條消息立馬投遞,而是推遲到在當前時間點以後的某一個時間投遞到 Consumer 進行消費,該消息即定時消息。
- 延時消息:Producer 將消息發送到消息隊列 RocketMQ 服務端,但並不指望這條消息立馬投遞,而是延遲必定時間後才投遞到 Consumer 進行消費,該消息即延時消息。
定時消息與延時消息在代碼配置上存在一些差別,可是最終達到的效果相同:消息在發送到消息隊列 RocketMQ 服務端後並不會立馬投遞,而是根據消息中的屬性延遲固定時間後才投遞給消費者。
定時消息和延時消息適用於如下一些場景: