【RabbitMQ學習記錄】- 消息隊列存儲機制源碼分析

本文來自 網易雲社區 。html

 

RabbitMQ在金融系統,OpenStack內部組件通訊和通訊領域應用普遍,它部署簡單,管理界面內容豐富使用十分方便。筆者最近在研究RabbitMQ部署運維和代碼架構,本篇文章主要記錄下RabbitMQ存儲機制相關內容和源碼分析。node

1、RabbitMQ進程架構

Erlang是基於Actor模型的一門自然多進程、分佈式和高併發的語言。一個Erlang虛擬機對應一個操做系統進程,一個Erlang進程調度器對應一個操做系統線程,通常來講,有多少個CPU核就有多少個調度器。服務器

RabbitMQ是基於Erlang語言實現的一個分佈式消息中間件。下圖是RabbitMQ基本的進程模型:數據結構

 

  • tcp_acceptor:負責接受客戶端鏈接,而後爲客戶端鏈接建立rabbit_reader、rabbit_writer、rabbit_channel進程
  • rabbit_reader:負責解析客戶端AMQP幀,而後將請求發送給rabbit_channel進程
  • rabbit_writer:負責向客戶端返回數據
  • rabbit_channel:負責解析AMQP方法,以及對消息進行路由,而後發送給對應的隊列進程。
  • rabbit_amqqueue_process:rabbit隊列進程,該進程通常在rabbitmq建立隊列時被建立,其主要負責消息的接收/投遞邏輯
  • rabbit_msg_store:存儲服務器進程,主要負責消息的持久化存儲

上述進程中,tcp_acceptor和rabbit_msg_store只會有一個,rabbit_amqqueue_process進程的數量則和隊列數量保持一致,每一個客戶端鏈接對應一個rabbit_reader和rabbit_writer進程,每個鏈接的通道對應一個rabbit_channel進程。架構

一般來講,客戶端發起一條鏈接的同時,能夠打開多條channel,相對鏈接的open/close來講,對channel進行open和close的操做開銷會更小。最佳實踐是一個生產者/消費者進程對應一個connection,具體發送一個線程對應一個channel便可。併發

2、消息存在哪裏

RabbitMQ的消息持久化實際包括兩部分:隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。rabbit_queue_index負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已經被交付給消費者、是否已被消費者ack等,每一個隊列都有一個與之對應的rabbit_queue_index。運維

rabbit_msg_store以鍵值對的形式存儲消息,每一個節點有且只有一個,全部隊列共享。從技術層面講rabbit_msg_store又能夠分爲msg_store_persistent和msg_store_transient,其中msg_store_persistent負責持久化消息的存儲,不會丟失,而msg_store_transient負責非持久化消息的存儲,重啓後消息會丟失。tcp

經過配置環境變量RABBITMQ_MNESIA_BASE能夠指定存儲目錄,通常配置RABBITMQ_MNESIA_BASE=/srv/rabbitmq。分佈式

$ cd /srv/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1
$ ls -l
drwxr-xr-x 2 nqs nqs 12288 Jun  1 14:43 msg_store_persistent
drwxr-xr-x 2 nqs nqs  4096 Jul 25  2016 msg_store_transient
drwxr-xr-x 4 nqs nqs  4096 Jul 27  2016 queues
...

其中msg_store_transient、queues和msg_store_persistent就是實際消息的存儲目錄。函數

2.1 rabbit_msg_store存儲

RabbitMQ經過配置queue_index_embed_msgs_below能夠指定根據消息存儲位置,默認queue_index_embed_msgs_below是4096字節(包含消息體、屬性及headers),小於該值的消息存在rabbit_queue_index中。

$ ls msg*
msg_store_persistent:
82680.rdq  97666.rdq

msg_store_transient:
0.rdq

