關於「 消息隊列」你應該知道的事

消息隊列是什麼

百度百科上關於「消息隊列」的定義是這樣的redis

「消息隊列(Message Queue)」是在消息的傳輸過程當中保存消息的容器。算法

消息隊列(Message Queue,簡稱MQ),指保存消息的一個容器,本質是個隊列。數據庫

不論是什麼類型的消息隊列,消息隊列的本質都是:一發一存一消費緩存

生產者先將消息投遞一個叫作「隊列」的容器中,而後再從這個容器中取出消息,最後再轉發給消費者,僅此而已。服務器

image.png

消息隊列最原始的模型包含了兩個關鍵詞:消息和隊列markdown

一、消息:就是要傳輸的數據,能夠是最簡單的文本字符串,也能夠是自定義的複雜格式(只要能按預約格式解析出來便可)。網絡

二、隊列:你們應該再熟悉不過了,是一種先進先出數據結構。它是存放消息的容器,消息從隊尾入隊,從隊頭出隊,入隊即發消息的過程,出隊即收消息的過程。數據結構

爲何使用消息隊列

你們比較熟知的消息隊列引用場景有:架構

  • 系統解耦
  • 異步通訊
  • 流量削峯

除此以外,還有延遲通知、最終一致性保證、順序消息、流式處理等等。負載均衡

使用消息隊列還能夠:

屏蔽異構平臺的細節 發送方、接收方系統之間不須要了解雙方,只需認識消息。

複用 一次發送屢次消費。

可靠一次保證消息的傳遞。若是發送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它。

提供路由發送者無需與接收者創建鏈接,雙方經過消息隊列保證消息可以從發送者路由到接收者,甚至對於原本網絡不易互通的兩個服務,也能夠提供消息路由。

舉個實際的例子說明一下,用戶參加一個優惠券活動,須要輸入活動推廣碼,而後系統須要給用戶發放優惠券和發送短信、郵件等通知用戶參加活動成功。

解耦:使用消息隊列後用戶輸入推廣碼後將事件發送到消息隊列,以後直接記錄下用戶使用推廣碼的記錄便可,發券和發送觸達則有相應的系統去消息隊列獲取事件進行處理便可,減小了系統依賴,實現了系統解耦。

異步: 改造後至關於發券和發觸達這些後續步驟所有變成了異步執行,能減小輸入推廣碼登記的時間,提高了系統的吞吐量。

削峯:消息隊列轉儲消息,能夠做爲「漏斗」進行限流保護。

使用消息隊列須要注意的問題

  • 高可用 項目中使用消息隊列,都是得集羣/分佈式的。

  • 數據丟失 消息隊列中的數據須要存在別的地方,這樣才儘量減小數據的丟失

  • 數據一致性 使用分佈式事務,把全部關聯操做放在一個事務裏

  • 重複消費 保證消息消費的冪等性

  • 順序消費

如何保證消息的順序性?

RocketMQ的topic內的隊列機制,能夠保證存儲知足FIFO(First Input First Output 先進先出),剩下的只須要消費者順序消費便可。RocketMQ僅保證順序發送,順序消費由消費者業務保證。

Kafka 保證消息順序性

image.png

寫入一個 partition中的數據必定是有順序的。

生產者在寫的時候,能夠指定一個 key,好比訂單id做爲key,那麼訂單相關的數據,必定會被分發到一個 partition中,此時這個 partition中的數據必定是有順序的。Kafka 中一個 partition 只能被一個消費者消費。消費者從partition中取出數據的時候 ,必定是有順序的。

消息可靠性怎麼保證?

消息丟失可能發生在生產者發送消息MQ自己丟失消息消費者丟失消息3個方面。

  • 生產者發送消息 JOB輪詢超過必定時間(時間根據業務配置)還未發送成功的消息去重試,在監控平臺配置或者JOB程序處理超過必定次數一直髮送不成功的消息,告警,人工介入。

  • MQ丟失

RocketMQ分爲同步刷盤異步刷盤兩種方式,默認的是異步刷盤,就有可能致使消息還未刷到硬盤上就丟失了。能夠經過設置爲同步刷盤的方式來保證消息可靠性,這樣即便MQ掛了,恢復的時候也能夠從磁盤中去恢復消息。當咱們選擇同步刷盤以後,若是刷盤超時會給返回FLUSH_DISK_TIMEOUT

Kafka也能夠經過配置作到:acks=all 只有參與複製的全部節點所有收到消息,才返回生產者成功。除非全部的節點都掛了,消息纔會丟失。

  • 消費者丟失 RocketMQ默認是須要消費者回復ack確認,而kafka須要手動開啓配置關閉自動offset。消費方不返回ack確認,重發的機制根據MQ類型的不一樣發送時間間隔、次數都不盡相同,若是重試超過次數以後會進入死信隊列,須要手工來處理了。(Kafka沒有這些)

image.png

