消息隊列常見問題和解決方案

<div id="content_views" class="markdown_views prism-atom-one-dark"> <!-- flowchart 箭頭圖標 勿刪 --> <svg xmlns="http://www.w3.org/2000/svg" style="display: none;"><path stroke-linecap="round" d="M5,0 0,2.5 5,5z" id="raphael-marker-block" style="-webkit-tap-highlight-color: rgba(0, 0, 0, 0);"></path></svg> <p>說明:此文是筆者對中華石衫老師對消息隊列講解的一篇總結包括筆者本身的一些理解</p> <h2><a name="t0"></a><a id="_2" target="_blank"></a>1、爲何使用消息隊列?</h2> <p>消息隊列使用的場景和中間件有不少,但解決的核心問題主要是:異步、解耦、消峯填谷。</p> <h2><a name="t1"></a><a id="_7" target="_blank"></a>2、消息隊列的優缺點</h2> <p>異步、解耦、消峯填谷這是消息隊列最大的優勢,除了這些消息隊列還能夠會解決一些咱們特殊業務場景的問題。可是缺點主要在於系統的可用性、複雜性、一致性問題,引入消息隊列後,須要考慮MQ的可用性,萬一MQ崩潰了豈不是要爆炸?並且複雜性明顯提升了,須要考慮一些消息隊列的常見問題和解決方案,還有就是一致性問題,一條消息由多個消費者消費,萬一有一個消費者消費失敗了,就會致使數據不一致。</p> <h2><a name="t2"></a><a id="_12" target="_blank"></a>3、消息隊列選型</h2> <p>目前常見和使用普遍的MQ有<a href="https://www.baidu.com/s?wd=ActiveMQ&amp;tn=24004469_oem_dg&amp;rsv_dl=gh_pl_sl_csd" target="_blank">ActiveMQ</a>、RabbitMQ、RocketMQ、Kakfa,其特性以下:<br>   <img src="https://img-blog.csdn.net/20180723203239485?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM2MjM2ODkw/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70" alt="mq對比"><br>   我的總結:<br>   ActiveMQ早期用的比較多,可是如今貌似用的都不是不少了,網上也沒有大規模吞吐量的使用案例分析,社區也貌似不是很活躍了,若是是新項目不建議採用ActiveMQ。</p> <p>RabbitMQ如今使用的較爲多一些,社區活躍度也很高,功能也很強大,官方還提供了管理的web界面,性能也很好,可是RabbitMQ性能好的主要緣由是由於使用erlang語言開發的,erlang語言貌似天生性能好,但對於咱們java開發者來講,源碼基本看不懂,更別提深刻的研究了,不過spring推出了rabbit的支持,貌似還比較好用,比本身去封裝實現而且去處理一些問題的要好多了。</p> <p>RocketMQ如今開始用的人也比較多,不少人對於RocketMQ的見解是集成了Kafka和RabbitMQ的有點,是阿里開源的產品,貌似如今是捐贈給了Apache,其源碼是java寫的,功能十分強大而且是通過阿里大規模應用的,能通過阿里實踐使用的通常來講可靠性和可用性都是至關高的,可是也存在一些小問題,如今RocketMQ雖然使用的人好像愈來愈多了,可是文檔資料仍是比較少,含金量不怎麼高,而且阿里開源的有不維護的風險,就像dubbo中間也用2年沒維護,有實力的團隊應該沒有什麼問題,小公司小團隊須要考慮一下使用RocketMQ。</p> <p>Kafka就很少說了,Kafka能夠說是業內標準,基本上大數據領域的實時計算、日誌、數據處理都是用kafka,開源社區異常活躍,並且像如今<a href="https://www.baidu.com/s?wd=%E9%98%BF%E9%87%8C%E4%BA%91&amp;tn=24004469_oem_dg&amp;rsv_dl=gh_pl_sl_csd" target="_blank">阿里雲</a>、騰訊雲都推出了Kafka的雲服務,因此說Kafka就不說了,絕對沒問題,放心大膽的用吧。</p> <p>最後給一個我的選型意見(不必定對啊),若是是小公司小團隊最好採用Kafka和RabbitMQ,有實力的團隊能夠去搞一搞RocketMQ。</p> <h2><a name="t3"></a><a id="_28" target="_blank"></a>4、如何保證消息隊列的高可用性</h2> <p>因爲筆者只使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ瞭解的不深,因此分析一下RabbitMQ和Kafka的高可用。</p> <h2><a name="t4"></a><a id="RabbitMQ_33" target="_blank"></a>(一)RabbitMQ</h2> <p>RabbitMQ有三種模式:單機模式,普通集羣模式,鏡像集羣模式</p> <p><strong>(1)單機模式</strong></p> <p>單機模式日常使用在開發或者本地測試場景,通常就是測試是否是可以正確的處理消息,生產上基本沒人去用單機模式,風險很大。</p> <p><strong>(2)普通集羣模式</strong></p> <p>普通集羣模式就是啓動多個RabbitMQ實例。在你建立的queue,只會放在一個rabbtimq實例上,可是每一個實例都同步queue的元數據。在消費的時候完了,上若是鏈接到了另一個實例,那麼那個實例會從queue所在實例上拉取數據過來。</p> <p>這種方式確實很麻煩,也不怎麼好,沒作到所謂的分佈式,就是個普通集羣。由於這致使你要麼消費者每次隨機鏈接一個實例而後拉取數據,要麼固定鏈接那個queue所在實例消費數據,前者有數據拉取的開銷,後者致使單實例性能瓶頸。</p> <p>並且若是那個放queue的實例宕機了,會致使接下來其餘實例就沒法從那個實例拉取,若是你開啓了消息持久化,讓RabbitMQ落地存儲消息的話,消息不必定會丟,得等這個實例恢復了,而後才能夠繼續從這個queue拉取數據。</p> <p>這方案主要是提升吞吐量的,就是說讓集羣中多個節點來服務某個queue的讀寫操做。</p> <p><strong>(3)鏡像集羣模式</strong></p> <p>鏡像集羣模式是所謂的RabbitMQ的高可用模式,跟普通集羣模式不同的是,你建立的queue,不管元數據仍是queue裏的消息都會存在於多個實例上,而後每次你寫消息到queue的時候,都會自動把消息到多個實例的queue裏進行消息同步。</p> <p>優勢在於你任何一個實例宕機了,<a href="https://www.baidu.com/s?wd=%E6%B2%A1%E4%BA%8B%E5%84%BF&amp;tn=24004469_oem_dg&amp;rsv_dl=gh_pl_sl_csd" target="_blank">沒事兒</a>,別的實例均可以用。缺點在於性能開銷太大和擴展性很低,同步全部實例,這會致使網絡帶寬和壓力很重,並且擴展性很低,每增長一個實例都會去包含已有的queue的全部數據,並無辦法線性擴展queue。</p> <p>開啓鏡像集羣模式能夠去RabbitMQ的管理控制檯去增長一個策略,指定要求數據同步到全部節點的,也能夠要求就同步到指定數量的節點,而後你再次建立queue的時候,應用這個策略,就會自動將數據同步到其餘的節點上去了。</p> <h2><a name="t5"></a><a id="Kafka_60" target="_blank"></a>(二)Kafka</h2> <p>Kafka天生就是一個分佈式的消息隊列,它能夠由多個broker組成,每一個broker是一個節點;你建立一個topic,這個topic能夠劃分爲多個partition,每一個partition能夠存在於不一樣的broker上,每一個partition就放一部分數據。</p> <p>kafka 0.8之前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,無法寫也無法讀,沒有什麼高可用性可言。</p> <p>kafka 0.8之後,提供了HA機制,就是replica副本機制。kafka會均勻的將一個partition的全部replica分佈在不一樣的機器上,來提升容錯性。每一個partition的數據都會同步到吉他機器上,造成本身的多個replica副本。而後全部replica會選舉一個leader出來,那麼生產和消費都去leader,其餘replica就是follower,leader會同步數據給follower。當leader掛了會自動去找replica,而後會再選舉一個leader出來,這樣就具備高可用性了。</p> <p>寫數據的時候,生產者就寫leader,而後leader將數據落地寫本地磁盤,接着其餘follower本身主動從leader來pull數據。一旦全部follower同步好數據了,就會發送ack給leader,leader收到全部follower的ack以後,就會返回寫成功的消息給生產者。(固然,這只是其中一種模式,還能夠適當調整這個行爲)</p> <p>消費的時候,只會從leader去讀,可是隻有一個消息已經被全部follower都同步成功返回ack的時候,這個消息纔會被消費者讀到。</p> <h2><a name="t6"></a><a id="_73" target="_blank"></a>5、如何保證消息消費時的冪等性</h2> <p>其實消息重複消費的主要緣由在於回饋機制(RabbitMQ是ack,Kafka是offset),在某些場景中咱們採用的回饋機制不一樣,緣由也不一樣,例如消費者消費完消息後回覆ack, 可是剛消費完還沒來得及提交系統就重啓了,這時候上來就pull消息的時候因爲沒有提交ack或者offset,消費的仍是上條消息。</p> <p>那麼如何怎麼來保證消息消費的冪等性呢?實際上咱們只要保證多條相同的數據過來的時候只處理一條或者說多條處理和處理一條形成的結果相同便可,可是具體怎麼作要根據業務需求來定,例如入庫消息,先查一下消息是否已經入庫啊或者說搞個惟一約束啊什麼的,還有一些是天生保證冪等性就根本不用去管,例如redis就是自然冪等性。</p> <p>還有一個問題,消費者消費消息的時候在某些場景下要放過消費不了的消息,遇到消費不了的消息經過日誌記錄一下或者搞個什麼措施之後再來處理,可是必定要放過消息,由於在某些場景下例如spring-rabbitmq的默認回饋策略是出現異常就沒有提交ack,致使了一直在重發那條消費異常的消息,並且一直還消費不了,這就尷尬了,後果你會懂的。</p> <h2><a name="t7"></a><a id="_82" target="_blank"></a>6、如何保證消息的可靠性傳輸?</h2> <p>因爲筆者只使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ瞭解的不深,因此分析一下RabbitMQ和Kafka的消息可靠性傳輸的問題。、</p> <h2><a name="t8"></a><a id="RabbitMQ_87" target="_blank"></a>(一)RabbitMQ</h2> <p><strong>(1)生產者弄丟了數據</strong><br>   生產者將數據發送到RabbitMQ的時候,可能數據就在半路給搞丟了,由於網絡啥的問題,都有可能。此時能夠選擇用RabbitMQ提供的事務功能,就是生產者發送數據以前開啓RabbitMQ事務(channel.txSelect),而後發送消息,若是消息沒有成功被RabbitMQ接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務(channel.txRollback),而後重試發送消息;若是收到了消息,那麼能夠提交事務(channel.txCommit)。可是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,由於太耗性能。</p> <p>因此通常來講,若是你要確保說寫RabbitMQ的消息別丟,能夠開啓confirm模式,在生產者那裏設置開啓confirm模式以後,你每次寫的消息都會分配一個惟一的id,而後若是寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。若是RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息id的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。</p> <p>事務機制和cnofirm機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是confirm機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息RabbitMQ接收了以後會異步回調你一個接口通知你這個消息接收到了。</p> <p>因此通常在生產者這塊避免數據丟失,都是用confirm機制的。</p> <p><strong>(2)RabbitMQ弄丟了數據</strong></p> <p>就是RabbitMQ本身弄丟了數據,這個你必須開啓RabbitMQ的持久化,就是消息寫入以後會持久化到磁盤,哪怕是RabbitMQ本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,RabbitMQ還沒持久化,本身就掛了,可能致使少許數據會丟失的,可是這個機率較小。</p> <p>設置持久化有兩個步驟,第一個是建立queue的時候將其設置爲持久化的,這樣就能夠保證RabbitMQ持久化queue的元數據,可是不會持久化queue裏的數據;第二個是發送消息的時候將消息的deliveryMode設置爲2,就是將消息設置爲持久化的,此時RabbitMQ就會將消息持久化到磁盤上去。必需要同時設置這兩個持久化才行,RabbitMQ哪怕是掛了,再次重啓,也會從磁盤上重啓恢復queue,恢復這個queue裏的數據。</p> <p>並且持久化能夠跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者ack了,因此哪怕是在持久化到磁盤以前,RabbitMQ掛了,數據丟了,生產者收不到ack,你也是能夠本身重發的。</p> <p>哪怕是你給RabbitMQ開啓了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,可是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ掛了,就會致使內存裏的一點點數據會丟失。</p> <p><strong>(3)消費端弄丟了數據</strong></p> <p>RabbitMQ若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,RabbitMQ認爲你都消費了,這數據就丟了。</p> <p>這個時候得用RabbitMQ提供的ack機制,簡單來講,就是你關閉RabbitMQ自動ack,能夠經過一個api來調用就行,而後每次你本身代碼裏確保處理完的時候,再程序裏ack一把。這樣的話,若是你還沒處理完,不就沒有ack?那RabbitMQ就認爲你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,消息是不會丟的。</p> <h2><a name="t9"></a><a id="Kafka_115" target="_blank"></a>(二)Kafka</h2> <p><strong>(1)消費端弄丟了數據</strong></p> <p>惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。</p> <p>你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。</p> <p>生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。</p> <p>而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了</p> <p><strong>(2)kafka弄丟了數據</strong></p> <p>這塊比較常見的一個場景,就是kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。</p> <p>生產環境也遇到過,咱們也是,以前kafka的leader機器宕機了,將follower切換爲leader以後,就會發現說這個數據就丟了。</p> <p>因此此時通常是要求起碼設置以下4個參數:</p> <ul> <li>給這個topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本。</li> <li>在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧。</li> <li>在producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了。</li> <li>在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了。</li> </ul> <p><strong>(3)生產者會不會弄丟數據</strong></p> <p>若是按照上述的思路設置了ack=all,必定不會丟,要求是,你的leader接收到消息,全部的follower都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。</p> <h2><a name="t10"></a><a id="_145" target="_blank"></a>6、如何保證消息的順序性</h2> <p>由於在某些狀況下咱們扔進MQ中的消息是要嚴格保證順序的,尤爲涉及到訂單什麼的業務需求,消費的時候也是要嚴格保證順序,否則會出大問題的。</p> <p>先看看順序會錯亂的倆場景</p> <ol> <li>rabbitmq:一個queue,多個consumer,這不明顯亂了</li> <li>kafka:一個topic,一個partition,一個consumer,內部多線程,這不也明顯亂了<br>   如何來保證消息的順序性呢?</li> <li>rabbitmq:拆分多個queue,每一個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue可是對應一個consumer,而後這個consumer內部用內存隊列作排隊,而後分發給底層不一樣的worker來處理。</li> <li>kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,而後N個線程分別消費一個內存queue便可。</li> </ol> <h2><a name="t11"></a><a id="_157" target="_blank"></a>7、如何解決消息隊列的延時以及過時失效問題?消息隊列滿了之後該怎麼處理?有幾百萬消息持續積壓幾小時怎麼解決?</h2> <h2><a name="t12"></a><a id="mq_160" target="_blank"></a>(一)、大量消息在mq裏積壓了幾個小時了還沒解決</h2> <p>幾千萬條數據在MQ裏積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多<br> 這個是咱們真實遇到過的一個場景,確實是線上故障了,這個時候要否則就是修復consumer的問題,讓他恢復消費速度,而後傻傻的等待幾個小時消費完畢。這個確定不能在面試的時候說吧。</p> <p>一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條,因此若是你積壓了幾百萬到上千萬的數據,即便消費者恢復了,也須要大概1小時的時間才能恢復過來。</p> <p>通常這個時候,只能操做臨時緊急擴容了,具體操做步驟和思路以下:</p> <ol> <li>先修復consumer的問題,確保其恢復消費速度,而後將現有cnosumer都停掉。</li> <li>新建一個topic,partition是原來的10倍,臨時創建好原先10倍或者20倍的queue數量。</li> <li>而後寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費以後不作耗時的處理,直接均勻輪詢寫入臨時創建好的10倍數量的queue。</li> <li>接着臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據。</li> <li>這種作法至關因而臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據。</li> <li>等快速消費完積壓數據以後,得恢復原先部署架構,從新用原先的consumer機器來消費消息。</li> </ol> <h2><a name="t13"></a><a id="_177" target="_blank"></a>(二)、消息隊列過時失效問題</h2> <p>假設你用的是rabbitmq,rabbitmq是能夠設置過時時間的,就是TTL,若是消息在queue中積壓超過必定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq裏,而是大量的數據會直接搞丟。</p> <p>這個狀況下,就不是說要增長consumer消費積壓的消息,由於實際上沒啥積壓,而是丟了大量的消息。咱們能夠採起一個方案,就是批量重導,這個咱們以前線上也有相似的場景幹過。就是大量積壓的時候,咱們當時就直接丟棄數據了,而後等過了高峯期之後,好比你們一塊兒喝咖啡熬夜到晚上12點之後,用戶都睡覺了。</p> <p>這個時候咱們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,而後從新灌入mq裏面去,把白天丟的數據給他補回來。也只能是這樣了。</p> <p>假設1萬個訂單積壓在mq裏面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq裏去再補一次。</p> <h2><a name="t14"></a><a id="_188" target="_blank"></a>(三)、消息隊列滿了怎麼搞?</h2> <p>若是走的方式是消息積壓在mq裏,那麼若是你很長時間都沒處理掉,此時致使mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉全部的消息。而後走第二個方案,到了晚上再補數據吧。</p> </div>java

相關文章
相關標籤/搜索