如何保證消息隊列的高可用和冪等性以及數據丟失,順序一致性

如何保證消息隊列的高可用和冪等性以及數據丟失,順序一致性

<!-- more -->html

RabbitMQ的高可用性

RabbitMQ是比較有表明性的,由於是基於主從作高可用性的,咱們就以他爲例子講解第一種MQ的高可用性怎麼實現。git

rabbitmq有三種模式:

  • 單機模式
  • 普通集羣模式
  • 鏡像集羣模式
單機模式

就是demo級別的,通常就是你本地啓動了玩玩兒的,沒人生產用單機模式 github

普通集羣模式

意思就是在多臺機器上啓動多個rabbitmq實例,每一個機器啓動一個。可是你建立的queue,只會放在一個rabbtimq實例上,可是每一個實例都同步queue的元數據。完了你消費的時候,實際上若是鏈接到了另一個實例,那麼那個實例會從queue所在實例上拉取數據過來。 這種方式確實很麻煩,也不怎麼好,沒作到所謂的分佈式,就是個普通集羣。由於這致使你要麼消費者每次隨機鏈接一個實例而後拉取數據,要麼固定鏈接那個queue所在實例消費數據,前者有數據拉取的開銷,後者致使單實例性能瓶頸。 並且若是那個放queue的實例宕機了,會致使接下來其餘實例就沒法從那個實例拉取,若是你開啓了消息持久化,讓rabbitmq落地存儲消息的話,消息不必定會丟,得等這個實例恢復了,而後才能夠繼續從這個queue拉取數據。 因此這個事兒就比較尷尬了,這就沒有什麼所謂的高可用性可言了,這方案主要是提升吞吐量的,就是說讓集羣中多個節點來服務某個queue的讀寫操做。 web

鏡像集羣模式

這種模式,纔是所謂的rabbitmq的高可用模式,跟普通集羣模式不同的是,你建立的queue,不管元數據仍是queue裏的消息都會存在於多個實例上,而後每次你寫消息到queue的時候,都會自動把消息到多個實例的queue裏進行消息同步。這樣的話,好處在於,你任何一個機器宕機了,沒事兒,別的機器均可以用。壞處在於,第一,這個性能開銷也太大了吧,消息同步全部機器,致使網絡帶寬壓力和消耗很重!第二,這麼玩兒,就沒有擴展性可言了,若是某個queue負載很重,你加機器,新增的機器也包含了這個queue的全部數據,並無辦法線性擴展你的queue那麼怎麼開啓這個鏡像集羣模式呢?我這裏簡單說一下,避免面試人家問你你不知道,其實很簡單rabbitmq有很好的管理控制檯,就是在後臺新增一個策略,這個策略是鏡像集羣模式的策略,指定的時候能夠要求數據同步到全部節點的,也能夠要求就同步到指定數量的節點,而後你再次建立queue的時候,應用這個策略,就會自動將數據同步到其餘的節點上去了。面試

kafka的高可用性

kafka一個最基本的架構認識:多個broker組成,每一個broker是一個節點;你建立一個topic,這個topic能夠劃分爲多個partition,每一個partition能夠存在於不一樣的broker上,每一個partition就放一部分數據。 這就是自然的分佈式消息隊列,就是說一個topic的數據,是分散放在多個機器上的,每一個機器就放一部分數據。 實際上rabbitmq之類的,並非分佈式消息隊列,他就是傳統的消息隊列,只不過提供了一些集羣、HA的機制而已,由於不管怎麼玩兒,rabbitmq一個queue的數據都是放在一個節點裏的,鏡像集羣下,也是每一個節點都放這個queue的完整數據。 kafka 0.8之前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,無法寫也無法讀,沒有什麼高可用性可言。 kafka 0.8之後,提供了HA機制,就是replica副本機制。每一個partition的數據都會同步到吉他機器上,造成本身的多個replica副本。而後全部replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,而後其餘replica就是follower。寫的時候,leader會負責把數據同步到全部follower上去,讀的時候就直接讀leader上數據便可。只能讀寫leader?很簡單,要是你能夠隨意讀寫每一個follower,那麼就要care數據一致性的問題,系統複雜度過高,很容易出問題。kafka會均勻的將一個partition的全部replica分佈在不一樣的機器上,這樣才能夠提升容錯性。 這麼搞,就有所謂的高可用性了,由於若是某個broker宕機了,沒事兒,那個broker上面的partition在其餘機器上都有副本的,若是這上面有某個partition的leader,那麼此時會從新選舉一個新的leader出來,你們繼續讀寫那個新的leader便可。這就有所謂的高可用性了。 寫數據的時候,生產者就寫leader,而後leader將數據落地寫本地磁盤,接着其餘follower本身主動從leader來pull數據。一旦全部follower同步好數據了,就會發送ack給leader,leader收到全部follower的ack以後,就會返回寫成功的消息給生產者。(固然,這只是其中一種模式,還能夠適當調整這個行爲)消費的時候,只會從leader去讀,可是隻有一個消息已經被全部follower都同步成功返回ack的時候,這個消息纔會被消費者讀到。 redis

