7、消費消息與性能權衡(讀書筆記與我的實踐)

摘要

主要介紹消費消息時的幾種方式:前端

  • 平衡消息消費的可靠性與性能;
  • 死信交換器;
  • 設置自動刪除隊列、持久化隊列、TTL等;

消費性能

使用no-ack模式 

在消費消息時,負責消費的應用程序會發送一個Basic.Consumer請求,與該請求一塊兒發送的還有一個no-ack標誌。當這個標誌啓用時,它會告訴RabbitMQ消費者在接收到消息時不會進行確認,RabbitMQ只管儘快的發送消息。java

使用no-ack標誌消費消息是讓RabbitMQ將消費投遞給消費者的最快方式,但這也是最不可靠的方式。

若是使用no-ack,那麼當有新的可用消息時,RabbitMQ將會發送該消息給消費者,而不用等待。實際上,若是有可用消息,RabbitMQ會持續向消費者發送它們,直到套接字緩衝區被填滿爲止。spring

目前沒有找到RabbitTemplate如何開啓no-ack的方法,若是有用過的朋友,請留言告訴我,謝謝。bash

消息確認模式

開啓消息確認模式,每次接收到消息後,都要向RabbitMQ返回一個Basic.Ack。服務器

消息確認有三種確認方式:性能

  • Ack;
  • Reject;
  • Nack;

基於RabbitTemplate,下面這段代碼,有對這幾種確認方式的實現,在配置文件中開啓手動確認模式,acknowledge-mode屬性爲manual(默認爲自動確認):測試

spring:
  #消息隊列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
/**
 * 消費者監聽消息隊列
 */
@Component
@Slf4j
@RabbitListener(queues = "DIRECT_QUEUE")
public class DirectQueueListener {

    @RabbitHandler
    public void process(String message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        log.info("消費消息成功: {}", message);
        Thread.sleep(1000);
        switch (message) {
            case "nack":
                channel.basicNack(tag, true, false); // 第二個參數控制是否開啓批量拒絕,第三個參數表示是否requeue
                break;
            case "nack-requeue":
                channel.basicNack(tag, true, true);
                break;
            case "reject":
                channel.basicReject(tag, false);
                break;
            case "reject-requeue": // 啓用了requeue,若是隻有一個消費者,容易形成死循環
                channel.basicReject(tag, true);
                break;
            default:
                channel.basicAck(tag, true);
                break;
        }
    }

}

channel.basicAck:當正常消費消息時,調用該方法。fetch

咱們看到除了basicAck,還有basicReject和basicNack。這兩種,顧名思義,是用來拒絕消費的。ui

channel.basicReject:從協議層面上,reject是發送一個Basic.Reject響應,告知RabbitMQ沒法對這條消息進行處理,當拒絕時,能夠指定是否丟棄消息或使用requeue標誌從新發送消息。當啓用requeue時,RabbitMQ將會把這條消息從新放回到隊列中。spa

不能使用basicReject一次拒絕多個消息。

channel.basicNack:Basic.Nack實現與Basic.Reject相同的行爲,但添加了批量拒絕的功能。

設置multiple或requeue如圖所示:

服務質量確認模式

AMQP規範要求信道要有服務質量設置,即在確認消息接收以前能夠預先接收必定數量的消息。能夠設置一個預取數量來實現高效的發送消息。

若是消費者應用程序在確認消息以前崩潰,在套接字關閉時,全部預取的消息將返回到隊列。

若是設置了no-ack,那麼預取大小將被忽略。

使用RabbitTemplate時,能夠在消費者應用程序的配置文件中配置預取大小:

spring:
  #消息隊列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1000

其中prefetch就是預取大小,消費者應用程序運行後,能夠在RabbitMQ的控制檯看到這個設置:

若是熟悉抓包軟件的朋友,能夠試着抓包看看:

我預先發送了2條消息到RabbitMQ,能夠看到上圖中最後兩行是兩個Ack。