通過rabbit_msg_store處理的消息都會以追加的方式寫入到文件中,文件名從0開始累加,後綴是.rdq,當一個文件的大小超過指定的限制(file_size_limit)後,關閉這個文件再建立一個新的文件存儲。 消息以如下格式存在於文件中:

<<Size:64, MsgId:16/binary, MsgBody>>

MsgId爲RabbitMQ經過rabbit_guid:gen()每個消息生成的GUID,MsgBody會包含消息對應的exchange,routing_keys,消息的內容,消息對應的協議版本,消息內容格式。

在進行消息存儲時,RabbitMQ會在ETS表中記錄消息在文件中的位置映射和文件的相關信息。讀取消息的時候先根據消息的msg_id找到對應的文件,若是文件存在且未被鎖住則直接打開文件,若是文件不存在或者鎖住了則發請求到rabbit_msg_store處理。

2.2 索引文件

查看索引信息

$ cd queues/DMX3PGVA4ZG3HHCXA0ULNIM6P
$ ls 
70083.idx  70084.idx  88155.idx  journal.jif

rabbit_queue_index順序存儲段文件,文件編號從0開始,後綴.idx,且每一個段文件包含固定的SEGMENT_ENTRY_COUNT條記錄。SEGMENT_ENTRY_COUNT默認是16384,每一個rabbit_queue_index從磁盤讀取消息的時候至少讀取一個段文件。

2.3 過時消息刪除

消息的刪除只是從ETS表刪除執行消息的相關信息,同時更新對應的存儲文件的相關信息,並不當即對文件中的消息進程刪除,後續會有專門的垃圾回收進程負責合併待回收消息文件。

