RocketMQ 消息發送system busy、broker busy緣由分析與解決方案

@(本節目錄)git

一、現象

最近收到不少RocketMQ使用者,反饋生產環境中在消息發送過程當中偶爾會出現以下4個錯誤信息之一:
1)[REJECTREQUEST]system busy, start flow control for a while
2)too many requests and system thread pool busy, RejectedExecutionException
3)[PC_SYNCHRONIZED]broker busy, start flow control for a while
4)[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %dgithub

二、原理解讀

在進行消息中間件的選型時,若是待選中間件在功能上、性能上都能知足業務的狀況下,建議把中間件的實現語言這個因素也考慮進去,畢竟選擇一門用本身擅長的語言實現的中間件會更具掌控性。在出現異常的狀況下,咱們能夠根據本身的經驗提取錯誤信息關鍵字system busy,在RocketMQ源碼中直接搜索,獲得拋出上述錯誤信息的代碼以下:
在這裏插入圖片描述
其代碼入口爲:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand。從圖中能夠看出,拋出上述錯誤的關鍵緣由是:pair.getObject1().rejectRequest()和拋出RejectedExecutionException異常。apache

備註:本文偏實戰,源碼只是做爲分析的重點證據,故本文只會點出關鍵源碼,並不會詳細跟蹤其整個實現流程,若是想詳細瞭解其實現,能夠查閱筆者編著的《RocketMQ技術內幕》。json

2.1 RocketMQ 網絡處理機制概述

RocketMQ的網絡設計很是值得咱們學習與借鑑,首先在客戶端端將不一樣的請求定義不一樣的請求命令CODE,服務端會將客戶端請求進行分類,每一個命令或每類請求命令定義一個處理器(NettyRequestProcessor),而後每個NettyRequestProcessor綁定到一個單獨的線程池,進行命令處理,不一樣類型的請求將使用不一樣的線程池進行處理,實現線程隔離。服務器

爲了方便下文的描述,咱們先簡單的認識一下NettyRequestProcessor、Pair、RequestCode。其核心關鍵點以下:
在這裏插入圖片描述網絡

  1. NettyRequestProcessor
    RocketMQ 服務端請求處理器,例如SendMessageProcessor是消息發送處理器、PullMessageProcessor是消息拉取命令處理器。
  2. RequestCode
    請求CODE,用來區分請求的類型,例如SEND_MESSAGE:表示該請求爲消息發送,PULL_MESSAGE:消息拉取請求。
  3. Pair
    用來封裝NettyRequestProcessor與ExecuteService的綁定關係。在RocketMQ的網絡處理模型中,會爲每個NettyRequestProcessor與特定的線程池綁定,全部該NettyRequestProcessor的處理邏輯都在該線程池中運行。

2.2 pair.getObject1().rejectRequest()

因爲讀者朋友提出的問題,都是發生在消息發送過程當中,故本文重點關注SendMessageProcessor#rejectRequest方法。
SendMessageProcessor#rejectRequest數據結構

public boolean rejectRequest() {
    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||               // @1
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();        // @2
}

拒絕請求的條件有兩個,只要其中任意一個知足,則返回true。架構

代碼@1:Os PageCache busy,判斷操做系統PageCache是否繁忙,若是忙,則返回true。想必看到這裏你們確定與我同樣好奇,RocketMQ是如何判斷pageCache是否繁忙呢?下面會重點分析。併發

代碼@2:transientStorePool是否不足。

2.2.1 isOSPageCacheBusy()

DefaultMessageStore#isOSPageCacheBusy()

public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();  // @1 start
    long diff = this.systemClock.now() - begin;                         // @1  end

    return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();     // @2
}

代碼@1:先重點解釋begin、diff兩個局部變量的含義:

  • begin
    通俗的一點講,就是將消息寫入Commitlog文件所持有鎖的時間,精確說是將消息體追加到內存映射文件(DirectByteBuffer)或pageCache(FileChannel#map)該過程當中開始持有鎖的時間戳,具體的代碼請參考:CommitLog#putMessage。
  • diff
    一次消息追加過程當中持有鎖的總時長,即往內存映射文件或pageCache追加一條消息所耗時間。

代碼@2:若是一次消息追加過程的時間超過了Broker配置文件osPageCacheBusyTimeOutMills,則認爲pageCache繁忙,osPageCacheBusyTimeOutMills默認值爲1000,表示1s。

2.2.2 isTransientStorePoolDeficient()

DefaultMessageStore#isTransientStorePoolDeficient

public boolean isTransientStorePoolDeficient() {
    return remainTransientStoreBufferNumbs() == 0;
}
public int remainTransientStoreBufferNumbs() {
    return this.transientStorePool.remainBufferNumbs();
}

最終調用TransientStorePool#remainBufferNumbs方法。

public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
}

若是啓用transientStorePoolEnable機制,返回當前可用的ByteBuffer個數,即整個isTransientStorePoolDeficient方法的用意是是否還存在可用的ByteBuffer,若是不存在,即表示pageCache繁忙。那什麼是transientStorePoolEnable機制呢?