有一種方式能夠一次確認多個消息,Basic.Ack響應具備一個multiple屬性,當把它設置爲true時就能確認之前未確認的消息。

若是使用multiple,當成功的接收了一些消息,而且應用程序在回覆Ack以前就發生了異常,則全部爲確認的消息將返回隊列。

死信交換器

RabbitMQ的死信交換器是一種能夠拒絕已投遞消息的可選行爲,通常有三種狀況的消息會進入死信隊列:

  • 當拒絕了一個不從新發送的消息時,會進入死信;
  • 當消息的TTL到期時,會進入死信;
  • 當隊列已滿時,會進入死信;
死信與備用交換器不一樣,過時或被拒絕的消息經過死信交換器進行投遞,而備用交換器則路由那些沒法由RabbitMQ路由的消息。

在RabbitMQ中,在聲明隊列時,指定死信交換器:

/**
     * 聲明隊列。
     * 同時指定死信隊列。
     *
     * @return Queue對象。
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

死信交換器還容許使用預先指定的值覆蓋路由鍵,這樣能夠容許同一個交換器同時處理死信和非死信消息,但須要確保死信消息不被投遞到相同的隊列。設置預約義路由鍵的關鍵字是:x-dead-letter-routing-key。

測試死信隊列,當消費者拒絕時,查看消息是否會進入死信隊列:

控制隊列

定義隊列時,有多個設置能夠肯定隊列的行爲:

  • 自動刪除本身;
  • 只容許一個消費者進行消費;
  • 自動過時消息;
  • 保持有限數量的消息;
  • 將舊消息推出堆棧;

更改隊列的設置,必須刪除隊列並從新建立它。

臨時隊列

也能夠叫作自動刪除的隊列。

一旦消費者完成鏈接和檢索消息,在斷開鏈接時隊列將被刪除。

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").autoDelete()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

隊列只會在沒有消費者監聽的時候自行刪除。

只容許單個消費者

在須要確保只有單個消費者可以消費隊列中的消息時,在建立隊列時設置exclusive屬性,啓用後在消費者斷開鏈接後,隊列也會自動刪除。

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").exclusive()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

聲明exclusive的隊列,只能被聲明時所指定的同一個鏈接和信道所消費,當建立隊列的信道關閉時,獨佔隊列也將會刪除。在信道關閉之間,能夠屢次聲明和取消exclusive隊列的消費者。

自動過時隊列

若是一段時間沒有使用該隊列,就刪除它。

建立一個自動過時的隊列很是簡單,要作的事情就是使用x-expires參數聲明一個隊列。該參數以毫秒爲單位設置隊列的生存時間(Time To Live,TTL)。

自動過時隊列有一些嚴格的約定:

  • 隊列只有在沒有消費者的狀況下才會過時。若是有鏈接消費者,則只有發出Basic.Cancel或斷開鏈接以後才自動刪除;
  • 隊列只有在TTL週期以內沒有收到Basic.Get請求時纔會到期。一旦一個Basic.Get請求中已經包含了一個具備過時值的隊列,那麼過時設置無效,隊列不會被自動刪除(不要使用Get);
  • 不能從新聲明或更改x-expires屬性;
  • RabbitMQ不保證過時刪除隊列這一過程的時效性;

永久隊列

使用durable標誌告訴RabbitMQ但願隊列配置被保存在服務器:

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

消息級別的TTL

消息級別的TTL設置容許服務器對消息的最大生存時間進行限制。聲明隊列的同時指定死信交換器和TTL值將致使該隊列中已到期的消息成爲死信消息。

可使用x-message-ttl設置隊列的消息TTL時間。

最大長度隊列

從RabbitMQ3.1.0開始,能夠在聲明隊列時指定最大長度。若是在隊列上設置列x-max-length參數,一旦消息到達最大值,RabbitMQ會在添加新消息時刪除位於隊列前端的消息,若是聲明隊列時候,指定列死信交換器,則從隊列前端刪除的消息會進入死信隊列。

相關文章
相關標籤/搜索