RabbitMQ拉模式批量消費消息

實現RabbitMQ的消費者有兩種模式,推模式(Push)和拉模式(Pull)。html

實現推模式推薦的方式是繼承 DefaultConsumer 基類,也能夠使用Spring AMQP的 SimpleMessageListenerContainer 。java

推模式是最經常使用的,可是有些狀況下推模式並不適用的,好比說:sql

  • 因爲某些限制,消費者在某個條件成立時才能消費消息架構

  • 須要批量拉取消息進行處理併發

 

實現拉模式分佈式

RabbitMQ的Channel提供了 basicGet 方法用於拉取消息。ide

/**
 * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
 * @see com.rabbitmq.client.AMQP.Basic.Get
 * @see com.rabbitmq.client.AMQP.Basic.GetOk
 * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @return a {@link GetResponse} containing the retrieved message data
 * @throws java.io.IOException if an error is encountered
 */
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

basicGet 返回 GetResponse 類。高併發

public class GetResponse {
    private final Envelope envelope;
    private final BasicProperties props;
    private final byte[] body;
    private final int messageCount;
    
    // ...

rabbitmq-client版本4.0.3性能

使用 basicGet 拉取消息須要注意:學習

basicGet
DefaultConsumer

示例代碼:

private void consume(Channel channel) throws IOException, InterruptedException {
    while (true) {
        if (!isConditionSatisfied()) {
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
        if (response == null) {
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        String data = new String(response.getBody());
        logger.info("Get message <= {}", data);
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    }
}

 

批量拉取消息

RabbitMQ支持客戶端批量拉取消息,客戶端能夠連續調用 basicGet 方法拉取多條消息,處理完成以後一次性ACK。須要注意:

basicGet
basicAck

示例代碼:

String bridgeQueueName = extractorProperties.getBridgeQueueName();
int batchSize = extractorProperties.getBatchSize();
List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
long tag = 0;
while (responseList.size() < batchSize) {
    GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
    if (getResponse == null) {
        break;
    }
    responseList.add(getResponse);
    tag = getResponse.getEnvelope().getDeliveryTag();
}
if (responseList.isEmpty()) {
    TimeUnit.MILLISECONDS.sleep(1);
} else {
    logger.info("Get <{}> responses this batch", responseList.size());
    // handle messages
    channel.basicAck(tag, true);
}

 

關於QueueingConsumer

QueueingConsumer 在客戶端本地使用 BlockingQueue 緩衝消息,其nextDelivery方法也能夠用於實現拉模式(其本質上是 BlockingQueue.take ),可是 QueueingConsumer 如今已經標記爲Deprecated。

歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索