rabbitmq——prefetch count

消費者在開啓acknowledge的狀況下,對接收到的消息能夠根據業務的須要異步對消息進行確認。 java

然而在實際使用過程當中,因爲消費者自身處理能力有限,從rabbitmq獲取必定數量的消息後,但願rabbitmq再也不將隊列中的消息推送過來,當對消息處理完後(即對消息進行了ack,而且有能力處理更多的消息)再接收來自隊列的消息。在這種場景下,咱們能夠經過設置basic.qos信令中的prefetch_count來達到這種效果。 異步

先直觀的看看設置了prefetch_count的效果,: 測試

1) 對比測試:兩個消費者都訂閱同一隊列,no_ack均設置爲false即開啓acknowledge機制,且均未設置prefetch_count,向隊列發佈5條消息 fetch

結果:無論消息是否被ack,rabbitmq會輪流向兩個消費者投遞消息,第一個消費者收到"1","3","5"三條消息, 第二個消費者收到"2","4"兩條消息。 ui

2)prefetch_count設置測試:兩個消費者都訂閱同一隊列,開啓acknowledge機制,第一個消費者prefetch_count設置爲1,另外一個消費者未設置prefetch_count,一樣向隊列發佈5條消息 spa

結果:rabbitmq向第一個消費者投遞了一條消息後,消費者未對該消息進行ack,rabbitmq不會再向該消費者投遞消息,剩下的四條消息均投遞給了第二個消費者 code

看完效果後,再來看看rabbitmq裏的一些實現。 rabbitmq

1. rabbitmq對basic.qos信令的處理 隊列

首先,basic.qos是針對channel進行設置的,也就是說只有在channel創建以後才能發送basic.qos信令。 進程

在rabbitmq的實現中,每一個channel都對應會有一個rabbit_limiter進程,當收到basic.qos信令後,在rabbit_limiter進程中記錄信令中prefetch_count的值,同時記錄的還有該channel未ack的消息個數。

注:其實basic.qos裏還有另外兩個參數可進行設置(global和prefetch_size),但rabbitmq沒有相應的實現。

2. 隊列中的消息投遞給消費者時的處理

當rabbitmq要將隊列中的一條消息投遞給消費者時,會遍歷該隊列上的消費者列表,選一個合適的消費者,而後將消息投遞出去。其中挑選消費者的一個依據就是看消費者對應的channel上未ack的消息數是否達到設置的prefetch_count個數,若是未ack的消息數達到了prefetch_count的個數,則不符合要求。當挑選到合適的消費者後,中斷後續的遍歷。

rabbit_amqqueue_process.erl

deliver_msgs_to_consumers(_DeliverFun, true, State) ->
    {true, State};
deliver_msgs_to_consumers(DeliverFun, false,
                          State = #q{active_consumers =
                                     ActiveConsumers}) ->
    case priority_queue:out_p(ActiveConsumers) of
        {empty, _} ->
            {false, State};
        {{value, QEntry, Priority}, Tail} ->
            {Stop, State1} =
                deliver_msg_to_consumer(DeliverFun, QEntry,
                                        Priority,
                                        State#q{active_consumers =
                                                Tail}),
            %%若是處理結果爲false,遍歷下一個消費者
            deliver_msgs_to_consumers(DeliverFun, Stop, State1)
    end.

deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
                        Priority, State) ->
    ...
    %%判斷是否能夠將消息投遞給該消費者
    case rabbit_limiter:can_send(C#cr.limiter,
                                 Consumer#consumer.ack_required,
                                 Consumer#consumer.tag) of
        %%能夠投遞,再將該消費者放到隊列的尾部
        {continue, Limiter} ->
            AC1 = priority_queue:in(E, Priority,
                                    State#q.active_consumers),
            %%將消息投遞給消費者
            deliver_msg_to_consumer0(DeliverFun, Consumer,
                                     C#cr{limiter = Limiter},
                                     State#q{active_consumers = AC1})
    ...

rabbit_limiter.erl

handle_call({can_send, QPid, AckRequired}, _From,
            State = #lim{volume = Volume}) ->
    case prefetch_limit_reached(State) of
        %%未ack的消息數達到prefetch_count設置的個數
        true  -> {reply, false, limit_queue(QPid, State)};
        false -> {reply, true,
                  %%消息須要被ACK, volume加1
                  State#lim{volume = if AckRequired -> Volume + 1;
                                        true        -> Volume
                                     end}}
    end

prefetch_limit_reached(#lim{prefetch_count = Limit, 
                            volume = Volume}) ->
    Limit =/= 0 andalso Volume >= Limit.



3. 消費者對消息ack後的處理

當消費者對消息進行ack後,最終會修改該消費者對應channel中未ack的消息數,這樣隊列又能夠將消息投遞給該消費者。

rabbit_limiter.erl

handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
    NewVolume = if Volume == 0 -> 0;
                   true        -> Volume - Count
                end,
    {noreply, maybe_notify(State, State#lim{volume = NewVolume})};



4. 擴展

在AMQP協議(0-9-1)中,有這麼一段話

對於rabbitmq來講,最後一句話其實說的就是使用了acknowledge機制狀況下,使用prefetch_count進行流量控制。另外在實際研究過程當中發現還有channel.flow以及basic.credit(應該屬於AMQP 1.0協議)能夠進行一些控制,這裏沒有展開,有時間會研究下相應的機制以及可能使用的場景。

相關文章
相關標籤/搜索