RabbitMQ消息的存儲機制以及隊列的結構

消息的存儲機制

不論是持久化的消息仍是非持久化的消息均可以被寫入到磁盤。持久化的消息在到達隊列時就被寫入到磁盤,而且若是能夠,持久化的消息也會在內存中保存一個備份,這樣就能夠提升必定的性能,當內存吃緊的時候會從內存中清除。非持久化的消息通常只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。這兩種類型的消息的落盤處理都在RabbitMQ的「持久層」中完成。性能

持久層是一個邏輯上的概念,實際包含兩個部分:隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。rabbit_queue_index負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已經交互給消費者、是否已經被消費者ack等。每一個隊列都有與之對應的一個rabbit_queue_index。rabbit_msg_store以鍵值對的形式存儲消息,它被全部隊列共享,在每一個節點中有且只有一個。從技術層面上來講,rabbit_msg_store具體還能夠分爲msg_store_persistent和msg_store_transient,msg_store_persistent負責持久化消息的持久化,重啓後消息不會丟失;msg_store_transient負責非持久化消息的持久化,重啓後消息會丟失。一般狀況下,習慣性的將msg_store_persistent和msg_store_transient當作rabbit_msg_store這樣一個總體。fetch

消息(包括消息體、屬性和headers)能夠直接存儲在rabbit_queue_index中,也能夠被保存在rabbit_msg_store中。默認在$RABBITMQ_HOME/var/lib/mnesia/rabbit@$HOSTNAME/ 路徑下包含queues、msg_store_persistent、msg_store_transient這三個文件夾下,其分別存儲對應的信息。優化

最佳的配備是較小的消息存儲在rabbit_queue_index中而較大的消息存儲在rabbit_msg_store中。這個消息的大小的界定能夠經過queue_index_embed_msgs_below來配置,默認大小爲4096B。注意這裏的消息大小是指消息體、屬性及headers總體的大小。當一個消息小於設定的大小閾值時就能夠存儲在rabbit_queue_index中,這樣就能夠獲得性能上的優化。3d

rabbit_queue_index中以順序(文件名以0開始累加)的段文件來進行存儲,後綴爲「.idx」,每一個段文件中包含固定的SEGMENT_ENTRY_COUNT條記錄,SEGMENT_ENTRY_COUNT默認值爲16384.每一個rabbit_queue_index從磁盤中讀取消息的時候至少要在內存中維護一個段文件,因此設置queue_index_embed_msgs_below 值的時候要格外的當心謹慎,一點點增大可能會引發內存爆炸式的增加。blog

通過rabbit_msg_store處理的全部消息都會以追加的方式寫入到文件中,當一個文件的大小超過指定的限制(file_size_limit)後,關閉這個文件再建立一個新的文件以供新的消息寫入。文件名(文件後綴是「.rdq」)從0開始進行累加,所以文件名最小的文件也是最老的文件。在進行消息的存儲時,RabbitMQ會在ETS(Erlang Term Storage)表中記錄消息在文件中的映射(Index)和文件的相關信息(FileSummary)。索引

在讀取消息的時候,先根據消息的ID(msg_id)找到對應存儲的文件,若是文件存在而且未被鎖住,則直接打開文件,從指定位置讀取消息的內容。若是文件不存在或者被鎖住了,則發送請求由rabbit_msg_store進行處理。接口

消息的刪除只是從ETS表中刪除指定消息的相關信息,同時更新消息對應的存儲文件的相關信息。執行消息刪除操做時,並不當即對在文件中的消息進行刪除,也就是說消息依然在文件中,僅僅是標記爲垃圾數據而已。當一個文件中都是垃圾數據時能夠將這個文件刪除。當檢測到先後兩個文件中的有效數據能夠合併在一個文件中,而且全部的垃圾數據的大小和全部文件(至少有3個文件存在的狀況下)的數據大小的比值超過設置的閾值GARBAGE_FACTORION(默認值爲0.5)時纔會觸發垃圾回收將兩個文件合併。隊列

執行合併的兩個文件必定是邏輯上相鄰的兩個文件。以下圖所示,執行合併時首先鎖定兩個文件,並先對前面文件中的有效數據進行整理,再將後面的文件的有效數據寫入到前面的文件,同時更新消息在ETS表中的記錄,最後刪除後面的文件。ip

隊列的結構

