百度百科上關於「消息隊列」的定義是這樣的redis
「消息隊列(Message Queue)」是在消息的傳輸過程當中保存消息的容器。算法
消息隊列(Message Queue
,簡稱MQ),指保存消息的一個容器,本質是個隊列。數據庫
不論是什麼類型的消息隊列,消息隊列的本質都是:一發一存一消費緩存
生產者先將消息投遞一個叫作「隊列」的容器中,而後再從這個容器中取出消息,最後再轉發給消費者,僅此而已。服務器
消息隊列最原始的模型包含了兩個關鍵詞:消息和隊列。markdown
一、消息:就是要傳輸的數據,能夠是最簡單的文本字符串,也能夠是自定義的複雜格式(只要能按預約格式解析出來便可)。網絡
二、隊列:你們應該再熟悉不過了,是一種先進先出數據結構。它是存放消息的容器,消息從隊尾入隊,從隊頭出隊,入隊即發消息的過程,出隊即收消息的過程。數據結構
你們比較熟知的消息隊列引用場景有:架構
除此以外,還有延遲通知、最終一致性保證、順序消息、流式處理等等。負載均衡
使用消息隊列還能夠:
屏蔽異構平臺的細節 發送方、接收方系統之間不須要了解雙方,只需認識消息。
複用 一次發送屢次消費。
可靠一次保證消息的傳遞。若是發送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它。
提供路由發送者無需與接收者創建鏈接,雙方經過消息隊列保證消息可以從發送者路由到接收者,甚至對於原本網絡不易互通的兩個服務,也能夠提供消息路由。
舉個實際的例子說明一下,用戶參加一個優惠券活動,須要輸入活動推廣碼,而後系統須要給用戶發放優惠券和發送短信、郵件等通知用戶參加活動成功。
解耦:使用消息隊列後用戶輸入推廣碼後將事件發送到消息隊列,以後直接記錄下用戶使用推廣碼的記錄便可,發券和發送觸達則有相應的系統去消息隊列獲取事件進行處理便可,減小了系統依賴,實現了系統解耦。
異步: 改造後至關於發券和發觸達這些後續步驟所有變成了異步執行,能減小輸入推廣碼登記的時間,提高了系統的吞吐量。
削峯:消息隊列轉儲消息,能夠做爲「漏斗」進行限流保護。
高可用 項目中使用消息隊列,都是得集羣/分佈式的。
數據丟失 消息隊列中的數據須要存在別的地方,這樣才儘量減小數據的丟失
數據一致性 使用分佈式事務,把全部關聯操做放在一個事務裏
重複消費 保證消息消費的冪等性
順序消費
RocketMQ的topic
內的隊列機制,能夠保證存儲知足FIFO
(First Input First Output 先進先出),剩下的只須要消費者順序消費便可。RocketMQ僅保證順序發送,順序消費由消費者業務保證。
寫入一個 partition中的數據必定是有順序的。
生產者在寫的時候,能夠指定一個 key,好比訂單id做爲key,那麼訂單相關的數據,必定會被分發到一個 partition中,此時這個 partition中的數據必定是有順序的。Kafka 中一個 partition 只能被一個消費者消費。消費者從partition中取出數據的時候 ,必定是有順序的。
消息丟失可能發生在生產者發送消息、MQ自己丟失消息、消費者丟失消息3個方面。
生產者發送消息 JOB輪詢超過必定時間(時間根據業務配置)還未發送成功的消息去重試,在監控平臺配置或者JOB程序處理超過必定次數一直髮送不成功的消息,告警,人工介入。
MQ丟失
RocketMQ分爲同步刷盤和異步刷盤兩種方式,默認的是異步刷盤,就有可能致使消息還未刷到硬盤上就丟失了。能夠經過設置爲同步刷盤的方式來保證消息可靠性,這樣即便MQ掛了,恢復的時候也能夠從磁盤中去恢復消息。當咱們選擇同步刷盤以後,若是刷盤超時會給返回FLUSH_DISK_TIMEOUT
。
Kafka也能夠經過配置作到:acks=all 只有參與複製的全部節點所有收到消息,才返回生產者成功。除非全部的節點都掛了,消息纔會丟失。
ack
確認,而kafka須要手動開啓配置關閉自動offset
。消費方不返回ack確認,重發的機制根據MQ類型的不一樣發送時間間隔、次數都不盡相同,若是重試超過次數以後會進入死信隊列,須要手工來處理了。(Kafka沒有這些)所謂消息冪等就是當出現消費者對某條消息重複消費的狀況時,重複消費的結果與消費一次的結果是相同的,而且屢次消費並未對業務系統產生任何負面影響。
思路:
經常使用的業務冪等性保證方法
利用數據庫的惟一約束實現冪等:好比將訂單表中的訂單編號設置爲惟一索引,建立訂單時,根據訂單編號就能夠保證冪等
去重表:本質也是根據數據庫的惟一性約束來實現。思路是:首先在去重表上建惟一索引,其次操做時把業務表和去重表放在同個本地事務中,若是出現重現重複消費,數據庫會拋惟一約束異常,操做就會回滾
利用redis的原子性:每次操做都直接set到redis裏面,而後將redis數據定時同步到數據庫中
多版本(樂觀鎖)控制:此方案多用於更新的場景下。其實現的大致思路是:給業務數據增長一個版本號屬性,每次更新數據前,比較當前數據的版本號是否和消息中的版本一致,若是不一致則拒絕更新數據,更新數據的同時將版本號+1
狀態機機制:此方案多用於更新且業務場景存在多種狀態流轉的場景
token機制:生產者發送每條數據的時候,增長一個全局惟一的id,這個id一般是業務的惟一標識,好比訂單編號。在消費端消費時,則驗證該id是否被消費過,若是還沒消費過,則進行業務處理。處理結束後,在把該id存入redis,同時設置狀態爲已消費。若是已經消費過了,則不進行處理。
目前在市面上比較主流的消息隊列中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ
等這幾種。
RocketMQ是阿里開源的,定位是非日誌的可靠消息傳輸。例如:訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等。
Kafka是世界範圍級別的消息隊列標杆,定位是系統間的數據流管道,實時數據處理。例如:常規的消息系統、網站活性跟蹤,監控數據,日誌收集、處理等。
RocketMQ由NameServer
註冊中心集羣、Producer
生產者集羣、Consumer
消費者集羣和若干Broker
(RocketMQ進程)組成,它的架構原理是這樣的:
RocketMQ 底層用了 DLedger
,用 Raft 同步日誌,從原理上保證了不會腦裂。 經過broker主從機制實現了高可用。
由於使用了順序存儲、Page Cache
和異步刷盤。
RocketMQ延遲隊列的核心思路是:
全部的延遲消息由producer發出以後,都會存放到同一個topic
(SCHEDULE_TOPIC_XXXX)下,不一樣的延遲級別會對應不一樣的隊列序號。
當延遲時間到以後,由定時線程讀取轉換爲普通的消息存的真實指定的topic下,此時對於consumer端此消息纔可見,從而被consumer消費。
注意:RocketMQ不支持任意時間的延時,只支持如下幾個固定的延時等級
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h >2h";