又到了年末跳槽高峯季,不少小夥伴出去面試時,很多面試官都會問到消息隊列的問題,很多小夥伴回答的不是很完美,有些小夥伴是內心知道答案,嘴上卻沒有很好的表達出來,究其根本緣由,仍是對相關的知識點理解的不夠透徹。今天,咱們就一塊兒來探討下這個話題。注:文章有點長,你說你能一氣呵成看完,我有點不信!!java
文章已收錄到:mysql
https://github.com/sunshinelyz/technology-binghegit
https://gitee.com/binghe001/technology-binghegithub
消息隊列(Message Queue)是在消息的傳輸過程當中保存消息的容器,是應用間的通訊方式。消息發送後能夠當即返回,由消息系統保證消息的可靠傳輸,消息發佈者只管把消息寫到隊列裏面而不用考慮誰須要消息,而消息的使用者也不須要知道誰發佈的消息,只管到消息隊列裏面取,這樣生產和消費即可以作到分離。面試
優勢:sql
使用消息隊列比較核心的做用就是:解耦、異步、削峯。數據庫
缺點:api
如下主要討論的RabbitMQ和Kafka兩種消息隊列。微信
RabbitMQ的高可用是基於主從(非分佈式)作高可用性。RabbitMQ 有三種模式:單機模式(Demo級別)、普通集羣模式(無高可用性)、鏡像集羣模式(高可用性)。網絡
普通集羣模式,意思就是在多臺機器上啓動多個 RabbitMQ 實例,每一個機器啓動一個。你建立的 queue,只會放在一個 RabbitMQ 實例上,可是每一個實例都同步 queue 的元數據(元數據能夠認爲是 queue 的一些配置信息,經過元數據,能夠找到 queue 所在實例)。你消費的時候,實際上若是鏈接到了另一個實例,那麼那個實例會從 queue 所在實例上拉取數據過來。
這種方式確實很麻煩,也不怎麼好,沒作到所謂的分佈式,就是個普通集羣。由於這致使你要麼消費者每次隨機鏈接一個實例而後拉取數據,要麼固定鏈接那個 queue 所在實例消費數據,前者有數據拉取的開銷,後者致使單實例性能瓶頸。
並且若是那個放 queue 的實例宕機了,會致使接下來其餘實例就沒法從那個實例拉取,若是你開啓了消息持久化,讓 RabbitMQ 落地存儲消息的話,消息不必定會丟,得等這個實例恢復了,而後才能夠繼續從這個 queue 拉取數據。
因此這個事兒就比較尷尬了,這就沒有什麼所謂的高可用性,這方案主要是提升吞吐量的,就是說讓集羣中多個節點來服務某個 queue 的讀寫操做。
這種模式,纔是所謂的 RabbitMQ 的高可用模式。跟普通集羣模式不同的是,在鏡像集羣模式下,你建立的 queue,不管元數據仍是 queue 裏的消息都會存在於多個實例上,就是說,每一個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的所有數據的意思。而後每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。
那麼如何開啓這個鏡像集羣模式呢?其實很簡單,RabbitMQ 有很好的管理控制檯,就是在後臺新增一個策略,這個策略是鏡像集羣模式的策略,指定的時候是能夠要求數據同步到全部節點的,也能夠要求同步到指定數量的節點,再次建立 queue 的時候,應用這個策略,就會自動將數據同步到其餘的節點上去了。
這樣的話,好處在於,你任何一個機器宕機了,沒事兒,其它機器(節點)還包含了這個 queue 的完整數據,別的 consumer 均可以到其它節點上去消費數據。壞處在於,第一,這個性能開銷也太大了吧,消息須要同步到全部機器上,致使網絡帶寬壓力和消耗很重!第二,這麼玩兒,不是分佈式的,就沒有擴展性可言了,若是某個 queue 負載很重,你加機器,新增的機器也包含了這個 queue 的全部數據,並沒有辦法線性擴展你的 queue。你想,若是這個 queue 的數據量很大,大到這個機器上的容量沒法容納了,此時該怎麼辦呢?
Kafka 一個最基本的架構認識:由多個 broker 組成,每一個 broker 是一個節點;你建立一個 topic,這個 topic 能夠劃分爲多個 partition,每一個 partition 能夠存在於不一樣的 broker 上,每一個 partition 就放一部分數據。
這就是自然的分佈式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每一個機器就放一部分數據。
實際上 RabbmitMQ 之類的,並非分佈式消息隊列,它就是傳統的消息隊列,只不過提供了一些集羣、HA(High Availability, 高可用性) 的機制而已,由於不管怎麼玩兒,RabbitMQ 一個 queue 的數據都是放在一個節點裏的,鏡像集羣下,也是每一個節點都放這個 queue 的完整數據。
Kafka 0.8 之前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,無法寫也無法讀,沒有什麼高可用性可言。
好比說,咱們假設建立了一個 topic,指定其 partition 數量是 3 個,分別在三臺機器上。可是,若是第二臺機器宕機了,會致使這個 topic 的 1/3 的數據就丟了,所以這個是作不到高可用的。
Kafka 0.8 之後,提供了 HA 機制,就是 replica(複製品) 副本機制。每一個 partition 的數據都會同步到其它機器上,造成本身的多個 replica 副本。全部 replica 會選舉一個 leader 出來,那麼生產和消費都跟這個 leader 打交道,而後其餘 replica 就是 follower。寫的時候,leader 會負責把數據同步到全部 follower 上去,讀的時候就直接讀 leader 上的數據便可。只能讀寫 leader?很簡單,要是你能夠隨意讀寫每一個 follower,那麼就要 care 數據一致性的問題,系統複雜度過高,很容易出問題。Kafka 會均勻地將一個 partition 的全部 replica 分佈在不一樣的機器上,這樣才能夠提升容錯性。
這麼搞,就有所謂的高可用性了,由於若是某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其餘機器上都有副本的。若是這個宕機的 broker 上面有某個 partition 的 leader,那麼此時會從 follower 中從新選舉一個新的 leader 出來,你們繼續讀寫那個新的 leader 便可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫 leader,而後 leader 將數據落地寫本地磁盤,接着其餘 follower 本身主動從 leader 來 pull 數據。一旦全部 follower 同步好數據了,就會發送 ack 給 leader,leader 收到全部 follower 的 ack 以後,就會返回寫成功的消息給生產者。(固然,這只是其中一種模式,還能夠適當調整這個行爲)
消費的時候,只會從 leader 去讀,可是隻有當一個消息已經被全部 follower 都同步成功返回 ack 的時候,這個消息纔會被消費者讀到。
首先,全部的消息隊列都會有這樣重複消費的問題,由於這是不MQ來保證,而是咱們本身開發保證的,咱們使用Kakfa來討論是如何實現的。
Kakfa有個offset的概念,就是每一個消息寫進去都會有一個offset值,表明消費的序號,而後consumer消費了數據以後,默認每隔一段時間會把本身消費過的消息的offset值提交,表示我已經消費過了,下次要是我重啓啥的,就讓我從當前提交的offset處來繼續消費。
可是凡事總有意外,好比咱們以前生產常常遇到的,就是你有時候重啓系統,看你怎麼重啓了,若是碰到點着急的,直接 kill 進程了,再重啓。這會致使 consumer 有些消息處理了,可是沒來得及提交 offset,尷尬了。重啓以後,少數消息會再次消費一次。
其實重複消費不可怕,可怕的是你沒考慮到重複消費以後,怎麼保證冪等性。
舉個例子吧。假設你有個系統,消費一條消息就往數據庫裏插入一條數據,要是你一個消息重複兩次,你不就插入了兩條,這數據不就錯了?可是你要是消費到第二次的時候,本身判斷一下是否已經消費過了,如果就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。一條數據重複出現兩次,數據庫裏就只有一條數據,這就保證了系統的冪等性。冪等性,通俗點說,就一個數據,或者一個請求,給你重複來屢次,你得確保對應的數據是不會改變的,不能出錯。
因此第二個問題來了,怎麼保證消息隊列消費的冪等性?
其實仍是得結合業務來思考,我這裏給幾個思路:
固然,如何保證 MQ 的消費是冪等性的,須要結合具體的業務來看。
這個是確定的,MQ的基本原則就是數據不能多一條,也不能少一條,不能多其實就是咱們前面重複消費的問題。不能少,就是數據不能丟,像計費,扣費的一些信息,是確定不能丟失的。
數據的丟失問題,可能出如今生產者、MQ、消費者中,我們從 RabbitMQ 和 Kafka 分別來分析一下吧。
生產者丟數據
生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,由於網絡問題啥的,都有可能。
此時能夠選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據以前開啓 RabbitMQ 事務channel.txSelect
,而後發送消息,若是消息沒有成功被 RabbitMQ 接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務channel.txRollback
,而後重試發送消息;若是收到了消息,那麼能夠提交事務channel.txCommit
。
// 開啓事務 channel.txSelect try { // 這裏發送消息 } catch (Exception e) { channel.txRollback // 這裏再次重發這條消息 } // 提交事務 channel.txCommit
可是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,由於太耗性能。
因此通常來講,若是你要確保說寫 RabbitMQ 的消息別丟,能夠開啓 confirm
模式,在生產者那裏設置開啓 confirm
模式以後,你每次寫的消息都會分配一個惟一的 id,而後若是寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack
消息,告訴你說這個消息 ok 了。若是 RabbitMQ 沒能處理這個消息,會回調你的一個 nack
接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息 id 的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。
事務機制和 confirm
機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是 confirm
機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息 RabbitMQ 接收了以後會異步回調你的一個接口通知你這個消息接收到了。
因此通常在生產者這塊避免數據丟失,都是用 confirm
機制的。
RabbitMQ丟數據
就是 RabbitMQ 本身弄丟了數據,這個你必須開啓 RabbitMQ 的持久化,就是消息寫入以後會持久化到磁盤,哪怕是 RabbitMQ 本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,本身就掛了,可能致使少許數據丟失,可是這個機率較小。
設置持久化有兩個步驟:
deliveryMode
設置爲 2 就是將消息設置爲持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。必需要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啓,也會從磁盤上重啓恢復 queue,恢復這個 queue 裏的數據。
注意,哪怕是你給 RabbitMQ 開啓了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,可是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會致使內存裏的一點點數據丟失。
因此,持久化能夠跟生產者那邊的 confirm
機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者 ack
了,因此哪怕是在持久化到磁盤以前,RabbitMQ 掛了,數據丟了,生產者收不到 ack
,你也是能夠本身重發的。
消費者丟數據
RabbitMQ 若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,RabbitMQ 認爲你都消費了,這數據就丟了。
這個時候得用 RabbitMQ 提供的 ack
機制,簡單來講,就是你必須關閉 RabbitMQ 的自動 ack
,能夠經過一個 api 來調用就行,而後每次你本身代碼裏確保處理完的時候,再在程序裏 ack
一把。這樣的話,若是你還沒處理完,不就沒有 ack
了?那 RabbitMQ 就認爲你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。
惟一可能致使消費者弄丟數據的狀況,就是說,你消費到了這個消息,而後消費者那邊自動提交了 offset,讓 Kafka 覺得你已經消費好了這個消息,但其實你纔剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。
這不是跟 RabbitMQ 差很少嗎,你們都知道 Kafka 會自動提交 offset,那麼只要關閉自動提交 offset,在處理完以後本身手動提交 offset,就能夠保證數據不會丟。可是此時確實仍是可能會有重複消費,好比你剛處理完,還沒提交 offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。
生產環境碰到的一個問題,就是說咱們的 Kafka 消費者消費到了數據以後是寫到一個內存的 queue 裏先緩衝一下,結果有的時候,你剛把消息寫入內存 queue,而後消費者會自動提交 offset。而後此時咱們重啓了系統,就會致使內存 queue 裏還沒來得及處理的數據就丟失了。
Kafka丟數據
這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,而後從新選舉 partition 的 leader。你們想一想,要是此時其餘的 follower 恰好還有些數據沒有同步,結果此時 leader 掛了,而後選舉某個 follower 成 leader 以後,不就少了一些數據?這就丟了一些數據啊。
生產環境也遇到過,咱們也是,以前 Kafka 的 leader 機器宕機了,將 follower 切換爲 leader 以後,就會發現說這個數據就丟了。
因此此時通常是要求起碼設置以下 4 個參數:
replication.factor
參數:這個值必須大於 1,要求每一個 partition 必須有至少 2 個副本。min.insync.replicas
參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟本身保持聯繫,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。acks=all
:這個是要求每條數據,必須是寫入全部 replica 以後,才能認爲是寫成功了。retries=MAX
(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了。咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在 Kafka broker 端就能夠保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。
若是按照上述的思路設置了 acks=all
,必定不會丟,要求是,你的 leader 接收到消息,全部的 follower 都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。
我舉個例子,咱們之前作過一個 mysql binlog
同步的系統,壓力仍是很是大的,日同步數據要達到上億,就是說數據從一個 mysql 庫原封不動地同步到另外一個 mysql 庫裏面去(mysql -> mysql)。常見的一點在於說好比大數據 team,就須要同步一個 mysql 庫過來,對公司的業務系統的數據作各類複雜的操做。
你在 mysql 裏增刪改一條數據,對應出來了增刪改 3 條 binlog
日誌,接着這三條 binlog
發送到 MQ 裏面,再消費出來依次執行,起碼得保證人家是按照順序來的吧?否則原本是:增長、修改、刪除;你楞是換了順序給執行成刪除、修改、增長,不全錯了麼。
原本這個數據同步過來,應該最後這個數據被刪除了;結果你搞錯了這個順序,最後這個數據保留下來了,數據同步就出錯了。
先看看順序會錯亂的倆場景:
拆分多個 queue,每一個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 可是對應一個 consumer,而後這個 consumer 內部用內存隊列作排隊,而後分發給底層不一樣的 worker 來處理。
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。因此若是你積壓了幾百萬到上千萬的數據,即便消費者恢復了,也須要大概 1 小時的時間才能恢復過來。
通常這個時候,只能臨時緊急擴容了,具體操做步驟和思路以下:
假設你用的是 RabbitMQ,RabbtiMQ 是能夠設置過時時間的,也就是 TTL。若是消息在 queue 中積壓超過必定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在 mq 裏,而是大量的數據會直接搞丟。
這個狀況下,就不是說要增長 consumer 消費積壓的消息,由於實際上沒啥積壓,而是丟了大量的消息。咱們能夠採起一個方案,就是批量重導,這個咱們以前線上也有相似的場景幹過。就是大量積壓的時候,咱們當時就直接丟棄數據了,而後等過了高峯期之後,好比你們一塊兒喝咖啡熬夜到晚上12點之後,用戶都睡覺了。這個時候咱們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,而後從新灌入 mq 裏面去,把白天丟的數據給他補回來。也只能是這樣了。
假設 1 萬個訂單積壓在 mq 裏面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 裏去再補一次。
若是消息積壓在 mq 裏,你很長時間都沒有處理掉,此時致使 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉全部的消息。而後走第二個方案,到了晚上再補數據吧。
參考資料:
好了,今天就到這兒吧,我是冰河,你們有啥問題能夠在下方留言,也能夠加我微信:sun_shine_lyz,我拉你進羣,一塊兒交流技術,一塊兒進階,一塊兒牛逼~~