當全部文件中的垃圾消息(已經被刪除的消息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發文件合併操做(至少有三個文件存在的狀況下),以提升磁盤利用率。

publish消息時寫入內容,ack消息時刪除內容(更新該文件的有用數據大小),當一個文件的有用數據等於0時,刪除該文件。

3、消息存儲過程源碼分析

消息流轉示意圖:

rabbit_channel進程肯定了消息將要投遞的目標隊列,rabbit_amqqueue_process是隊列進程,每一個隊列都有一個對應的進程,實際上rabbit_amqqueue_process進程只是提供了邏輯上對隊列的相關操做,他的真正操做是經過調用指定的backing_queue模塊提供的相關接口實現的,默認狀況該backing_queue的實現模塊爲rabbit_variable_queue。 RabbitMQ隊列中的消息隨着系統的負載會不斷的變化,一個消息可能會處於如下4種狀態:

%% Definitions:
%%
%% alpha: this is a message where both the message itself, and its
%%        position within the queue are held in RAM(消息自己和消息位置索引都只在內存中)
%%
%% beta:  this is a message where the message itself is only held on
%%        disk (if persisted to the message store) but its position
%%        within the queue is held in RAM.(消息自己存儲在磁盤中,可是消息的位置索引存在內存中)
%%
%% gamma: this is a message where the message itself is only held on
%%        disk, but its position is both in RAM and on disk.(消息自己存儲在磁盤中,可是消息的位置索引存在內存中和磁盤中)
%%
%% delta: this is a collection of messages, represented by a single
%%        term, where the messages and their position are only held on
%%        disk.(消息自己和消息的位置索引都值存儲在磁盤中)

對於普通的沒有設置優先級和鏡像的隊列來講,backing_queue的默認實現是rabbit_variable_queue,其內部經過5個子隊列Q一、Q二、Delta、Q3和Q4來實現這4個狀態的轉換,其關係以下圖所示:

 

其中Q一、Q4只包含alpha狀態的消息,Q2和Q3包含Beta和gamma狀態的消息,Delta只包含delta狀態的消息。具體消息的狀態轉換後續會進行源碼分析。

3.1 消息入隊分析

rabbit_amqqueue_process對消息的主要處理邏輯位於deliver_or_enqueue函數,該方法將消息直接傳遞給消費者,或者將消息存儲到隊列當中。

總體處理邏輯以下:

  1. 首先處理消息的mandory標誌,和confirm屬性。mandatory標誌告訴服務器至少將該消息route到一個隊列中,不然將消息返還給生產者。confirm則是消息的發佈確認。
  2. 而後判斷隊列中是否有消費者正在等待,若是有則直接調用backing_queue的接口給客戶端發送消息。
  3. 若是隊列上沒有消費者,根據當前相關設置判斷消息是否須要丟棄,不須要丟棄的狀況下調用backing_queue的接口將消息入隊。

deliver_or_enqueue函數代碼:

deliver_or_enqueue(Delivery = #delivery{message = Message,
                                        sender  = SenderPid,
                                        flow    = Flow},
                   Delivered, State = #q{backing_queue       = BQ,
                                         backing_queue_state = BQS}) ->
    %% 若是當前消息mandatory字段爲true,則馬上通知該消息對應的rabbit_channel進程
    send_mandatory(Delivery), %% must do this before confirms
    %% 消息隊列記錄要confirm的消息,若是confirm爲false,則不記錄要confirm(若是消息須要進行confirm,則將該消息的信息存入msg_id_to_channel字段中)
    {Confirm, State1} = send_or_record_confirm(Delivery, State),
    %% 獲得消息特性特性數據結構
    Props = message_properties(Message, Confirm, State1),
    %% 讓backing_queue去判斷當前消息是否重複(rabbit_variable_queue沒有實現,直接返回的false)
    {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
    State2 = State1#q{backing_queue_state = BQS1},
    case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
                                             State2) of
        true ->
            State2;
        %% 已經將消息發送給消費者的狀況
        {delivered, State3} ->
            State3;
        %% The next one is an optimisation
        %% 沒有消費者來取消息的狀況(discard:拋棄)
        %% 當前消息沒有發送到對應的消費者,同時當前隊列中設置的消息過時時間爲0,同時從新發送的exchange交換機爲undefined,則馬上將該消息丟棄掉
        {undelivered, State3 = #q{ttl = 0, dlx = undefined,
                                  backing_queue_state = BQS2,
                                  msg_id_to_channel   = MTC}} ->
            %% 直接將消息丟棄掉,若是須要confirm的消息則馬上通知rabbit_channel進程進行confirm操做
            {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
            State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
        %% 沒有消費者來取消息的狀況
        {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
            %% 將消息發佈到backing_queue中
            BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
            %% 判斷當前隊列中的消息數量超過上限或者消息的佔的空間大小超過上限
            {Dropped, State4 = #q{backing_queue_state = BQS4}} =
                maybe_drop_head(State3#q{backing_queue_state = BQS3}),
            %% 獲得當前隊列中的消息數量
            QLen = BQ:len(BQS4),
            %% optimisation: it would be perfectly safe to always
            %% invoke drop_expired_msgs here, but that is expensive so
            %% we only do that if a new message that might have an
            %% expiry ends up at the head of the queue. If the head
            %% remains unchanged, or if the newly published message
            %% has no expiry and becomes the head of the queue then
            %% the call is unnecessary.
            case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
                %% 該狀況是頭部沒有變化,同時消息隊列消息樹立不爲一,則無論當前加入的消息是否設置有超時時間,都不執行drop_expired_msgs函數
                {false, false,         _} -> State4;
                %% 有丟棄消息,同時當前隊列中只有當前這個新的消息,同時消息本身的特性過時時間沒有定義,則不檢查消息過時
                %% 此時消息的頭部有變化,可是消息隊列中只有一個消息,該消息尚未設置超時時間,則不執行drop_expired_msgs函數
                {true,  true,  undefined} -> State4;
                %% 當向隊列中插入消息後須要作檢查消息過時,同時設置定時器的操做只有三種狀況
                %% 1.當消息頭部根據隊列上限有變化,同時消息插入後當前隊列消息數量爲一,且該消息設置有過時時間,則須要作一次操做(該狀況是消息頭部有刪除消息,都會進行一次消息過時檢查)
                %% 2.當消息頭部根據隊列上限有變化,同時消息插入後當前隊列消息數量不爲一,且該消息設置有過時時間,則須要作一次操做(該狀況是消息頭部有刪除消息,都會進行一次消息過時檢查)
                %% 3.當消息頭部根據隊列上限沒有變化,同時消息插入後當前隊列消息數量爲一,無論消息有沒有過時時間,都要作一次操做(該狀況下是當前隊列進入第一條消息)
                %% 最重要的是隻要消息隊列的頭部消息有變化,則馬上執行drop_expired_msgs函數,將隊列頭部超時的消息刪除掉
                {_,     _,             _} -> drop_expired_msgs(State4)
            end
    end.

若是調用到該方法的BQ:publish則說明當前隊列沒有消費者正在等待,消息將進入到隊列。backing_queue實現了消息的存儲,他會盡力會durable=true的消息作持久化存儲。初始默認狀況下,非持久化消息直接進入內存隊列,此時效率最高,當內存佔用逐漸達到一個閾值時,消息和消息索引逐漸往磁盤中移動,隨着消費者的不斷消費,內存佔用的減小,消息逐漸又從磁盤中被轉到內存隊列中。

消息在這些Queue中傳遞的"通常"過程q1->q2->delta->q3->q4,通常負載較輕的狀況消息不須要走完每一個Queue,大部分均可以跳過。rabbit_variable_queue中消息的入隊接口源碼以下:

%% 消息的發佈接口
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
        MsgProps = #message_properties { needs_confirming = NeedsConfirming },
        IsDelivered, _ChPid, _Flow,
        State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                           next_seq_id      = SeqId,
                           in_counter       = InCount,
                           durable          = IsDurable,
                           unconfirmed      = UC }) ->
    %% 只有持久化隊列和消息持久化纔會對消息進行持久化
    IsPersistent1 = IsDurable andalso IsPersistent,
    %% 組裝消息狀態(該數據結構是實際存儲在隊列中的數據)
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
    %% 若是隊列和消息都是持久化類型,則將消息內容和消息在隊列中的索引寫入磁盤
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
    %% 將消息狀態數據結構存入內存(若是Q3隊列不爲空,則將新消息存入Q1隊列,若是爲空則將新消息存入Q4隊列)
    State2 = case ?QUEUE:is_empty(Q3) of
                 %% 若是Q3隊列不爲空,則將當前的消息寫入Q1隊列
                 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
                 %% 若是Q3隊列爲空,則將當前的消息寫入Q4隊列
                 true  -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
             end,
    %% 進入隊列中的消息數量加一
    InCount1 = InCount + 1,
    %% 若是消息須要確認,將該消息加入unconfirmed字段
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    %% 更新隊列進程中的狀態信息
    State3 = stats({1, 0}, {none, MsgStatus1},
                   %% 更新下一個消息在消息中的位置
                   State2#vqstate{ next_seq_id = SeqId + 1,
                                   in_counter  = InCount1,
                                   unconfirmed = UC1 }),
    %% RabbitMQ系統中使用的內存過多,此操做是將內存中的隊列數據寫入到磁盤中
    a(reduce_memory_use(maybe_update_rates(State3))).

消息入隊時先判斷Q3是否爲空,若是Q3爲空,則直接進入Q4,不然進入Q1,這裏思考下爲何?

假如Q3爲空,Delta必定爲空,由於假如Delta不爲空,那麼Q3取出最後一個消息的時候Delta已經把消息轉移到Q3了,這樣Q3就不是空了,先後矛盾所以Delta必定是空的。同理能夠推測出Q二、Q1都是空的,直接把消息放入Q4便可。

消息入隊後,須要判斷內存使用,調用reduce_memory_use函數:

reduce_memory_use(State = #vqstate {
                                    ram_pending_ack  = RPA,
                                    ram_msg_count    = RamMsgCount,
                                    target_ram_count = TargetRamCount,
                                    rates            = #rates { in      = AvgIngress,
                                                                out     = AvgEgress,
                                                                ack_in  = AvgAckIngress,
                                                                ack_out = AvgAckEgress } }) ->
    State1 = #vqstate { q2 = Q2, q3 = Q3 } =
                          %% 獲得當前在內存中的數量超過容許在內存中的最大數量的個數
                          case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
                              0  -> State;
                              %% Reduce memory of pending acks and alphas. The order is
                              %% determined based on which is growing faster. Whichever
                              %% comes second may very well get a quota of 0 if the
                              %% first manages to push out the max number of messages.
                              S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
                                                     (AvgIngress - AvgEgress)) of
                                               %% ack操做進入的流量大於消息進入的流量,則優先將等待ack的消息寫入磁盤文件
                                               true  -> [
                                                         %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
                                                         fun limit_ram_acks/2,
                                                         %% 將Quota個alphas類型的消息轉化爲betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
                                                         fun push_alphas_to_betas/2
                                                        ];
                                               %% 消息進入的流量大於ack操做進入的消息流量,則優先將非等待ack的消息寫入磁盤文件
                                               false -> [
                                                         %% 將Quota個alphas類型的消息轉化爲betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
                                                         fun push_alphas_to_betas/2,
                                                         %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
                                                         fun limit_ram_acks/2
                                                        ]
                                           end,
                                    %% 真正執行轉化的函數
                                    {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                                                       ReduceFun(QuotaN, StateN)
                                                              end, {S1, State}, Funs),
                                    State2
                          end,
    %% 當前beta類型的消息大於容許的beta消息的最大值,則將beta類型多餘的消息轉化爲deltas類型的消息
    case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                    permitted_beta_count(State1)) of
        S2 when S2 >= ?IO_BATCH_SIZE ->
            %% 將S2個betas類型的消息轉化爲deltas類型的消息
            push_betas_to_deltas(S2, State1);
        _  ->
            State1
    end.

