【RabbitMQ】Concurrency、Prefetch、exclusive

分佈式消息中間件

RabbitMQ是用Erlang語言編寫的分佈式消息中間件,經常用在大型網站中做爲消息隊列來使用,主要目的是各個子系統之間的解耦和異步處理。消息中間件的基本模型是典型的生產者-消費者模型,生產者發送消息到消息隊列,消費者監聽消息隊列,收到消息後消費處理。java

在使用RabbitMQ作消息分發時,主要有三個概念要注意:Exchange,RoutingKey,Queue。spring

Exchange能夠理解爲交換器,RoutingKey能夠理解爲路由,Queue做爲真實存儲消息的隊列和某個Exchange綁定,具體如何路由到感興趣的Queue則由Exchange的三種模式決定:多線程

  • fanout
  • topic
  • direct

Exchange爲fanout時,生產者往此Exchange發送的消息會發給每一個和其綁定的Queue,此時RoutingKey並不起做用;Exchange爲topic時,生產者能夠指定一個支持通配符的RoutingKey(如demo.*)發向此Exchange,凡是Exchange上RoutingKey知足此通配符的Queue就會收到消息;direct類型的Exchange是最直接最簡單的,生產者指定Exchange和RoutingKey,而後往其發送消息,消息只能被綁定的知足RoutingKey的Queue接受消息。(一般若是不指定RoutingKey的具體名字,那麼默認的名字實際上是Queue的名字)併發

Concurrency與Prefetch

在一般的使用中(Java項目),咱們通常會結合spring-amqp框架來使用RabbitMQ,spring-amqp底層調用RabbitMQ的java client來和Broker交互,好比咱們會用以下配置來創建RabbitMQ的鏈接池、聲明Queue以及指明監聽者的監聽行爲:框架

<rabbit:connection-factory id="connectionFactory" /> <!-- template非必須,主要用於生產者發送消息--> <rabbit:template id="template" connection-factory="connectionFactory" /> <rabbit:queue name="remoting.queue" /> <rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3"> <rabbit:listener ref="listener" queue-names="remoting.queue" /> </rabbit:listener-container> 

listener-container能夠設置消費者在監聽Queue的時候的各類參數,其中concurrency和prefetch是本篇文章比較關心的兩個參數,如下是spring-amqp文檔的解釋:異步

prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSizesocket

concurrentConsumers(concurrency)分佈式

The number of concurrent consumers to initially start for each listener.ide

簡單解釋下就是concurrency設置的是對每一個listener在初始化的時候設置的併發消費者的個數,prefetch是每次從一次性從broker裏面取的待消費的消息的個數,上面的配置在監控後臺看到的效果以下:函數


 
 

圖中能夠看出有兩個消費者同時監聽Queue,可是注意這裏的消息只有被一個消費者消費掉就會自動ack,另一個消費者就不會再獲取到此消息,Prefetch Count爲配置設置的值3,意味着每一個消費者每次會預取3個消息準備消費。每一個消費者對應的listener有個Exclusive參數,默認爲false, 若是設置爲true,concurrency就必須設置爲1,即只能單個消費者消費隊列裏的消息,適用於必須嚴格執行消息隊列的消費順序(先進先出)。

源碼剖析

這裏concurrency的實現方式不看源碼也能猜到,確定是用多線程的方式來實現的,此時同一進程下打開的本地端口都是56278.下面看看listener-contaner對應的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源碼:

protected int initializeConsumers() { int count = 0; synchronized (this.consumersMonitor) { if (this.consumers == null) { this.cancellationLock.reset(); this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers); for (int i = 0; i < this.concurrentConsumers; i++) { BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); count++; } } } return count; } 

container啓動的時候會根據設置的concurrency的值(同時不超過最大值)建立n個BlockingQueueConsumer。

protected void doStart() throws Exception { //some code synchronized (this.consumersMonitor) { int newConsumers = initializeConsumers(); //some code Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>(); for (BlockingQueueConsumer consumer : this.consumers.keySet()) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); this.taskExecutor.execute(processor); } //some code } } 

在doStart()方法中調用initializeConsumers來初始化全部的消費者,AsyncMessageProcessingConsumer做爲真實的處理器包裝了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其實實現了Runnable接口,由this.taskExecutor.execute(processor)來啓動消費者線程。

private final class AsyncMessageProcessingConsumer implements Runnable { private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException; private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) { this.consumer = consumer; this.start = new CountDownLatch(1); } //some code @Override public void run() { //some code } } 

那麼prefetch的值意味着什麼呢?其實從名字上大體能看出,BlockingQueueConsumer內部應該維護了一個阻塞隊列BlockingQueue,prefetch應該是這個阻塞隊列的長度,看下BlockingQueueConsumer內部有個queue,這個queue不是對應RabbitMQ的隊列,而是Consumer本身維護的內存級別的隊列,用來暫時存儲從RabbitMQ中取出來的消息:

private final BlockingQueue<Delivery> queue; public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean exclusive, String... queues) { //some code this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); } 

BlockingQueueConsumer的構造函數清楚說明了每一個消費者內部的隊列大小就是prefetch的大小。

業務問題

前面說過,設置併發的時候,要考慮具體的業務場景,對那種對消息的順序有苛刻要求的場景不適合併發消費,而對於其餘場景,好比用戶註冊後給用戶發個提示短信,是不太在乎哪一個消息先被消費,哪一個消息後被消費,由於每一個消息是相對獨立的,後註冊的用戶先收到短信也並無太大影響。

設置併發消費除了能提升消費的速度,還有另一個好處:當某個消費者長期阻塞,此時在當前消費者內部的BlockingQueue的消息也會被一直阻塞,可是新來的消息仍然能夠投遞給其餘消費者消費,這種狀況頂多會致使prefetch個數目的消息消費有問題,而不至於單消費者狀況下整個RabbitMQ的隊列會由於一個消息有問題而所有堵死。全部在合適的業務場景下,須要合理設置concurrency和prefetch值。

轉自:https://www.jianshu.com/p/04a1d36f52ba

相關文章
相關標籤/搜索