怎麼保證消息隊列消費的冪等性?

先大概說一說可能會有哪些重複消費的問題。首先就是好比rabbitmq、rocketmq、kafka,都有可能會出現消費重複消費的問題,正常。由於這問題一般不是mq本身保證的,是給你保證的。而後咱們挑一個kafka來舉個例子,說說怎麼重複消費吧。kafka實際上有個offset的概念,就是每一個消息寫進去,都有一個offset,表明他的序號,而後consumer消費了數據以後,每隔一段時間,會把本身消費過的消息的offset提交一下,表明我已經消費過了,下次我要是重啓啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。 可是凡事總有意外,好比咱們以前生產常常遇到的,就是你有時候重啓系統,看你怎麼重啓了,若是碰到點着急的,直接kill進程了,再重啓。這會致使consumer有些消息處理了,可是沒來得及提交offset,尷尬了。重啓以後,少數消息會再次消費一次。其實重複消費不可怕,可怕的是你沒考慮到重複消費以後,怎麼保證冪等性。給你舉個例子吧。假設你有個系統,消費一條往數據庫裏插入一條,要是你一個消息重複兩次,你不就插入了兩條,這數據不就錯了?可是你要是消費到第二次的時候,本身判斷一下已經消費過了,直接扔了,不就保留了一條數據?一條數據重複出現兩次,數據庫裏就只有一條數據,這就保證了系統的冪等性冪等性,我通俗點說,就一個數據,或者一個請求,給你重複來屢次,你得確保對應的數據是不會改變的,不能出錯。 數據庫

其實仍是得結合業務來思考,我這裏給幾個思路:

  1. 好比你拿個數據要寫庫,你先根據主鍵查一下,若是這數據都有了,你就別插入了,update一下好吧
  2. 好比你是寫redis,那沒問題了,反正每次都是set,自然冪等性
  3. 好比你不是上面兩個場景,那作的稍微複雜一點,你須要讓生產者發送每條數據的時候,裏面加一個全局惟一的id,相似訂單id之類的東西,而後你這裏消費到了以後,先根據這個id去好比redis裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個id寫redis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。

還有好比基於數據庫的惟一鍵來保證重複數據不會重複插入多條,咱們以前線上系統就有這個問題,就是拿到數據的時候,每次重啓可能會有重複,由於kafka消費者還沒來得及提交offset,重複數據拿到了之後咱們插入的時候,由於有惟一鍵約束了,因此重複數據只會插入報錯,不會致使數據庫中出現髒數據api

如何保證MQ的消費是冪等性的,須要結合具體的業務來看 網絡

數據丟失怎麼辦(如何保證消息的可靠性傳輸)

1、rabbitmq

生產者弄丟了數據

生產者將數據發送到rabbitmq的時候,可能數據就在半路給搞丟了,由於網絡啥的問題,都有可能。 此時能夠選擇用rabbitmq提供的事務功能,就是生產者發送數據以前開啓rabbitmq事務(channel.txSelect),而後發送消息,若是消息沒有成功被rabbitmq接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務(channel.txRollback),而後重試發送消息;若是收到了消息,那麼能夠提交事務(channel.txCommit)。可是問題是,rabbitmq事務機制一搞,基本上吞吐量會下來,由於太耗性能。 因此通常來講,若是你要確保說寫rabbitmq的消息別丟,能夠開啓confirm模式,在生產者那裏設置開啓confirm模式以後,你每次寫的消息都會分配一個惟一的id,而後若是寫入了rabbitmq中,rabbitmq會給你回傳一個ack消息,告訴你說這個消息ok了。若是rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息id的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。事務機制和cnofirm機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是confirm機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息rabbitmq接收了以後會異步回調你一個接口通知你這個消息接收到了。 因此通常在生產者這塊避免數據丟失,都是用confirm機制的。 #####rabbitmq弄丟了數據 就是rabbitmq本身弄丟了數據,這個你必須開啓rabbitmq的持久化,就是消息寫入以後會持久化到磁盤,哪怕是rabbitmq本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,rabbitmq還沒持久化,本身就掛了,可能致使少許數據會丟失的,可是這個機率較小。設置持久化有兩個步驟,第一個是建立queue的時候將其設置爲持久化的,這樣就能夠保證rabbitmq持久化queue的元數據,可是不會持久化queue裏的數據;第二個是發送消息的時候將消息的deliveryMode設置爲2,就是將消息設置爲持久化的,此時rabbitmq就會將消息持久化到磁盤上去。必需要同時設置這兩個持久化才行,rabbitmq哪怕是掛了,再次重啓,也會從磁盤上重啓恢復queue,恢復這個queue裏的數據。並且持久化能夠跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者ack了,因此哪怕是在持久化到磁盤以前,rabbitmq掛了,數據丟了,生產者收不到ack,你也是能夠本身重發的。哪怕是你給rabbitmq開啓了持久化機制,也有一種可能,就是這個消息寫到了rabbitmq中,可是還沒來得及持久化到磁盤上,結果不巧,此時rabbitmq掛了,就會致使內存裏的一點點數據會丟失。架構

