消費者在開啓acknowledge的狀況下,對接收到的消息能夠根據業務的須要異步對消息進行確認。 java
然而在實際使用過程當中,因爲消費者自身處理能力有限,從rabbitmq獲取必定數量的消息後,但願rabbitmq再也不將隊列中的消息推送過來,當對消息處理完後(即對消息進行了ack,而且有能力處理更多的消息)再接收來自隊列的消息。在這種場景下,咱們能夠經過設置basic.qos信令中的prefetch_count來達到這種效果。 異步
先直觀的看看設置了prefetch_count的效果,: 測試
1) 對比測試:兩個消費者都訂閱同一隊列,no_ack均設置爲false即開啓acknowledge機制,且均未設置prefetch_count,向隊列發佈5條消息 fetch
結果:無論消息是否被ack,rabbitmq會輪流向兩個消費者投遞消息,第一個消費者收到"1","3","5"三條消息, 第二個消費者收到"2","4"兩條消息。 ui
2)prefetch_count設置測試:兩個消費者都訂閱同一隊列,開啓acknowledge機制,第一個消費者prefetch_count設置爲1,另外一個消費者未設置prefetch_count,一樣向隊列發佈5條消息 spa
結果:rabbitmq向第一個消費者投遞了一條消息後,消費者未對該消息進行ack,rabbitmq不會再向該消費者投遞消息,剩下的四條消息均投遞給了第二個消費者 code
看完效果後,再來看看rabbitmq裏的一些實現。 rabbitmq
1. rabbitmq對basic.qos信令的處理 隊列
首先,basic.qos是針對channel進行設置的,也就是說只有在channel創建以後才能發送basic.qos信令。 進程
在rabbitmq的實現中,每一個channel都對應會有一個rabbit_limiter進程,當收到basic.qos信令後,在rabbit_limiter進程中記錄信令中prefetch_count的值,同時記錄的還有該channel未ack的消息個數。
注:其實basic.qos裏還有另外兩個參數可進行設置(global和prefetch_size),但rabbitmq沒有相應的實現。
2. 隊列中的消息投遞給消費者時的處理
當rabbitmq要將隊列中的一條消息投遞給消費者時,會遍歷該隊列上的消費者列表,選一個合適的消費者,而後將消息投遞出去。其中挑選消費者的一個依據就是看消費者對應的channel上未ack的消息數是否達到設置的prefetch_count個數,若是未ack的消息數達到了prefetch_count的個數,則不符合要求。當挑選到合適的消費者後,中斷後續的遍歷。
rabbit_amqqueue_process.erl deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> {false, State}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer(DeliverFun, QEntry, Priority, State#q{active_consumers = Tail}), %%若是處理結果爲false,遍歷下一個消費者 deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> ... %%判斷是否能夠將消息投遞給該消費者 case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of %%能夠投遞,再將該消費者放到隊列的尾部 {continue, Limiter} -> AC1 = priority_queue:in(E, Priority, State#q.active_consumers), %%將消息投遞給消費者 deliver_msg_to_consumer0(DeliverFun, Consumer, C#cr{limiter = Limiter}, State#q{active_consumers = AC1}) ... rabbit_limiter.erl handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case prefetch_limit_reached(State) of %%未ack的消息數達到prefetch_count設置的個數 true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, %%消息須要被ACK, volume加1 State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} end prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit.
3. 消費者對消息ack後的處理
當消費者對消息進行ack後,最終會修改該消費者對應channel中未ack的消息數,這樣隊列又能夠將消息投遞給該消費者。
rabbit_limiter.erl handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
在AMQP協議(0-9-1)中,有這麼一段話
對於rabbitmq來講,最後一句話其實說的就是使用了acknowledge機制狀況下,使用prefetch_count進行流量控制。另外在實際研究過程當中發現還有channel.flow以及basic.credit(應該屬於AMQP 1.0協議)能夠進行一些控制,這裏沒有展開,有時間會研究下相應的機制以及可能使用的場景。