消息隊列連環炮
項目裏怎麼樣使用 MQ 的?mysql
爲何要使用消息隊列?面試
消息隊列有什麼優勢和缺點?redis
kafka,activemq,rabbitmq,rocketmq 都有什麼去唄?sql
如何保證消息隊列高可用?數據庫
如何保證消息不被重複消費?緩存
如何保證消息的可靠性傳輸?性能優化
如何保證消息的順序性?微信
寫一個消息隊列架構設計?網絡
消息隊列技術選型
解決的問題:多線程
解耦
異步
削峯
不用 MQ 系統耦合場景
A 系統產生了一個比較關鍵的數據,不少系統須要 A 系統將數據發過來,強耦合(B,C,D,E 系統可能參數不同、一會須要一會不須要數據,A 系統要不斷修改代碼維護)
A 系統還要考慮 B、C、D、E 系統是否掛了,是否訪問超時?是否重試?
使用 MQ 系統解耦場景
維護這個代碼,不須要考慮人家是否調用成功,失敗超時
若是新系統須要數據,直接從 MQ 裏消費便可,若是某個系統不須要這條數據就取消對 MQ 消息的消費便可。
總結:經過一個 MQ 的發佈訂閱消息模型(Pub/Sub), 系統 A 跟其餘系統就完全解耦了。
不用 MQ 同步高延遲請求場景
通常互聯網類的企業,對用戶的直接操做,通常要求每一個請求都必須在 200ms之內,對用戶幾乎是無感知的。
使用 MQ 進行異步化以後的接口性能優化
提升高延時接口
沒有用 MQ 時高峯期系統被打死的場景
高峯期每秒 5000 個請求,每秒對 MySQL 執行 5000 條 SQL(通常MySQL每秒 2000 個請求差很少了),若是MySQL被打死,而後整個系統就崩潰,用戶就沒辦法使用系統了。可是高峯期過了以後,每秒鐘可能就 50 個請求,對整個系統沒有任何壓力。
使用 MQ 進行削峯的場景
5000 個請求寫入到 MQ 裏面,系統 A 每秒鐘最多隻能處理 2000 個請求(MySQL 每秒鐘最多處理 2000 個請求),系統 A 從 MQ 裏慢慢拉取請求,每秒鐘拉取 2000 個請求。MQ,每秒鐘 5000 個請求進來,結果只有 2000 個請求出去,結果致使在高峯期(21小時),可能有幾十萬甚至幾百萬的請求積壓在 MQ 中,這個是正常的,由於過了高峯期以後,每秒鐘就 50 個請求,可是系統 A 仍是會按照每秒 2000 個該請求的速度去處理。只要高峯期一過,系統 A 就會快速的將積壓的消息給解決掉。
算一筆帳,每秒積壓在 MQ 裏消息有 3000 條,一分鐘就會積壓 18W 條消息,一個小時就會積壓 1000 萬條消息。等高峯期一過,差很少須要 1 個多小時就能夠把 1000W 條積壓的消息給處理掉
架構中引入 MQ 後存在的問題
系統可用性下降
MQ 可能掛掉,致使整個系統崩潰
系統複雜性變高
可能發重複消息,致使插入重複數據;消息丟了;消息順序亂了;系統 B,C,D 掛了,致使 MQ 消息積累,磁盤滿了;
一致性問題
原本應該A,B,C,D 都執行成功了再返回,結果A,B,C 執行成功 D 失敗
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什麼優缺點
建議:中小型公司 RabbitMQ 大公司:RocketMQ 大數據實時計算:Kafka
消息隊列高可用
RabbtitMQ 高可用
RabbitMQ有三種模式:單機模式 、普通集羣模式、鏡像集羣模式
單機模式
demo級
普通集羣模式(非高可用)
隊列的元數據存在於多個實例中,可是消息不存在多個實例中,每次多臺機器上啓動多個 rabbitmq 實例,每一個機器啓動一個。
優勢:能夠多個機器消費消息,能夠提升消費的吞吐量
缺點:可能會在 rabbitmq 內部產生大量的數據傳輸 ;可用性基本沒保障,queue 所在機器宕機,就沒辦法消費了
沒有高可用性可言
鏡像集羣模式(高可用,非分佈式)
隊列的元數據和消息都會存在於多個實例中,每次寫消息到 queue的時候,都會自動把消息到多個實例的 queue 裏進行消息同步。也就 是每一個節點上都有這個 queue 的一個完整鏡像(這個 queue的所有數據)。任何一個節點宕機了,其餘節點還包含這個 queue的完整數據,其餘 consumer 均可以到其餘活着的節點上去消費數據都是 OK 的。
缺點:不是分佈式的,若是這個 queue的數據量很大,大到這個機器上的容量沒法容納 。
開啓鏡像集羣模式方法:管理控制檯,Admin頁面下,新增一個鏡像集羣模式的策略,指定的時候能夠要求數據同步到全部節點,也能夠要求同步到指定數量的節點,而後你再次建立 queue 的時候 ,應用這個策略,就 會自動將數據同步到其餘的節點上去。
Kafka 高可用架構
broker進程就是kafka在每臺機器上啓動的本身的一個進程。每臺機器+機器上的broker進程,就能夠認爲是 kafka集羣中的一個節點。
你建立一個 topic,這個topic能夠劃分爲多個 partition,每一個 partition 能夠存在於不一樣的 broker 上,每一個 partition就存放一部分數據。
這就是自然的分佈式消息隊列,也就是說一個 topic的數據,是分散放在 多個機器上的,每一個機器就放一部分數據。
分佈式的真正含義是每一個節點只放一部分數據,而不是完整數據(完整數據就是HA、集羣機制)
Kafka 0.8版本以前是沒有 HA 機制的,任何一個 broker 宕機了,那麼就缺失一部分數據。
Kafka 0.8之後,提供了 HA 機制,就是 replica 副本機制。
每一個 partition的數據都會同步到其餘機器上,造成本身的多個 replica 副本。而後全部 replica 會選舉一個 leader。那麼生產者、消費者都會和這個 leader 打交道,而後其餘 replica 就是 follow。寫的時候,leader 負責把數據同步到全部 follower上去,讀的時候就直接讀 leader 上的數據便可。
若是某個 broker宕機了,恰好也是 partition的leader,那麼此時會選舉一個新的 leader出來,你們繼續讀寫那個新的 leader便可,這個就 是所謂的高可用性。更多面試題:面試題內容聚合
leader和follower的同步機制:
寫數據的時候,生產者就寫 leader,而後 leader將數據落地寫本地磁盤,接着其餘 follower 本身主動從 leader來pull數據。一旦全部 follower同步好數據了,就會發送 ack給 leader,leader收到全部 follower的 ack以後,就會返回寫成功的消息給生產者。
消費的時候,只會從 leader去讀,可是隻有一個消息已經被全部 follower都同步成功返回 ack的時候,這個消息纔會被消費者讀到。
消息隊列重複數據
MQ 只能保證消息不丟,不能保證重複發送
Kafka 消費端可能出現的重複消費問題
每條消息都有一個 offset 表明 了這個消息的順序的序號,按照數據進入 kafka的順序,kafka會給每條數據分配一個 offset,表明了這個是數據的序號,消費者從 kafka去消費的時候,按照這個順序去消費,消費者會去提交 offset,就是告訴 kafka已經消費到 offset=153這條數據了 ;zk裏面就記錄了消費者當前消費到了 offset =幾的那條消息;假如此時消費者系統被重啓,重啓以後,消費者會找kafka,讓kafka把上次我消費到的那個地方後面的數據繼續給我傳遞過來。更多面試題:面試題內容聚合
重複消息緣由:(主要發生在消費者重啓後)
消費者不是說消費完一條數據就立馬提交 offset的,而是定時按期提交一次 offset。消費者若是再準備提交 offset,可是還沒提交 offset的時候,消費者進程重啓了,那麼此時已經消費過的消息的 offset並無提交,kafka也就不知道你已經消費了 offset= 153那條數據,這個時候kafka會給你發offset=152,153,154的數據,此時 offset = 152,153的消息重複消費了
保證 MQ 重複消費冪等性
冪等:一個數據或者一個請求,給你重複來屢次,你得確保對應的數據是不會改變的,不能出錯。
思路:
拿數據要寫庫,首先檢查下主鍵,若是有數據,則不插入,進行一次update
若是是寫 redis,就沒問題,反正每次都是 set ,自然冪等性
生產者發送消息的時候帶上一個全局惟一的id,消費者拿到消息後,先根據這個id去 redis裏查一下,以前有沒消費過,沒有消費過就處理,而且寫入這個 id 到 redis,若是消費過了,則不處理。
基於數據庫的惟一鍵
保證 MQ 消息不丟
MQ 傳遞很是核心的消息,好比:廣告計費系統,用戶點擊一次廣告,扣費一塊錢,若是扣費的時候消息丟了,則會不斷少錢,聚沙成塔,對公司是一個很大的損失。
RabbitMQ可能存在的數據丟失問題
生產者寫消息的過程當中,消息都沒有到 rabbitmq,在網絡傳輸過程當中就丟了。或者消息到了 rabbitmq,可是人家內部出錯了沒保存下來。
RabbitMQ 接收到消息以後先暫存在主機的內存裏,結果消費者還沒來得及消費,RabbitMQ本身掛掉了,就致使暫存在內存裏的數據給搞丟了。
消費者消費到了這個消費,可是還沒來得及處理,本身就掛掉了,RabbitMQ 覺得這個消費者已經處理完了。
問題 1解決方案:
事務機制:(通常不採用,同步的,生產者發送消息會同步阻塞卡住等待你是成功仍是失敗。會致使生產者發送消息的吞吐量降下來)
channel.txSelect
try {
//發送消息
} catch(Exception e){
channel.txRollback;
//再次重試發送這條消息
}
channel.txCommit;
confirm機制:(通常採用這種機制,異步的模式,不會阻塞,吞吐量會比較高)
先把 channel 設置成 confirm 模式
發送一個消息到 rabbitmq
發送完消息後就不用管了
rabbitmq 若是接收到了這條消息,就會回調你生產者本地的一個接口,通知你說這條消息我已經收到了
rabbitmq 若是在接收消息的時候報錯了,就會回調你的接口,告訴你這個消息接收失敗了,你能夠再次重發。
public void ack(String messageId){
}
public void nack(String messageId){
//再次重發一次這個消息
}
問題 2 解決方案:
持久化到磁盤
建立queue的時候將其設置爲持久化的,這樣就能夠保證 rabbitmq持久化queue的元數據,可是不會持久化queue裏的數據
發送消息的時候將 deliveryMode 設置爲 2,將消息設置爲持久化的,此時 rabbitmq就會將消息持久化到磁盤上去。必須同時設置 2 個持久化才行。
持久化能夠跟生產者那邊的 confirm機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者 ack了 ,因此哪怕是在持久化到磁盤以前 ,rabbitmq掛了,數據丟了,生產者收不到 ack,你也能夠本身重發。
缺點:可能會有一點點丟失數據的可能,消息恰好寫到了 rabbitmq中,可是還沒來得及持久化到磁盤上,結果不巧, rabbitmq掛了,會致使內存裏的一點點數據會丟失。更多面試題:面試題內容聚合
問題 3 解決方案:
緣由:消費者打開了 autoAck機制(消費到一條消息,還在處理中,還沒處理完,此時消費者自動 autoAck了,通知 rabbitmq說這條消息已經消費了,此時不巧,消費者系統宕機了,那條消息丟失了,還沒處理完,並且 rabbitmq還覺得這個消息已經處理掉了)
解決方案:關閉 autoAck,本身處理完了一條消息後,再發送 ack給 rabbitmq,若是此時還沒處理完就宕機了,此時rabbitmq沒收到你發的ack消息,而後 rabbitmq 就會將這條消息從新分配給其餘的消費者去處理。
Kafka 可能存在的數據丟失問題
消費端弄丟數據
緣由:消費者消費到那條消息後,自動提交了 offset,kafka覺得你已經消費好了這條消息,結果消費者掛了,這條消息就丟了。
例子:消費者消費到數據後寫到一個內存 queue裏緩存下,消息自動提交 offset,重啓了系統,結果會致使內存 queue 裏還沒來得及處理的數據丟失。
解決方法:kafka會自動提交 offset,那麼只要關閉自動提交 offset,在處理完以後本身手動提交,能夠保證數據不會丟。可是此時確實仍是會重複消費,好比恰好處理完,還沒提交 offset,結果本身掛了,此時確定會重複消費一次 ,作好冪等便可。
Kafka 丟掉消息
緣由:kafka 某個 broker 宕機,而後從新選舉 partition 的 leader時,此時其餘的 follower 恰好還有一些數據沒有同步,結果此時 leader掛了,而後選舉某個 follower成 leader以後,就丟掉了以前leader裏未同步的數據。更多面試題:面試題內容聚合
例子:kafka的leader機器宕機,將 follower 切換爲 leader以後,發現數據丟了
解決方案:(保證 kafka broker端在 leader發生故障,或者leader切換時,數據不會丟)
給 topic設置 replication.factor ,這個值必須大於 1,保證每一個 partition 必須至少有 2 個副本
在 kafka 服務端設置 min.insync.replicas 參數,這個值必須大於 1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保 leader掛了還有一個follower,保證至少一個 follower能和leader保持正常的數據同步。
在 producer 端設置 acks =all,這個是要求每條數據,必須是寫入全部 replica 以後,才能認爲是寫成功了。不然會生產者會一直重試,此時設置 retries = MAX(很大的重試的值),要求一旦寫入失敗,就卡在這裏(避免消息丟失)
kafka 生產者丟消息
按 2 的方案設置了 ack =all,必定不會丟。它會要求 leader 接收到消息,全部的 follower 都同步 到了消息以後,才認爲本次寫成功。若是沒知足這個條件,生產者會無限次重試 。
消息隊列順序性
背景:mysql binlog 同步的系統,在mysql裏增刪改一條數據,對應出來了增刪改 3 條binlog,接着這 3 條binlog發送到 MQ 裏面,到消費出來依次執行,起碼是要保證順序的吧,否則順序變成了 刪除、修改、增長。日同步數據達到上億,mysql->mysql,好比大數據 team,須要同步一個mysql庫,來對公司的業務系統的數據作各類複雜的操做。
場景:
rabbitmq,一個queue,多個consumer,這不明顯亂了
kafka,一個topic,一個partition,一個consumer,內部多線程,這不也亂了
RabbitMQ 消息順序錯亂
RabbitMQ 如何保證消息順序性
須要保證順序的數據放到同一個queue裏
Kafka 消息順序錯亂
寫入一個 partition中的數據必定是有順序的。
生產者在寫的時候,能夠指定一個 key,好比訂單id做爲key,那麼訂單相關的數據,必定會被分發到一個 partition中區,此時這個 partition中的數據必定是有順序的。Kafka 中一個 partition 只能被一個消費者消費。消費者從partition中取出數據的時候 ,必定是有順序的。
Kafka 保證消息順序性
若是消費者單線程消費+處理,若是處理比較耗時,處理一條消息是幾十ms,一秒鐘只能處理幾十條數據,這個吞吐量過低了。確定要用多線程去併發處理,壓測消費者4 核 8G 單機,32 條線程,最高每秒能夠處理上千條消息
消息隊列延遲以及過時失效
消費端出了問題,不消費了或者消費極其慢。接着坑爹了,你的消息隊列集羣的磁盤都快寫滿了 ,都沒人消費,怎麼辦?積壓了幾個小時,rabbitmq設置了消息過時時間後就沒了,怎麼辦?
例如:
每次消費以後都要寫 mysql,結果mysql掛了,消費端 hang 不動了。
消費者本地依賴的一個東西掛了,致使消費者掛了。
長時間沒處理消費,致使 mq 寫滿了。
場景:幾千萬條數據再 MQ 裏積壓了七八個小時
快速處理積壓的消息
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘是 18W 條,1000 多 W 條須要一個小時恢復。
步驟:
先修復 consumer 的問題,確保其恢復消費速度,而後將現有的 consumer 都停掉
新建一個topic,partition是原來的 10 倍,臨時創建好原先 10 倍或者 20 倍的 queue 數量
而後寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費以後不作耗時的處理,直接均勻輪詢寫入臨時創建好的 10 倍數量的 queue
接着臨時徵用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據
這種作法至關 因而臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常 10 倍速度
等快速消費完積壓數據以後,恢復原先部署架構 ,從新用原先的 consumer機器消費消息
原來 3 個消費者須要 1 個小時能夠搞定,如今 30 個臨時消費者須要 10 分鐘就能夠搞定。
若是用的 rabbitmq,而且設置了過時時間,若是此消費在 queue裏積壓超過必定的時間會被 rabbitmq清理掉,數據直接搞丟。
這個時候開始寫程序,將丟失的那批 數據查出來,而後從新灌入mq裏面,把白天丟的數據補回來。
若是消息積壓mq,長時間沒被處理掉,致使mq快寫完滿了,你臨時寫一個程序,接入數據來消費,寫到一個臨時的mq裏,再讓其餘消費者慢慢消費 或者消費一個丟棄一個,都不要了,快速消費掉全部的消息,而後晚上補數據。
如何設計消息隊列中間件架構
mq要支持可伸縮性,快速擴容。設計一個分佈式的 MQ,broker->topic->partition,每一個 partition 放一個機器,就存一部分數據。若是如今資源不夠,給 topic 增長 partition ,而後作數據遷移,增長機器。
mq數據落磁盤,避免進程掛了數據丟了,順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這個就是 kafka的思路。
mq高可用性。多副本->leader & follower-> broker 掛了從新選舉 leader 對外提供服務
支持數據 0 丟失。
本文分享自微信公衆號 - 碼農沉思錄(code-thinker)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。