RabbitMQ源代碼分析系列三:消息存儲

今天分析RabbitMQ消息的持久化,即客戶端發送一條持久化的MQ消息後,服務端作了哪些事情。緩存

下面是客戶端的發送代碼:網絡

$client = new Client('127.0.0.1', 5672, 'guest', 'guest');app

//設置正常交換機、隊列
    $type = 'topic';
    $routingKey = 'hello';
    $exchangeName = 'hello_exchange'
    $exchange = new Exchange($client, $exchangeName, $type);
    $exchange->setDurable(true);

    //隊列
    $queue = new Queue(
        $client, $this->queueName, [
            new Consumer(
                function (AMQPMessage $msg) {
                    var_dump($msg);
                }
            ),
        ]
    );

    $binding = new Binding($exchange, $queue);
    $binding->setRoutingKey($routingKey);

    $client->register($binding);

    $message = new Message("hello" . str_repeat('123456789', 13));
    $res     = $exchange->publish($message, $routingKey);

分析下網絡包,發送消息的時候,其實是往服務端發送basic.publish命令。框架

調用鏈分析 異步

入口在rabbit_channel文件:性能

handle_method(#'basic.publish'{exchange = ExchangeNameBin,this

routing_key = RoutingKey,
                           mandatory   = Mandatory},
          Content, State = #ch{virtual_host    = VHostPath,
                               tx              = Tx,
                               channel         = ChannelNum,
                               confirm_enabled = ConfirmEnabled,
                               trace_state     = TraceState,
                               user            = #user{username = Username},
                               conn_name       = ConnName,
                               delivery_flow   = Flow}) ->

……spa

case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
    {ok, Message} ->
        Delivery = rabbit_basic:delivery(
                     Mandatory, DoConfirm, Message, MsgSeqNo),
        QNames = rabbit_exchange:route(Exchange, Delivery),
        DQ = {Delivery#delivery{flow = Flow}, QNames},
        {noreply, case Tx of
                      none         -> deliver_to_queues(DQ, State1);
                      {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                      State1#ch{tx = {Msgs1, Acks}}
                  end};
end;

上面刪除了一些非關鍵代碼,這裏看是否有事務,若是沒事務則 經過 deliver_to_queues發送, 有事務先進隊列,今天主要分析無事務的處理過程。hibernate

deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{設計

exchange_name = XName},
                                    mandatory  = Mandatory,
                                    confirm    = Confirm,
                                    msg_seq_no = MsgSeqNo},
               DelQNames}, State = #ch{queue_names    = QNames,
                                       queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),

後者調用 rabbit_amqqueue:deliver來處理:

deliver(Qs, Delivery = #delivery{flow = Flow}) ->

{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,

MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
QPids.

deliver的邏輯就 比較簡單,分主、從進程ID,若是沒有開啓鏡像隊列,從進程ID是空的,今天先不分析鏡像隊列。 

發送deliver消息到主進程,這個進程是rabbit-amqueue-process。

再來看rabbit-amqueue-process是如何處理的:

handle_cast({deliver, Delivery = #delivery{sender = Sender,

flow   = Flow}, SlaveWhenPublished},
        State = #q{senders = Senders}) ->

%% SlaveWhenPublished 只有在從的時候才爲true

noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));

中間的代碼還比較多,就不一一貼了,大概說下,deliver_or_enqueue會調用attempt_delivery,而後調用到rabbit-variable-queue:publish

publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },

MsgProps = #message_properties { needs_confirming = NeedsConfirming },
    IsDelivered, _ChPid, _Flow,
    State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                       qi_embed_msgs_below = IndexMaxSize,
                       next_seq_id         = SeqId,
                       in_counter          = InCount,
                       durable             = IsDurable,
                       unconfirmed         = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
 {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),

調用maybe_write_to_disk 進行消息的持久化:

maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->

maybe_write_msg_to_disk用來將消息持久化,maybe_write_index_to_disk用來將索引持久化。

maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->

{MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).

若是消息大小小於配置文件中的queue_index_embed_msgs_below,

則persist_to返回queue_index,反之返回 msg_store,這個參數默認是4096,即若是消息體大小小於4096,是不會將消息寫到消息持久化文件,而是寫到索引文件中。

消息的持久化由文件rabbit_msgstore負責,msgstorewrite會調用writemessage進行消息的保存:

maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {

msg = Msg, msg_id = MsgId,
                             is_persistent = IsPersistent },
                    State = #vqstate{ msg_store_clients = MSCState,
                                      disk_write_count  = Count})

when Force orelse IsPersistent ->

case persist_to(MsgStatus) of
    msg_store   -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                        prepare_to_store(Msg)),
                   {MsgStatus#msg_status{msg_in_store = true},
                    State#vqstate{disk_write_count = Count + 1}};
    queue_index -> {MsgStatus, State}
end;

這裏的邏輯就比較簡單了,將消息內容到當前文件,再判斷當前文件的大小,若是須要,則建立一個新的持久化文件。

這裏講一下segment,每一個segment對應一個文件(所在的目錄在mnesia數據目錄下的msg_store_persistent)。每一個文件最多能夠保存SEGMEN_ENTRY_COUNT(16384)個消息索引信息。

這些文件是以整數來命名的,某條消息對應哪一個segment文件呢?用消息索引自己對SEGMENT_ENTRY_COUNT取整,相關代碼能夠看下 

rabbit_queue_index:add_to_journal。

最後再看索引的持久化

maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {

msg = Msg, msg_id = MsgId,
                             is_persistent = IsPersistent },
                    State = #vqstate{ msg_store_clients = MSCState,
                                      disk_write_count  = Count})

when Force orelse IsPersistent ->

case persist_to(MsgStatus) of
    msg_store   -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                        prepare_to_store(Msg)),
                   {MsgStatus#msg_status{msg_in_store = true},
                    State#vqstate{disk_write_count = Count + 1}};
    queue_index -> {MsgStatus, State}
end;

索引經過rabbit_queue_index:publish 來落盤:

publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,

State = #qistate{unconfirmed     = UC,
                     unconfirmed_msg = UCM}) ->
MsgId = case MsgOrId of
            #basic_message{id = Id} -> Id;
            Id when is_binary(Id)   -> Id
        end,
?MSG_ID_BYTES = size(MsgId),

%%JournalHd1對應journal.jif

{JournalHdl, State1} =
    get_journal_handle(
      case {MsgProps#message_properties.needs_confirming, MsgOrId} of
          {true,  MsgId} -> UC1  = gb_sets:add_element(MsgId, UC),
                            State#qistate{unconfirmed     = UC1};
          {true,  _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
                            State#qistate{unconfirmed_msg = UCM1};
          {false, _}     -> State
      end),
file_handle_cache_stats:update(queue_index_journal_write),
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
       JournalHdl, [<<(case IsPersistent of
                           true  -> ?PUB_PERSIST_JPREFIX;
                           false -> ?PUB_TRANS_JPREFIX
                       end):?JPREFIX_BITS,
                      SeqId:?SEQ_BITS, Bin/binary,
                      (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
  maybe_flush_journal(
  JournalSizeHint,
  add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).

索引文件會先寫到 journal緩存中,再按期刷到磁盤中,相關參數爲

queue_index_max_journal_entries,

判斷當前寫入次數是否達到queue_index_max_journal_entries,是則進行刷盤到索引持久化文件。

實際刷盤是在 rabbit_variable_queue:handle_pre_hibernate中異步去刷的,這裏不詳述。

索引持久化文件在mnesia目錄的queues目錄下,文件擴展名爲idx。

如何保證消息的不丟呢,即若是寫入journal文件成功了,但沒有刷新到索引的持久化文件中如何恢復,能夠看下代碼 rabbit_variable_queue:init, RabbitMQ啓動的時候啓動每一個隊列以前會調用它來從journal中恢復索引和消息。

最後總結

持久化分消息體和索引的持久化,若是消息體小於queue_index_embed_msgs_below,則將消息寫入到索引文件中,只進行1次磁盤操做,反之要寫2次磁盤:消息體+索引,消息體寫入到segment文件中,一個segment能夠保存16384條消息。

爲了加快寫入的性能,寫入消息體時是追加方式進行的;索引的持久化則是先追加到journal文件中,再異步刷新到索引文件中。

RabbitMQ網絡框架代碼分析二:命令分發
RabbitMQ網絡框架代碼分析
從RabbitMQ Channel設計看鏈接複用

image

相關文章
相關標籤/搜索