RabbitMQ - 隊列

AMQP消息路由必須包含三部分,交換器、隊列、綁定。以下圖所示,生產者把消息發送給交換器,交換器再路由到符合條件的隊列上,最終被消費者接收。綁定決定了消息如何從路由器路由到相應的隊列。這一篇,主要是瞭解一下隊列。
image.pngweb

相關概念

當新增隊列的時候,須要定義一下4中屬性,分佈是Name、Durability、Auto delete、Arguments。
image.png服務器

  • Name:隊列名稱,不能用amq.開頭命名。
  • Durability:持久化,若是爲durable,那broker重啓不會丟失,若是爲transient,broker重啓後會丟失。(win系統僅僅重啓rabbitmq是不行的)
  • Auto delete:最後一個消費者退訂時被自動刪除
  • Arguments:隊列的其餘參數,如上圖,好比消息的TTL等。

定義一個隊列的方法以下,exclusive的參數,下面的臨時隊列會說明。負載均衡

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

命名

咱們在使用隊列以前,須要先聲明隊列。若是隊列不存在,則建立隊列。若是已存在則不會建立,可是和已存在的隊列屬性不一致,則會有406 (PRECONDITION_FAILED)的通道級異常。異步

參數設置

參數的設置有兩種,一種是經過分組,一個是一個個隊列設置。分組的方式更加靈活、非侵入性,不須要修改和從新部署應用程序,是官方推薦的方式。參數的描述以下:spa

參數 描述
x-message-ttl 消息的存活時間,單位爲毫秒
x-expires 隊列的存活時間,單位爲毫秒
x-max-length 隊列的最大消息數
x-max-length-bytes 消息的最大字節數
x-overflow 消息達到最大數的策略,drop-head或者reject-publish
x-dead-letter-exchange 死信隊列的交換器
x-dead-letter-routing-key 死信隊列的路由鍵
x-max-priority 消息的優先級設置
x-queue-mode 消息的延遲
x-queue-master-locator 用於主從

臨時隊列

當咱們須要一個臨時隊列的時候,咱們能夠先定義隊列,使用完再刪除,或者直接定義Durability的屬性爲transient,等broker重啓的時候就消失,可是感受沒有很方便。特別是使用後刪除,若是客戶端失敗,這個隊列就一直存在。咱們能夠用如下方法來自動刪除:code

  • Exclusive:獨佔隊列
  • x-expires:設置隊列的過時時間
  • Auto delete:隊列設置自動刪除
public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("queue1", false, true, false, null);
    channel.queueDeclare("queue2", false, false, true, null);
    channel.basicConsume("queue2", true, null, consumerTag -> {
    });
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-expires",5000);
    channel.queueDeclare("queue3", false, false, false, arguments);
}

queue1是獨佔隊列,queue2是自動刪除,queue3設置了5秒的過時時間。
運行後以下圖,五秒後queue3消失,中止程序運行,queue1和queue2消失。
須要注意的是,若是把queue2的basicConsume方法調用註釋掉,因爲沒有消費者,隊列並不會消失。
獨佔隊列只能由其聲明鏈接使用(從聲明鏈接使用、清除、刪除等)。其餘隊列若是想使用獨佔隊列將致使通道級異常RESOURCE_LOCKED,該異常帶有一條錯誤消息,代表沒法得到對鎖定隊列的獨佔訪問。
image.pngrabbitmq

推模式和拉模式

消費者經過兩種方式來接收消息:隊列

  • 經過consume訂閱,是隊列往消費端推送消息。只要隊列有消息,就能夠持續收到。
  • 經過get獲取消息,是消費者主動從隊列拉取消息,每次只能獲取一條,若是想獲取更多消息,能夠用while循環。可是吞吐量相對比較低。

咱們先往隊列裏發送消息,其中queue1發送4條,queue2發送3條。這邊還沒講到如何發送消息,能夠經過http://127.0.0.1:15672/的web控制檯來發送消息。
image.png
queue1是拉模式,queue2是推模式。
image.png
queue1的代碼:路由

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("queue1", false, false, false, null);
    GetResponse response = channel.basicGet("queue1", true);
    System.out.println("queue1 Received '" + new String(response.getBody()) + "'");
    GetResponse response2 = channel.basicGet("queue1", true);
    System.out.println("queue1 Received '" + new String(response.getBody()) + "'");
}

運行結果以下,調用get兩次。
image.png
queue2的代碼:部署

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("queue2", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("queue2 Received '" + message + "'");
    };
    // 接收消息
    channel.basicConsume("queue2", true, deliverCallback, consumerTag -> {
    });
}

調用consume一次。
image.png
最終結果以下:queue1由於get兩次,因此還有2條消息。queue2的3條消息都消費完。
image.png

多個消費者

在上面的例子中,咱們先把消息發送給隊列,此時沒有消費者,消息就會在隊列裏一直等。那若是有消費者,且有多個消費者,消息是如何發佈的呢?
咱們啓動上面的queue2應用程序兩次,再發送消息,能夠看到,兩個應用程序是交替消費數據的。整個步驟以下:

  1. 消息A發送到queue2隊列
  2. rabbitmq把消息A發送給消費者1
  3. 消費者1收到消息A並確認
  4. rabbitmq把消息A從queue2隊列刪除
  5. 消息B發送到queue2隊列
  6. rabbitmq把消息B發送給消費者2
  7. 消費者2收到消息B並確認
  8. rabbitmq把消息B從queue2隊列刪除

寫在最後

隊列的建立究竟是消費者仍是生產者呢?咱們上面的例子都是消費者建立的,可是若是隊列還沒建立,生產者就開始往不存在的隊列發送消息,消息就會丟失。因此爲了消息可以正確的到達隊列,須要生產者和消費者都要嘗試去建立隊列,除非消息不那麼重要,能夠消費者來建立。
隊列是AMQP是消息通訊的基礎模塊:

  • 爲消息提供了住所,消息在此等待消費
  • 對於負載均衡來講,消息是絕佳方案。只須要一直增長消費者就能夠。
  • 隊列是消息的終點
相關文章
相關標籤/搜索