分佈式消息隊列:如何保證消息的可靠性傳輸

rabbitmq

(1)生產者弄丟了數據

生產者將數據發送到rabbitmq的時候,可能數據就在半路給搞丟了,由於網絡啥的問題,都有可能。api

此時能夠選擇用rabbitmq提供的事務功能,就是生產者發送數據以前開啓rabbitmq事務(channel.txSelect),而後發送消息,若是消息沒有成功被rabbitmq接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務(channel.txRollback),而後重試發送消息;若是收到了消息,那麼能夠提交事務(channel.txCommit)。可是問題是,rabbitmq事務機制一搞,基本上吞吐量會下來,由於太耗性能。網絡

因此通常來講,若是你要確保說寫rabbitmq的消息別丟,能夠開啓confirm模式,在生產者那裏設置開啓confirm模式以後,你每次寫的消息都會分配一個惟一的id,而後若是寫入了rabbitmq中,rabbitmq會給你回傳一個ack消息,告訴你說這個消息ok了。若是rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息id的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。異步

事務機制和cnofirm機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是confirm機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息rabbitmq接收了以後會異步回調你一個接口通知你這個消息接收到了。分佈式

因此通常在生產者這塊避免數據丟失,都是用confirm機制的。post

(2)rabbitmq弄丟了數據

就是rabbitmq本身弄丟了數據,這個你必須開啓rabbitmq的持久化,就是消息寫入以後會持久化到磁盤,哪怕是rabbitmq本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,rabbitmq還沒持久化,本身就掛了,可能致使少許數據會丟失的,可是這個機率較小。性能

設置持久化有兩個步驟,第一個是建立queue的時候將其設置爲持久化的,這樣就能夠保證rabbitmq持久化queue的元數據,可是不會持久化queue裏的數據;第二個是發送消息的時候將消息的deliveryMode設置爲2,就是將消息設置爲持久化的,此時rabbitmq就會將消息持久化到磁盤上去。必需要同時設置這兩個持久化才行,rabbitmq哪怕是掛了,再次重啓,也會從磁盤上重啓恢復queue,恢復這個queue裏的數據。orm

並且持久化能夠跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者ack了,因此哪怕是在持久化到磁盤以前,rabbitmq掛了,數據丟了,生產者收不到ack,你也是能夠本身重發的。cdn

哪怕是你給rabbitmq開啓了持久化機制,也有一種可能,就是這個消息寫到了rabbitmq中,可是還沒來得及持久化到磁盤上,結果不巧,此時rabbitmq掛了,就會致使內存裏的一點點數據會丟失。blog

(3)消費端弄丟了數據

rabbitmq若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,rabbitmq認爲你都消費了,這數據就丟了。接口

這個時候得用rabbitmq提供的ack機制,簡單來講,就是你關閉rabbitmq自動ack,能夠經過一個api來調用就行,而後每次你本身代碼裏確保處理完的時候,再程序裏ack一把。這樣的話,若是你還沒處理完,不就沒有ack?那rabbitmq就認爲你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,消息是不會丟的。


kafka

(1)消費端弄丟了數據

惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。

這不是同樣麼,你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。

生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。

而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了


(2)kafka弄丟了數據

這塊比較常見的一個場景,就是kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,咱們也是,以前kafka的leader機器宕機了,將follower切換爲leader以後,就會發現說這個數據就丟了

因此此時通常是要求起碼設置以下4個參數:

給這個topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本

在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧

在producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了

在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了

咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在kafka broker端就能夠保證在leader所在broker發生故障,進行leader切換時,數據不會丟失

(3)生產者會不會弄丟數據

若是按照上述的思路設置了ack=all,必定不會丟,要求是,你的leader接收到消息,全部的follower都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。

更多系列文章

分佈式消息隊列:如何保證消息隊列的高可用

分佈式消息隊列:如何保證消息不被重複消費

分佈式消息隊列:如何保證消息的可靠性傳輸

最後

後續會持續更新分佈式知識,你們以爲不錯能夠點個贊在關注下,之後還會分享更多文章!

相關文章
相關標籤/搜索