一般隊列由rabbit_amqqueue_process和backing_queue這兩部分組成,rabbit_amqqueue_process負責協議相關的消息處理,即接收生產者發佈的消息、向消費者交付消息、處理消息的確認(包括生產端的confirm和消費端的ack等)backing_queue是消息存儲的具體形式和引擎,並向rabbit_amqqueue_process提供相關的接口以供調用。若是消息投遞的目的隊列是空的,而且有消費者訂閱了這個隊列,那麼該消息會直接發送給消費者,不會通過隊列這一步。而當消息沒法直接投遞給消費者時,須要暫時將消息存入隊列,以便從新投遞。消息存入隊列後,不是固定不變的,它會隨着系統的負載在隊列中不斷的流動,消息的狀態會不斷的發生變化。RabbitMQ中的隊列消息可能會處於如下4種狀態:內存

  ❤ alpha:消息內容(包括消息體、屬性和headers)和消息索引都存儲在內存中;

  ❤ beta:消息內容保存在磁盤中,消息索引保存在內存中;

  ❤ gamma:消息內容保存在磁盤中,消息索引在磁盤和內存中都有;

  ❤ delta:消息內容和索引都在磁盤中;

對於持久化的消息,消息內容和消息索引都必須保存在磁盤上,纔會處於上述狀態中的一種。而gamma狀態的消息是隻有持久化的消息纔會有的狀態。

RabbitMQ在運行時會根據統計的消息傳送速度按期計算一個當前內存中可以保存的最大的消息數量(target_ram_count),若是alpha狀態的消息數量大於此值時,就會引發消息的狀態轉換,多餘的消息可能會轉換到beta狀態、gamma狀態或者delta狀態。區分這4中狀態的做用主要是知足不一樣的內存和CPU需求。alpha狀態最消耗內存,但不多消耗CPU。delta狀態基本不消耗內存,可是須要更多的CPU和磁盤的I/O操做。delta狀態須要執行兩次I/O操做才能讀取到消息,一次是讀消息索引(從rabbit_queue_index中),一次是讀消息內容(從rabbit_msg_store中);beta和gamma狀態都只須要一次I/O操做就能夠讀取到消息(從rabbit_msg_store中)。

對於普通的沒有設置優先級和鏡像的隊列來講,backing_queue的默認實現是rabbit_variable_queue,其內部經過5個子隊列Q一、Q二、Delta、Q3和Q4來體現消息的各個狀態。整個隊列包括rabbit_amqqueue_process和backing_queue的各個子隊列,隊列的結構能夠參考下圖:

 

其中Q一、Q4只包含alpha狀態的消息,Q2和Q3包含beta和gamma狀態的消息,Delta只包含delta狀態的消息。通常狀況下,消息按照Q1>Q2>Delta>Q3>Q4這樣的順序步驟進行流動,但並非每一條消息都必定會經歷全部的狀態,這個取決於當前系統的負載情況。從Q1到Q4基本經歷內存到磁盤,再由磁盤到內存這樣的一個過程,如此能夠在負載很高的狀況下,可以經過將一部分消息由磁盤保存來節省內存空間,而在負載下降的時候,這部分消息又漸漸地回到內存被消費者獲取,使得整個隊列有很好的彈性。

消費者獲取消息也會引發消息的狀態轉換,當消費者獲取消息時,首先會從Q4中獲取消息,若是獲取成功則返回。若是Q4爲空,則嘗試從Q3中獲取消息,系統首先會判斷Q3是否爲空,若是爲空則返回隊列爲空,即此時隊列中沒有消息。若是Q3不爲空,則取出Q3中的消息,進而再判斷此時Q3和Delta中的長度,若是都爲空,則能夠認爲Q二、Delta、Q三、Q4所有爲空,此時將Q1中的消息直接轉移至Q4,下次直接從Q4中獲取消息,在將消息從Delta轉移到Q3的過程當中,是按照索引分段讀取的,首先讀取某一段,而後判斷讀取消息的個數與Delta中的消息的個數是否相等,若是相等,則能夠判斷此時Delta中無消息,則直接將剛讀取到的消息一併放入到Q3中;若是不相等,僅將這次讀取到的消息轉移到Q3。

在系統負載較高時,已接收到的消息若是不能很快被消費掉,這些消息就會進入到很深的隊列中去,這樣會增長處理每一個消息的平均開銷。由於要花更多的時間和資源處理「堆積」的消息,如此用來處理新流入的消息的能力就會下降,使得後流入的消息又被積壓到很深的隊列中繼續增大每一個消息的平均開銷,繼而狀況變得愈來愈惡化,使得系統的處理能力大大下降。

應對這個問題有3種措施:

  (1)增長prefetch_count的值,即一次發送多條消息給消費者,加快消息被消費的速度。

  (2)採用multiple ack,下降處理ack帶來的開銷;

  (3)流量控制;

參考:《RabbitMQ實戰指南》 朱忠華 編著;

相關文章
相關標籤/搜索