消費端弄丟了數據

rabbitmq若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,rabbitmq認爲你都消費了,這數據就丟了。這個時候得用rabbitmq提供的ack機制,簡單來講,就是你關閉rabbitmq自動ack,能夠經過一個api來調用就行,而後每次你本身代碼裏確保處理完的時候,再程序裏ack一把。這樣的話,若是你還沒處理完,不就沒有ack?那rabbitmq就認爲你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,消息是不會丟的。

2、kafka

消費端弄丟了數據

惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。這不是同樣麼,你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了

kafka弄丟了數據

這塊比較常見的一個場景,就是kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。生產環境也遇到過,咱們也是,以前kafka的leader機器宕機了,將follower切換爲leader以後,就會發現說這個數據就丟了因此此時通常是要求起碼設置以下4個參數:給這個topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧在producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在kafka broker端就能夠保證在leader所在broker發生故障,進行leader切換時,數據不會丟失 3)生產者會不會弄丟數據若是按照上述的思路設置了ack=all,必定不會丟,要求是,你的leader接收到消息,全部的follower都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。

數據的順序性

rabbitmq保證數據的順序性

若是存在多個消費者,那麼就讓每一個消費者對應一個queue,而後把要發送 的數據全都放到一個queue,這樣就能保證全部的數據只到達一個消費者從而保證每一個數據到達數據庫都是順序的。

rabbitmq:拆分多個queue,每一個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue可是對應一個consumer,而後這個consumer內部用內存隊列作排隊,而後分發給底層不一樣的worker來處理

kafka保證數據的順序性

kafka 寫入partion時指定一個key,列如訂單id,那麼消費者從partion中取出數據的時候確定是有序的,當開啓多個線程的時候可能致使數據不一致,這時候就須要內存隊列,將相同的hash過的數據放在一個內存隊列裏,這樣就能保證一條線程對應一個內存隊列的數據寫入數據庫的時候順序性的,從而能夠開啓多條線程對應多個內存隊列(2)kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,而後N個線程分別消費一個內存queue便可

MQ積壓幾百萬條數據怎麼辦?

這個是咱們真實遇到過的一個場景,確實是線上故障了,這個時候要否則就是修復consumer的問題,讓他恢復消費速度,而後傻傻的等待幾個小時消費完畢。這個確定不能在面試的時候說吧。一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條因此若是你積壓了幾百萬到上千萬的數據,即便消費者恢復了,也須要大概1小時的時間才能恢復過來 通常這個時候,只能操做臨時緊急擴容了,具體操做步驟和思路以下:

  1. 先修復consumer的問題,確保其恢復消費速度,而後將現有cnosumer都停掉
  2. 新建一個topic,partition是原來的10倍,臨時創建好原先10倍或者20倍的queue數量
  3. 而後寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費以後不作耗時的處理,直接均勻輪詢寫入臨時創建好的10倍數量的queue
  4. 接着臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據
  5. 這種作法至關因而臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據
  6. 等快速消費完積壓數據以後,得恢復原先部署架構,從新用原先的consumer機器來消費消息
這裏咱們假設再來第二個坑

假設你用的是rabbitmq,rabbitmq是能夠設置過時時間的,就是TTL,若是消息在queue中積壓超過必定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq裏,而是大量的數據會直接搞丟。這個狀況下,就不是說要增長consumer消費積壓的消息,由於實際上沒啥積壓,而是丟了大量的消息。咱們能夠採起一個方案,就是批量重導,這個咱們以前線上也有相似的場景幹過。就是大量積壓的時候,咱們當時就直接丟棄數據了,而後等過了高峯期之後,好比你們一塊兒喝咖啡熬夜到晚上12點之後,用戶都睡覺了。這個時候咱們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,而後從新灌入mq裏面去,把白天丟的數據給他補回來。也只能是這樣了。假設1萬個訂單積壓在mq裏面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq裏去再補一次

而後咱們再來假設第三個坑

若是走的方式是消息積壓在mq裏,那麼若是你很長時間都沒處理掉,此時致使mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉全部的消息。而後走第二個方案,到了晚上再補數據吧。

做者:_雲起 連接:https://www.jianshu.com/p/7a6deaba34d2 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。


原文連接


Posted by hexo-deployer-metaweblog

相關文章
相關標籤/搜索