保證 MQ 重複消費冪等性

image.png

所謂消息冪等就是當出現消費者對某條消息重複消費的狀況時,重複消費的結果與消費一次的結果是相同的,而且屢次消費並未對業務系統產生任何負面影響。

思路:

  • 拿數據要寫庫,首先檢查下主鍵,若是有數據,則不插入,進行一次update
  • 若是是寫 redis,就沒問題,反正每次都是 set ,自然冪等性
  • 生產者發送消息的時候帶上一個全局惟一的id,消費者拿到消息後,先根據這個id去 redis裏查一下,以前有沒消費過,沒有消費過就處理,而且寫入這個 id 到 redis,若是消費過了,則不處理。
  • 基於數據庫的惟一鍵
經常使用的業務冪等性保證方法
  • 利用數據庫的惟一約束實現冪等:好比將訂單表中的訂單編號設置爲惟一索引,建立訂單時,根據訂單編號就能夠保證冪等

  • 去重表:本質也是根據數據庫的惟一性約束來實現。思路是:首先在去重表上建惟一索引,其次操做時把業務表和去重表放在同個本地事務中,若是出現重現重複消費,數據庫會拋惟一約束異常,操做就會回滾

  • 利用redis的原子性:每次操做都直接set到redis裏面,而後將redis數據定時同步到數據庫中

  • 多版本(樂觀鎖)控制:此方案多用於更新的場景下。其實現的大致思路是:給業務數據增長一個版本號屬性,每次更新數據前,比較當前數據的版本號是否和消息中的版本一致,若是不一致則拒絕更新數據,更新數據的同時將版本號+1

  • 狀態機機制:此方案多用於更新且業務場景存在多種狀態流轉的場景

  • token機制:生產者發送每條數據的時候,增長一個全局惟一的id,這個id一般是業務的惟一標識,好比訂單編號。在消費端消費時,則驗證該id是否被消費過,若是還沒消費過,則進行業務處理。處理結束後,在把該id存入redis,同時設置狀態爲已消費。若是已經消費過了,則不進行處理。

消息隊列如何選型

目前在市面上比較主流的消息隊列中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等這幾種。

image.png

RocketMQ是阿里開源的,定位是非日誌的可靠消息傳輸。例如:訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等。

Kafka是世界範圍級別的消息隊列標杆,定位是系統間的數據流管道,實時數據處理。例如:常規的消息系統、網站活性跟蹤,監控數據,日誌收集、處理等。

RocketMQ

RocketMQ實現原理

RocketMQ由NameServer註冊中心集羣、Producer生產者集羣、Consumer消費者集羣和若干Broker(RocketMQ進程)組成,它的架構原理是這樣的:

  1. Broker在啓動的時候去向全部的NameServer註冊,並保持長鏈接,每30s發送一次心跳
  2. Producer在發送消息的時候從NameServer獲取Broker服務器地址,根據負載均衡算法選擇一臺服務器來發送消息
  3. Conusmer消費消息的時候一樣從NameServer獲取Broker地址,而後主動拉取消息來消費

image.png

Borker的Master和Slave之間是怎麼同步數據的呢?

RocketMQ 底層用了 DLedger,用 Raft 同步日誌,從原理上保證了不會腦裂。 經過broker主從機制實現了高可用

  • 在Master broker收到消息後,會被標記爲uncommitted狀態,而後會把消息發送給全部的slave
  • slave在收到消息以後返回ack響應給master
  • master在收到超過半數的ack以後,把消息標記爲committed
  • 發送committed消息給全部slave,slave也修改狀態爲committed
RocketMQ爲何速度快?

由於使用了順序存儲、Page Cache和異步刷盤。

  • 咱們在寫入commitlog的時候是順序寫入的,這樣比隨機寫入的性能就會提升不少
  • 寫入commitlog的時候並非直接寫入磁盤,而是先寫入操做系統的PageCache
  • 最後由操做系統異步將緩存中的數據刷到磁盤
RocketMQ延遲隊列怎麼實現?

RocketMQ延遲隊列的核心思路是:

全部的延遲消息由producer發出以後,都會存放到同一個topic(SCHEDULE_TOPIC_XXXX)下,不一樣的延遲級別會對應不一樣的隊列序號。

當延遲時間到以後,由定時線程讀取轉換爲普通的消息存的真實指定的topic下,此時對於consumer端此消息纔可見,從而被consumer消費。

注意:RocketMQ不支持任意時間的延時,只支持如下幾個固定的延時等級

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h >2h";

參考資料

baike.baidu.com/item/%E6%B6…

www.zhihu.com/question/54…

juejin.cn/post/684490…

juejin.cn/post/684490…

www.zhihu.com/question/54…

www.zhihu.com/question/54…

zhuanlan.zhihu.com/p/138624006

www.zhihu.com/question/65…

zhuanlan.zhihu.com/p/142441996

相關文章
相關標籤/搜索