本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。mysql
比較核心的有3個:解耦、異步、削峯。redis
rabbitmq有三種模式:單機模式,普通集羣模式,鏡像集羣模式。sql
普通集羣模式模式是在多臺機器上啓動多個rabbitmq實例,每一個機器啓動一個。可是你建立的queue,只會放在一個rabbtimq實例上,可是每一個實例都同步queue的元數據。完了你消費的時候,實際上若是鏈接到了另一個實例,那麼那個實例會從queue所在實例上拉取數據過來。數據庫
鏡像集羣模式是,纔是所謂的rabbitmq的高可用模式,跟普通集羣模式不同的是,你建立的queue,不管元數據仍是queue裏的消息都會存在於多個實例上,而後每次你寫消息到queue的時候,都會自動把消息到多個實例的queue裏進行消息同步。api
rabbitmq並非分佈式消息隊列,他就是傳統的消息隊列,只不過提供了一些集羣、HA的機制而已。rabbitmq一個queue的數據都是放在一個節點裏的,鏡像集羣下,也是每一個節點都放這個queue的完整數據。沒法真正實現集羣的擴容。多線程
kafka由多個broker組成,每一個broker是一個節點;你建立一個topic,這個topic能夠劃分爲多個partition,每一個partition能夠存在於不一樣的broker上,每一個partition就放一部分數據。一個topic的數據,是分散放在多個機器上的,每一個機器就放一部分數據。架構
kafka 0.8之前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,無法寫也無法讀,沒有什麼高可用性可言。併發
kafka 0.8之後,提供了HA機制,就是replica副本機制。每一個partition的數據都會同步到其餘機器上,造成本身的多個replica副本。而後全部replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,而後其餘replica就是follower。異步
寫的時候,leader會負責把數據同步到全部follower上去,讀的時候就直接讀leader上數據便可。只能讀寫leader?很簡單,要是你能夠隨意讀寫每一個follower,那麼就要care數據一致性的問題,系統複雜度過高,很容易出問題。kafka會均勻的將一個partition的全部replica分佈在不一樣的機器上,這樣才能夠提升容錯性。分佈式
消費的時候,只會從leader去讀,可是隻有一個消息已經被全部follower都同步成功返回ack的時候,這個消息纔會被消費者讀到。
既然是消費消息,那確定要考慮考慮會不會重複消費?能不能避免重複消費?或者重複消費了也別形成系統異常能夠嗎?這個是MQ領域的基本問題,其實本質上如何使用消息隊列保證冪等性。
kafka實際上有個offset的概念,就是每一個消息寫進去,都有一個offset,表明他的序號,而後consumer消費了數據以後,每隔一段時間,會把本身消費過的消息的offset提交一下,表明我已經消費過了,下次我要是重啓啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。
可是凡事總有意外,好比咱們以前生產常常遇到的,就是你有時候重啓系統,看你怎麼重啓了,若是碰到點着急的,直接kill進程了,再重啓。這會致使consumer有些消息處理了,可是沒來得及提交offset,尷尬了。重啓以後,少數消息會再次消費一次。
好比你拿個數據要寫庫,你先根據主鍵查一下,若是這數據都有了,你就別插入了,update一下好吧
好比你是寫redis,那沒問題了,反正每次都是set,自然冪等性。
好比你不是上面兩個場景,那作的稍微複雜一點,你須要讓生產者發送每條數據的時候,裏面加一個全局惟一的id,相似訂單id之類的東西,而後你這裏消費到了以後,先根據這個id去好比redis裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個id寫redis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。
基於數據庫的惟一鍵來保證重複數據不會重複插入多條,咱們以前線上系統就有這個問題,就是拿到數據的時候,每次重啓可能會有重複,由於kafka消費者還沒來得及提交offset,重複數據拿到了之後咱們插入的時候,由於有惟一鍵約束了,因此重複數據只會插入報錯,不會致使數據庫中出現髒數據。
基於rabbitmq提供的事務功能,
生產者發送數據以前開啓rabbitmq事務(channel.txSelect),而後發送消息,若是消息沒有成功被rabbitmq接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務(channel.txRollback),而後重試發送消息;若是收到了消息,那麼能夠提交事務(channel.txCommit)。可是問題是,rabbitmq事務機制一搞,基本上吞吐量會下來,由於太耗性能。
基於rabbitmq開啓confirm模式
在生產者那裏設置開啓confirm模式以後,你每次寫的消息都會分配一個惟一的id,而後若是寫入了rabbitmq中,rabbitmq會給你回傳一個ack消息,告訴你說這個消息ok了。若是rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息id的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。
事務機制和cnofirm機制不一樣之處
事務機制和cnofirm機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是confirm機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息rabbitmq接收了以後會異步回調你一個接口通知你這個消息接收到了。
消費端手動ACK機制實現
rabbitmq若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,rabbitmq認爲你都消費了,這數據就丟了。
這個時候得用rabbitmq提供的ack機制,簡單來講,就是你關閉rabbitmq自動ack,能夠經過一個api來調用就行,而後每次你本身代碼裏確保處理完的時候,再程序裏ack一把。這樣的話,若是你還沒處理完,不就沒有ack?那rabbitmq就認爲你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,消息是不會丟的。
消費端手動ACK機制實現
惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。
這不是同樣麼,你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。
生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。
而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了。
kafka broker 數據零丟失保證
好比kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。
生產環境也遇到過,咱們也是,以前kafka的leader機器宕機了,將follower切換爲leader以後,就會發現說這個數據就丟了
因此此時通常是要求起碼設置以下4個參數:
(1)topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本
(2)kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧
(3)producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了
(4)producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了
複製代碼
咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在kafka broker端就能夠保證在leader所在broker發生故障,進行leader切換時,數據不會丟失。
kafka分區partition掛掉以後如何恢復?
在kafka中有一個partition recovery機制用於恢復掛掉的partition。 每一個Partition會在磁盤記錄一個RecoveryPoint(恢復點), 記錄已經flush到磁盤的最大offset。當broker fail 重啓時,會進行loadLogs。 首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint點上的segment及之後的segment, 這些segment就是可能沒有徹底flush到磁盤segments。而後調用segment的recover,從新讀取各個segment的msg,並重建索引。
優勢:
以segment爲單位管理Partition數據,方便數據生命週期的管理,刪除過時數據簡單
在程序崩潰重啓時,加快recovery速度,只需恢復未徹底flush到磁盤的segment便可
什麼緣由致使副本與leader不一樣步的呢?
慢副本:在必定週期時間內follower不能追遇上leader。最多見的緣由之一是IO瓶頸致使follower追加複製消息速度慢於從leader拉取速度。
卡住副本:在必定週期時間內follower中止從leader拉取請求。follower replica卡住了是因爲GC暫停或follower失效或死亡。
新啓動副本:當用戶給主題增長副本因子時,新的follower不在同步副本列表中,直到他們徹底遇上了leader日誌。
一個partition的follower落後於leader足夠多時,被認爲不在同步副本列表或處於滯後狀態。正如上述所說,如今kafka斷定落後有兩種,副本滯後判斷依據是副本落後於leader最大消息數量(replica.lag.max.messages)或rep licas響應partition leader的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,然後者是用來檢測失效或死亡的副本。
注意:新版本中,replica.lag.max.messages已經廢棄。
例如:在mysql裏增刪改一條數據,對應出來了增刪改3條binlog,接着這三條binlog發送到MQ裏面,到消費出來依次執行,起碼得保證人家是按照順序來的吧?否則原本是:增長、修改、刪除;你楞是換了順序給執行成刪除、修改、增長,不全錯了麼。
原本這個數據同步過來,應該最後這個數據被刪除了;結果你搞錯了這個順序,最後這個數據保留下來了,數據同步就出錯了。
(1)rabbitmq:拆分多個queue,每一個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue可是對應一個consumer,而後這個consumer內部用內存隊列作排隊,而後分發給底層不一樣的worker來處理
(2)kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,而後N個線程分別消費一個內存queue便可
可能你的消費端出了問題,不消費了,或者消費的極其極其慢。致使消息隊列集羣的磁盤都快寫滿了,都沒人消費,這個時候怎麼辦?或者是整個這就積壓了幾個小時,你這個時候怎麼辦?或者是你積壓的時間太長了,致使好比rabbitmq設置了消息過時時間後就沒了怎麼辦?
因此若是你積壓了幾百萬到上千萬的數據,即便消費者恢復了,也須要大概1小時的時間才能恢復過來 通常這個時候,只能操做臨時緊急擴容了,具體操做步驟和思路以下:
1)先修復consumer的問題,確保其恢復消費速度,而後將現有cnosumer都停掉
2)新建一個topic,partition是原來的10倍,臨時創建好原先10倍或者20倍的queue數量
3)而後寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費以後不作耗時的處理,直接均勻輪詢寫入臨時創建好的10倍數量的queue
4)接着臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據
5)這種作法至關因而臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據
6)等快速消費完積壓數據以後,得恢復原先部署架構,從新用原先的consumer機器來消費消息
複製代碼
過時問題解決思路
假設你用的是rabbitmq,rabbitmq是能夠設置過時時間的,就是TTL,若是消息在queue中積壓超過必定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq裏,而是大量的數據會直接搞丟。
這個狀況下,就不是說要增長consumer消費積壓的消息,由於實際上沒啥積壓,而是丟了大量的消息。咱們能夠採起一個方案,就是批量重導,這個咱們以前線上也有相似的場景幹過。就是大量積壓的時候,咱們當時就直接丟棄數據了,而後等過了高峯期之後,好比你們一塊兒喝咖啡熬夜到晚上12點之後,用戶都睡覺了。
這個時候咱們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,而後從新灌入mq裏面去,把白天丟的數據給他補回來。也只能是這樣了。
假設1萬個訂單積壓在mq裏面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq裏去再補一次
MQ磁盤爆滿解決思路
若是走的方式是消息積壓在mq裏,那麼若是你很長時間都沒處理掉,此時致使mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉全部的消息。而後走第二個方案,到了晚上再補數據。
mq支持可伸縮性。就是須要的時候快速擴容,就能夠增長吞吐量和容量,那怎麼搞?設計個分佈式的系統唄,參照一下kafka的設計理念,broker -> topic -> partition,每一個partition放一個機器,就存一部分數據。若是如今資源不夠了,簡單啊,給topic增長partition,而後作數據遷移,增長機器,不就能夠存放更多數據,提供更高的吞吐量了?
mq支持數據落地磁盤。落磁盤,才能保證別進程掛了數據就丟了。那落磁盤的時候怎麼落啊?順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是kafka的思路。
mq支持可用性。採用多副本機制 -> leader & follower -> broker掛了,從新選舉leader便可對外服務。
mq支持數據0丟失,參考以前說的那個kafka數據零丟失方案
結合大數據在咱們工業大數據平臺的實踐,總結成一篇實踐指南,方便之後查閱反思,後續我會根據本篇博客進行代碼技術實踐實現。
秦凱新 於鄭州 201903022307