%% 將Quota個alphas類型的消息轉化爲betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
push_alphas_to_betas(Quota, State) ->
    %% 將Q1隊列中消息轉化爲betas類型的消息
    %% 若是磁盤中沒有消息,則將Q1中的消息存儲到Q3隊列,若是磁盤中有消息則將Q3隊列中的消息存儲到Q2隊列(將Q1隊列頭部的元素放入到Q2或者Q3隊列的尾部)
    {Quota1, State1} =
        push_alphas_to_betas(
          fun ?QUEUE:out/1,
          fun (MsgStatus, Q1a,
               %% 若是delta類型的消息的個數爲0,則將該消息存入存入Q3隊列
               State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
                   State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
             %% 若是delta類型的消息個數不爲0,則將該消息存入Q2隊列
             (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
                  State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
          end,
          Quota, State #vqstate.q1, State),
    %% 將Q4隊列中消息轉化爲betas類型的消息(Q4 -> Q3)(將Q4隊列尾部的元素不斷的放入到Q3隊列的頭部)
    {Quota2, State2} =
        push_alphas_to_betas(
          fun ?QUEUE:out_r/1,
          fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
                   State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
          end,
          Quota1, State1 #vqstate.q4, State1),
    {Quota2, State2}.

%% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
limit_ram_acks(0, State) ->
    {0, State};

limit_ram_acks(Quota, State = #vqstate { ram_pending_ack  = RPA,
                                         disk_pending_ack = DPA }) ->
    case gb_trees:is_empty(RPA) of
        true ->
            {Quota, State};
        false ->
            %% 拿到隊列索引最大的消息
            {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
            %% 內存不足,強制性的將等待ack的SeqId消息內容寫入磁盤
            {MsgStatus1, State1} =
                maybe_write_to_disk(true, false, MsgStatus, State),
            %% 若是成功的將消息寫入磁盤,則將內存中的消息體字段清空
            MsgStatus2 = m(trim_msg_status(MsgStatus1)),
            %% 更新存儲在磁盤中等待ack的消息字段disk_pending_ack,將剛纔從存儲在內存中等待ack的消息字段ram_pending_ack中的SeqId存儲到disk_pending_ack字段中
            DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
            %% 更新隊列狀態,同時更新最新的ram_pending_ack和disk_pending_ack字段
            limit_ram_acks(Quota - 1,
                           %% 主要是更新內存中保存的消息大小(ram_bytes減去當前寫入磁盤的消息的大小)
                           stats({0, 0}, {MsgStatus, MsgStatus2},
                                 State1 #vqstate { ram_pending_ack  = RPA1,
                                                   disk_pending_ack = DPA1 }))
    end.

每次入隊消息後,判斷RabbitMQ系統中使用的內存是否過多,此操做是嘗試將內存中的隊列數據寫入到磁盤中.
內存中的消息數量(RamMsgCount)及內存中的等待ack的消息數量(RamAckIndex)的和大於容許的內存消息數量(TargetRamCount)時,多餘數量的消息內容會被寫到磁盤中.

3.2 消息出隊源碼分析

獲取消息:

  1. 嘗試從q4隊列中獲取一個消息,若是成功,則返回獲取到的消息,若是失敗,則嘗試經過試用fetch_from_q3/1從q3隊列獲取消息,成功則返回,若是爲空則返回空;
  2. 注意fetch_from_q3從Q3獲取消息,若是Q3爲空,則說明整個隊列都是空的,無消息,消費者等待便可。

 

取出消息後:

  1. 若是Q4不爲空,取出消息後直接返回;
  2. 若是Q4爲空,Q3不爲空,從Q3取出消息後,判斷Q3是否爲空,若是Q3爲空,Delta不爲空,則將Delta中的消息轉移到Q3中,下次直接從Q3消費;
  3. 若是Q3和Delta都是空的,則能夠任務Delta和Q2的消息都是空的,此時將Q1的消息轉移到Q4,下次直接從Q4消費便可。
%% 從隊列中獲取消息
queue_out(State = #vqstate { q4 = Q4 }) ->
    %% 首先嚐試從Q4隊列中取得元素(Q4隊列中的消息類型爲alpha)
    case ?QUEUE:out(Q4) of
        {empty, _Q4} ->
            %% 若是Q4隊列爲空則從Q3隊列中取得元素(若是Q3也爲空,則直接返回空)
            case fetch_from_q3(State) of
                {empty, _State1} = Result     -> Result;
                {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
            end;
        {{value, MsgStatus}, Q4a} ->
            {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
    end.

%% 從隊列Q3中讀取消息
fetch_from_q3(State = #vqstate { q1    = Q1,
                                 q2    = Q2,
                                 delta = #delta { count = DeltaCount },
                                 q3    = Q3,
                                 q4    = Q4 }) ->
    %% 先從Q3隊列中取元素(若是爲空,則直接返回爲空)
    case ?QUEUE:out(Q3) of
        {empty, _Q3} ->
            {empty, State};
        {{value, MsgStatus}, Q3a} ->
            State1 = State #vqstate { q3 = Q3a },
            State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             %% 當這兩個隊列都爲空時,能夠確認q2也爲空,也就是這時候,q2,q3,delta,q4都爲空,那麼,q1隊列的消息能夠直接轉移到q4,下次獲取消息時就能夠直接從q4獲取
                             %% q3 is now empty, it wasn't before;
                             %% delta is still empty. So q2 must be
                             %% empty, and we know q4 is empty
                             %% otherwise we wouldn't be loading from
                             %% q3. As such, we can just set q4 to Q1.
                             %% 當Q3隊列爲空,且磁盤中的消息數量爲空,則斷言Q2隊列爲空
                             true = ?QUEUE:is_empty(Q2), %% ASSERTION
                             %% 當Q3隊列爲空,且磁盤中的消息數量爲空,則斷言Q4隊列爲空
                             true = ?QUEUE:is_empty(Q4), %% ASSERTION
                             %% 從Q3隊列中取走消息後發現Q3隊列爲空,同時磁盤中沒有消息,則將Q1隊列中的消息放入Q4隊列
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
                         {true, false} ->
                             %% 從Q3隊列中取走消息後發現Q3隊列爲空,q3空,delta非空,這時候就須要從delta隊列(內容與索引都在磁盤上,經過maybe_deltas_to_betas/1調用)讀取消息,並轉移到q3隊列
                             maybe_deltas_to_betas(State1);
                         {false, _} ->
                             %% q3非空,直接返回,下次獲取消息還能夠從q3獲取
                             %% q3 still isn't empty, we've not
                             %% touched delta, so the invariants
                             %% between q1, q2, delta and q3 are
                             %% maintained
                             State1
                     end,
            {loaded, {MsgStatus, State2}}
    end.
轉移Delta消息到Q3源碼分析:

%% 從磁盤中讀取隊列數據到內存中來(從隊列消息中最小索引ID讀取出一個索引磁盤文件大小的消息索引信息)
%% 從隊列索引的磁盤文件將單個磁盤文件中的消息索引讀取出來
%% 該操做是將單個隊列索引磁盤文件中的deltas類型消息轉換爲beta類型的消息
maybe_deltas_to_betas(State = #vqstate {
                                        q2                   = Q2,
                                        delta                = Delta,
                                        q3                   = Q3,
                                        index_state          = IndexState,
                                        ram_msg_count        = RamMsgCount,
                                        ram_bytes            = RamBytes,
                                        ram_pending_ack      = RPA,
                                        disk_pending_ack     = DPA,
                                        qi_pending_ack       = QPA,
                                        disk_read_count      = DiskReadCount,
                                        transient_threshold  = TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count        = DeltaCount,
             end_seq_id   = DeltaSeqIdEnd } = Delta,
    %% 根據delta中的開始DeltaSeqId獲得存在索引磁盤的最小的磁盤索引號
    DeltaSeqId1 =
        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                   DeltaSeqIdEnd]),
    %% 從隊列索引中讀取消息索引(從隊列索引的磁盤文件將單個磁盤文件中的消息索引讀取出來)
    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
                                                  IndexState),
    %% 過濾掉從rabbit_queue_index中讀取過來的消息隊列索引(若是該消息不是持久化的則須要刪除掉),最後獲得當前內存中準備好的消息個數以及內存中的消息的總的大小
    {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
        %% RabbitMQ系統關閉之前非持久化消息存儲到磁盤中的索引信息再從磁盤讀取出來的時候必須將他們完全從RabbitMQ系統中刪除
        betas_from_index_entries(List, TransientThreshold,
                                 RPA, DPA, QPA, IndexState1),
    %% 更新隊列消息索引結構,內存中隊列中的消息個數,隊列內存中消息佔的大小,以及從磁盤文件讀取的次數
    State1 = State #vqstate { index_state       = IndexState2,
                              ram_msg_count     = RamMsgCount   + RamCountsInc,
                              ram_bytes         = RamBytes      + RamBytesInc,
                              disk_read_count   = DiskReadCount + RamCountsInc},
    case ?QUEUE:len(Q3a) of
        0 ->
            %% we ignored every message in the segment due to it being
            %% transient and below the threshold
            %% 若是讀取的當前消息隊列索引磁盤文件中的操做項爲空,則繼續讀下一個消息索引磁盤文件中的操做項
            maybe_deltas_to_betas(
              State1 #vqstate {
                               delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
        Q3aLen ->
            %% 將從索引中讀取出來的消息索引存儲到Q3隊列(將新從磁盤中讀取的消息隊列添加到老的Q3隊列的後面)
            Q3b = ?QUEUE:join(Q3, Q3a),
            case DeltaCount - Q3aLen of
                0 ->
                    %% 若是讀取出來的長度和隊列索引的總長度相等,則delta信息被重置爲消息個數爲0,同時q2中的消息轉移到q3隊列
                    %% delta is now empty, but it wasn't before, so
                    %% can now join q2 onto q3
                    State1 #vqstate { q2    = ?QUEUE:new(),
                                      delta = ?BLANK_DELTA,
                                      %% 若是磁盤中已經沒有消息,則將Q2隊列中的消息放入Q3隊列
                                      q3    = ?QUEUE:join(Q3b, Q2) };
                N when N > 0 ->
                    %% 獲得最新的隊列消息磁盤中的信息
                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
                                        count        = N,
                                        end_seq_id   = DeltaSeqIdEnd }),
                    %% 更新最新的q3隊列和磁盤信息結構
                    State1 #vqstate { delta = Delta1,
                                      q3    = Q3b }
            end
    end.

