大廠如何用RabbitMQ作消費端限流

1 消息過載

假設RabbitMQ服務器有上萬條未處理消息,隨便打開一個消費端,會形成巨量消息瞬間所有推送過來,然而咱們單個客戶端沒法同時處理這麼多數據。java

還好比說單個Pro一分鐘產生了幾百條數據,可是單個Con一分鐘可能只能處理60條,這時Pro-Con不平衡。一般Pro沒辦法作限制,因此Con就須要作一些限流措施,不然若是超出最大負載,可能致使Con性能降低,服務器卡頓甚至崩潰。服務器

所以,咱們須要Con限流。markdown

2 Con限流機制

RabbitMQ提供了一種qos (服務質量保證)功能,在非自動確認消息的前提下,若必定數目的消息 (經過基於Con或者channel設置Qos的值) 未被確認前,不消費新的消息。性能

不能設置自動簽收功能(autoAck = false) 若是消息未被確認,就不會到達Con,目的就是給Pro減壓fetch

限流設置API

basicQos

  • QoS

請求特定設置「服務質量(quality of service)」。 這些設置強加數據的服務器將須要確認以前,爲消費者發送的消息數量限制。 所以,他們提供消費者發起的流量控制的一種手段。 ui

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
複製代碼
  • prefetchSize: 單條消息的大小限制,Con一般設置爲0,表示不作限制
  • prefetchCount: 一次最多能處理多少條消息
  • global: 是否將上面設置true應用於channel級別仍是取false表明Con級別

prefetchSize和global這兩項,RabbitMQ沒有實現,不研究 prefetchCount在 autoAck=false 的狀況下生效,即在自動應答的狀況下該值無效,因此必須手工ACK。spa

void basicAck(Integer deliveryTag,boolean multiple) 複製代碼

調用該方法就會主動回送給Broker一個應答,表示這條消息我處理完了,你能夠給我下一條了。參數multiple表示是否批量簽收,因爲咱們是一次處理一條消息,因此設置爲false。code

3 代碼實戰

  • 自定義Con

  • Con

  • Pro

  • 啓動Con,查看管控臺

  • 啓動Pro,開始發送消息,Con接收消息

  • 實現限流,僅僅處理一條消息,其他的都在等待
  • 如今,咱們開啓ACK應答處理

  • 從新啓動Con,發現剩餘的2條消息也全都發送並接收了!
  • 咱們以前是註釋掉手工ACK方法,而後啓動消費端和生產端,當時Con只打印一條消息,這是由於咱們設置了手工簽收,而且設置了一次只處理一條消息,當咱們沒有回送ACK應答時,Broker端就認爲Con尚未處理完這條消息,基於這種限流機制就不會給Con發送新的消息了,因此Con那時只打印了一條消息

相關文章
相關標籤/搜索