2.3 漫談transientStorePoolEnable機制

Java NIO的內存映射機制,提供了將文件系統中的文件映射到內存機制,實現對文件的操做轉換對內存地址的操做,極大的提升了IO特性,但這部份內存並非常駐內存,能夠被置換到交換內存(虛擬內存),RocketMQ爲了提升消息發送的性能,引入了內存鎖定機制,即將最近須要操做的commitlog文件映射到內存,並提供內存鎖定功能,確保這些文件始終存在內存中,該機制的控制參數就是transientStorePoolEnable。

2.3.1 MappedFile

重點關注MappedFile的ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer這兩個屬性的初始化,由於這兩個方法是寫消息與查消息操做的直接數據結構。
在這裏插入圖片描述
兩個關鍵點以下:

  • ByteBuffer writeBuffer
    若是開啓了transientStorePoolEnable,則使用ByteBuffer.allocateDirect(fileSize),建立(java.nio的內存映射機制)。若是未開啓,則爲空。
  • MappedByteBuffer mappedByteBuffer
    使用FileChannel#map方法建立,即真正意義上的PageCache。

消息寫入時:
MappedFile#appendMessagesInner
在這裏插入圖片描述
從中可見,在消息寫入時,若是writerBuffer不爲空,說明開啓了transientStorePoolEnable機制,則消息首先寫入writerBuffer中,若是其爲空,則寫入mappedByteBuffer中。

消息拉取(讀消息):
MappedFile#selectMappedBuffer
在這裏插入圖片描述
消息讀取時,是從mappedByteBuffer中讀(pageCache)。

你們是否是發現了一個有趣的點,若是開啓transientStorePoolEnable機制,是否是有了讀寫分離的效果,先寫入writerBuffer中,讀倒是從mappedByteBuffer中讀取。

爲了對transientStorePoolEnable引入意圖闡述的更加明白,這裏我引入Rocketmq社區貢獻者胡宗棠關於此問題的看法。

一般有以下兩種方式進行讀寫:

  1. 第一種,Mmap+PageCache的方式,讀寫消息都走的是pageCache,這樣子讀寫都在pagecache裏面不可避免會有鎖的問題,在併發的讀寫操做狀況下,會出現缺頁中斷下降,內存加鎖,污染頁的回寫。
  2. 第二種,DirectByteBuffer(堆外內存)+PageCache的兩層架構方式,這樣子能夠實現讀寫消息分離,寫入消息時候寫到的是DirectByteBuffer——堆外內存中,讀消息走的是PageCache(對於,DirectByteBuffer是兩步刷盤,一步是刷到PageCache,還有一步是刷到磁盤文件中),帶來的好處就是,避免了內存操做的不少容易堵的地方,下降了時延,好比說缺頁中斷下降,內存加鎖,污染頁的回寫。

舒適提示:若是想與胡宗棠大神進一步溝通交流,能夠關注他的github帳號:https://github.com/zongtanghu

不知道你們會不會有另一個擔心,若是開啓了transientStorePoolEnable,內存鎖定機制,那是否是隨着commitlog文件的不斷增長,最終致使內存溢出?

2.3.2 TransientStorePool初始化

在這裏插入圖片描述
從這裏能夠看出,TransientStorePool默認會初始化5個DirectByteBuffer(對外內存),並提供內存鎖定功能,即這部份內存不會被置換,能夠經過transientStorePoolSize參數控制。

在消息寫入消息時,首先從池子中獲取一個DirectByteBuffer進行消息的追加。當5個DirectByteBuffer所有寫滿消息後,該如何處理呢?從RocketMQ的設計中來看,同一時間,只會對一個commitlog文件進行順序寫,寫完一個後,繼續建立一個新的commitlog文件。故TransientStorePool的設計思想是循環利用這5個DirectByteBuffer,只須要寫入到DirectByteBuffer的內容被提交到PageCache後,便可重複利用。對應的代碼以下:
TransientStorePool#returnBuffer

public void returnBuffer(ByteBuffer byteBuffer) {
    byteBuffer.position(0);
    byteBuffer.limit(fileSize);
    this.availableBuffers.offerFirst(byteBuffer);
}

其調用棧以下:
在這裏插入圖片描述
從上面的分析看來,並不會隨着消息的不斷寫入而致使內存溢出。

三、現象解答

3.1 [REJECTREQUEST]system busy

在這裏插入圖片描述
其拋出的源碼入口點:NettyRemotingAbstract#processRequestCommand,上面的原理分析部分已經詳細介紹其實現原理,總結以下。

在不開啓transientStorePoolEnable機制時,若是Broker PageCache繁忙時則拋出上述錯誤,判斷PageCache繁忙的依據就是向PageCache追加消息時,若是持有鎖的時間超過1s,則會拋出該錯誤;在開啓transientStorePoolEnable機制時,其判斷依據是若是TransientStorePool中不存在可用的堆外內存時拋出該錯誤。