問題1:爲何Q4,Q3空,隊列就爲空?

消費Q3最後一條消息的時候,會調用函數maybe_deltas_to_betas,將磁盤上Delta狀態的消息轉移到Q3,如今Q3是空的,那麼Delta狀態的消息必定是空的,不然消息會轉移到Q3;

Delta消息是空的,上述代碼中:

State1 #vqstate { q2  = ?QUEUE:new(),
delta = ?BLANK_DELTA,
%% 若是磁盤中已經沒有消息,則將Q2隊列中的消息放入Q3隊列
q3    = ?QUEUE:join(Q3b, Q2) };

會將Q2隊列的消息轉移到Q3,如今Q3是空的,那麼Q2中消息確定是空的;

如今Q二、Q三、Delta和Q4都是空的,看代碼:

State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             true = ?QUEUE:is_empty(Q2), 
                             true = ?QUEUE:is_empty(Q4), 
                             %% 從Q3隊列中取走消息後發現Q3隊列爲空,同時磁盤中沒有消息,則將Q1隊列中的消息放入Q4隊列
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };

會將Q1消息轉移到Q4,如今Q4是空的,Q1確定沒有消息了。

綜上所述,Q3和Q4都是空的,那該隊列無消息!

 

問題2:爲何q4,q3,delta爲空的時候,q2必空?

