RocketMQ 一行代碼形成大量消息丟失

一、問題現象java


首先接到項目反饋使用 RocketMQ 會出現以下錯誤:
web


錯誤信息關鍵點:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。

因爲項目組並無對消息發送失敗作任何補償,致使丟失消息發送失敗,故須要對這個問題進行深層次的探討,並加以解決。數據庫

二、問題分析


首先咱們根據關鍵字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查詢,去探究在何時會拋出如上錯誤。根據全文搜索以下圖所示:
微信


該方法是在 BrokerFastFailure 中定義的,經過名稱便可以當作其設計目的:Broker端快速失敗機制。

Broker 端快速失敗其原理圖以下:
網絡

  • 消息發送者向 Broker 發送消息寫入請求,Broker 端在接收到請求後會首先放入一個隊列中(SendThreadPoolQueue),默認容量爲 10000。app

  • Broker 會專門使用一個線程池(SendMessageExecutor)去從隊列中獲取任務並執行消息寫入請求,爲了保證消息的順序處理,該線程池默認線程個數爲1。編輯器

若是 Broker 端受到垃圾回收等等因素形成單條寫入數據發生抖動,單個 Broker 端積壓的請求太多從而得不到及時處理,會極大的形成客戶端消息發送的時間延長。函數

設想一下,若是因爲 Broker 壓力增大,寫入一條消息須要500ms甚至超過1s,而且隊列中積壓了5000條消息,消息發送端的默認超時時間爲3s,若是按照這樣的速度,這些請求在輪到 Broker 執行寫入請求時,客戶端已經將這個請求超時了,這樣不只會形成大量的無效處理,還會致使客戶端發送超時。源碼分析

故 RocketMQ 爲了解決該問題,引入 Broker 端快速失敗機制,即開啓一個定時調度線程,每隔10毫秒去檢查隊列中的第一個排隊節點,若是該節點的排隊時間已經超過了 200ms,就會取消該隊列中全部已超過 200ms 的請求,當即向客戶端返回失敗,這樣客戶端能儘快進行重試,由於 Broker 都是集羣部署,下次重試能夠發送到其餘 Broker 上,這樣能最大程度保證消息發送在默認 3s 的時間內通過重試機制,能有效避免某一臺 Broker 因爲瞬時壓力大而形成的消息發送不可用,從而實現消息發送的高可用。ui

從 Broker 端快速失敗機制引入的初衷來看,快速失敗後會發起重試,除非同一時刻集羣內全部的 Broker 都繁忙,否則消息會發送成功,用戶是不會感知這個錯誤的,那爲何用戶感知了呢?難道 TIMEOUT_ CLEAN _ QUEUE 錯誤,Broker 不重試?

爲了解開這個謎團,接下來會採用源碼分析的手段去探究真相。接下來將以消息同步發送爲例揭示其消息發送處理流程中的核心關鍵點。

MQ Client 消息發送端首先會利用網絡通道將請求發送到 Broker,而後接收到請求結果後並調用 processSendResponse 方法對響應結果進行解析,以下圖所示:

在這裏返回的 code 爲 RemotingSysResponseCode . SYSTEM_BUSY。

咱們從 proccessSendResponse 方法中能夠得知若是 code 爲 SYSTEM_BUSY,該方法會拋出 MQBrokerException,響應 code 爲 SYSTEM_BUSY,其錯誤描述爲開頭部分的錯誤信息。

那咱們沿着該方法的調用鏈路,能夠找到其直接調用方:DefaultMQProducerImpl 的 sendKernelImpl,咱們重點考慮若是底層方法拋出  MQBrokerException 該方法會如何處理。

其關鍵代碼以下圖所示:

能夠看出在 sendKernelImpl 方法中首先會捕捉異常,先執行註冊的鉤子函數,即就算執行失敗,對應的消息發送後置鉤子函數也會執行,而後再原封不動的將該異常向上拋出。

sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法調用,下面是其核心實現截圖:

從這裏能夠看出 RocketMQ 消息發送高可用設計一個很是關鍵的點,重試機制,其實現是在 for 循環中 使用 try catch 將 sendKernelImpl 方法包裹,就能夠保證該方法拋出異常後能繼續重試。從上文可知,若是 SYSTEM_BUSY 會拋出 MQBrokerException,但發現只有上述幾個錯誤碼纔會重試,由於若是不是上述錯誤碼,會繼續向外拋出異常,此時 for 循環會被中斷,即不會重試。

這裏很是使人意外的是連 SYSTEM_ERROR 都會重試,卻沒有包含 SYSTEM_BUSY,顯然違背了快速失敗的設計初衷,故筆者判定,這是 RocketMQ 的一個BUG,將 SYSTEM_BUSY 遺漏了,後續會提一個 PR,增長一行代碼,將 SYSTEM_BUSY 加上便可。

問題分析到這裏,該問題應該就很是明瞭。

三、解決方案


若是你們在網上搜索 TIMEOUT_CLEAN_QUEUE 的解決方法,你們不約而同提出的解決方案是增長 waitTimeMillsInSendQueue 的值,該值默認爲 200ms,例如將其設置爲 1000s 等等,之前我是反對的,由於個人認知裏 Broker 會重試,但如今發現 Broker 不會重試,因此我如今認爲該 BUG未解決的狀況下適當提升該值能有效的緩解。

但這是並非好的解決方案,我會在近期向官方提交一個PR,將這個問題修復,建議你們在公司儘可能對本身使用的版本進行修改,從新打一個包便可,由於這已經違背了 Broker 端快速失敗的設計初衷。

但在消息發送的業務方,儘可能本身實現消息的重試機制,即不依賴 RocketMQ 自己提供的重試機制,由於受制於網絡等因素,消息發送不可能百分之百成功,建議你們在消息發送時捕獲一下異常,若是發送失敗,能夠將消息存入數據庫,再結合定時任務對消息進行重試,盡最大程度保證消息不丟失。



本文分享自微信公衆號 - 肥朝(feichao_java)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索