3.2 too many requests and system thread pool busy, RejectedExecutionException

在這裏插入圖片描述
其拋出的源碼入口點:NettyRemotingAbstract#processRequestCommand,其調用地方緊跟3.1,是在向線程池執行任務時,被線程池拒絕執行時拋出的,咱們能夠順便看看Broker消息處理髮送的線程信息:
BrokerController#registerProcessor
在這裏插入圖片描述
該線程池的隊列長度默認爲10000,咱們能夠經過sendThreadPoolQueueCapacity來改變默認值。

3.3 [PC_SYNCHRONIZED]broker busy

在這裏插入圖片描述
其拋出的源碼入口點:DefaultMessageStore#putMessage,在進行消息追加時,再一次判斷PageCache是否繁忙,若是繁忙,則拋出上述錯誤。

3.4 broker busy, period in queue: %sms, size of queue: %d

在這裏插入圖片描述
其拋出源碼的入口點:BrokerFastFailure#cleanExpiredRequest。該方法的調用頻率爲每隔10s中執行一次,不過有一個執行前提條件就是Broker端要開啓快速失敗,默認爲開啓,能夠經過參數brokerFastFailureEnable來設置。該方法的實現要點是每隔10s,檢測一次,若是檢測到PageCache繁忙,而且發送隊列中還有排隊的任務,則直接再也不等待,直接拋出系統繁忙錯誤,使正在排隊的線程快速失敗,結束等待。

四、實踐建議

通過上面的原理講解與現象分析,消息發送時拋出system busy、broker busy的緣由都是PageCache繁忙,那是否是能夠經過調整上述提到的某些參數來避免拋出錯誤呢?.例如以下參數:

  • osPageCacheBusyTimeOutMills
    設置PageCache系統超時的時間,默認爲1000,表示1s,那是否是能夠把增長這個值,例如設置爲2000或3000。做者觀點:很是不可取。
  • sendThreadPoolQueueCapacity
    Broker服務器處理的排隊隊列,默認爲10000,若是隊列中積壓了10000個請求,則會拋出RejectExecutionException。做者觀點:不可取。
  • brokerFastFailureEnable
    是否啓用快速失敗,默認爲true,表示當若是發現Broker服務器的PageCache繁忙,若是發現sendThreadPoolQueue隊列中不爲空,表示還有排隊的發送請求在排隊等待執行,則直接結束等待,返回broker busy。那若是不開啓快速失敗,則一樣能夠避免拋出這個錯誤。做者觀點:很是不可取。

修改上述參數,都不可取,緣由是出現system busy、broker busy這個錯誤,其本質是系統的PageCache繁忙,通俗一點講就是向PageCache追加消息時,單個消息發送佔用的時間超過1s了,若是繼續往該Broker服務器發送消息並等待,其TPS根本沒法知足,哪仍是高性能的消息中間了呀。故纔會採用快速失敗機制,直接給消息發送者返回錯誤,消息發送者默認狀況會重試2次,將消息發往其餘Broker,保證其高可用。

下面根據我的的看法,提出以下解決辦法:

4.1 開啓transientStorePoolEnable

在broker.config中將transientStorePoolEnable=true。

  • 方案依據:
    啓用「讀寫」分離,消息發送時消息先追加到DirectByteBuffer(堆外內存)中,而後在異步刷盤機制下,會將DirectByteBuffer中的內容提交到PageCache,而後刷寫到磁盤。消息拉取時,直接從PageCache中拉取,實現了讀寫分離,減輕了PageCaceh的壓力,能從根本上解決該問題。

  • 方案缺點:
    會增長數據丟失的可能性,若是Broker JVM進程異常退出,提交到PageCache中的消息是不會丟失的,但存在堆外內存(DirectByteBuffer)中但還未提交到PageCache中的這部分消息,將會丟失。但一般狀況下,RocketMQ進程退出的可能性不大。

4.2 擴容Broker服務器

方案依據:

當Broker服務器自身比較忙的時候,快速失敗,而且在接下來的一段時間內會規避該Broker,這樣該Broker恢復提供了時間保證,Broker自己的架構是支持分佈式水平擴容的,增長Topic的隊列數,下降單臺Broker服務器的負載,從而避免出現PageCache。

舒適提示:在Broker擴容時候,能夠複製集羣中任意一臺Broker服務下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服務器指定目錄,避免在新Broker服務器上爲Broker建立隊列,而後消息發送者、消息消費者都能動態獲取Topic的路由信息。

與之擴容對應的,也能夠經過對原有Broker進行升配,例如增長內存、把機械盤換成SSD,但這種狀況,一般須要重啓Broekr服務器,沒有擴容來的方便。

本文就介紹到這裏了,若是你們以爲文章對本身有用的話,麻煩幫忙點個贊,謝謝。親愛的讀者朋友,還有更好的方案沒?歡迎留言與做者互動,共同探討。


做者介紹:
丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。

相關文章
相關標籤/搜索