在問題1中已經分析了,Delta消息爲空的時候會將Q2放入Q3中,如今Q3是空的,能夠反向推出Q2確定是空的。

 

問題3:爲何Q四、Q3和delta爲空的時候,q1不爲空會直接轉移到q4?

根據定義Q1和Q4存儲的消息是處於內存中的alpha狀態的消息,這時候直接從Q1轉到Q4就不須要通過磁盤,減小IO延遲;

rabbit_variable_queue.erl源碼關於轉換狀態還有不少細節,這裏再也不介紹。後續深刻學習源碼後再分析。

 

4、總結

節點消息堆積較多時,這些堆積的消息很快就會進入很深的隊列中去,這樣會增長處理每一個消息的平均開銷,整個系統的處理能力就會下降。由於要花更多的時間和資源處理堆積的消息,後流入的消息又被擠壓到很深的隊列中了,系統負載愈來愈惡化。

所以RabbitMQ使用時必定要注意磁盤佔用監控和流控監控,這些在控制檯上均可以看到,通常來講若是消息堆積過多建議增長消費者或者加強每一個消費者的消費能力(好比調高prefetch_count消費者一次收到的消息能夠提升單個消費者消費能力)。

 

參考文章:
RabbitMQ源碼分析 - 隊列機制

RabbitMQ實戰指南

RabbitMQ官方文檔

 

本文已由做者李海燕受權網易雲社區發佈。

相關文章
相關標籤/搜索