AMQP消息路由必須包含三部分,交換器、隊列、綁定。以下圖所示,生產者把消息發送給交換器,交換器再路由到符合條件的隊列上,最終被消費者接收。綁定決定了消息如何從路由器路由到相應的隊列。這一篇,主要是瞭解一下隊列。web
當新增隊列的時候,須要定義一下4中屬性,分佈是Name、Durability、Auto delete、Arguments。服務器
amq.
開頭命名。定義一個隊列的方法以下,exclusive的參數,下面的臨時隊列會說明。負載均衡
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
咱們在使用隊列以前,須要先聲明隊列。若是隊列不存在,則建立隊列。若是已存在則不會建立,可是和已存在的隊列屬性不一致,則會有406 (PRECONDITION_FAILED)的通道級異常。異步
參數的設置有兩種,一種是經過分組,一個是一個個隊列設置。分組的方式更加靈活、非侵入性,不須要修改和從新部署應用程序,是官方推薦的方式。參數的描述以下:spa
參數 | 描述 |
---|---|
x-message-ttl | 消息的存活時間,單位爲毫秒 |
x-expires | 隊列的存活時間,單位爲毫秒 |
x-max-length | 隊列的最大消息數 |
x-max-length-bytes | 消息的最大字節數 |
x-overflow | 消息達到最大數的策略,drop-head或者reject-publish |
x-dead-letter-exchange | 死信隊列的交換器 |
x-dead-letter-routing-key | 死信隊列的路由鍵 |
x-max-priority | 消息的優先級設置 |
x-queue-mode | 消息的延遲 |
x-queue-master-locator | 用於主從 |
當咱們須要一個臨時隊列的時候,咱們能夠先定義隊列,使用完再刪除,或者直接定義Durability的屬性爲transient,等broker重啓的時候就消失,可是感受沒有很方便。特別是使用後刪除,若是客戶端失敗,這個隊列就一直存在。咱們能夠用如下方法來自動刪除:code
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("queue1", false, true, false, null); channel.queueDeclare("queue2", false, false, true, null); channel.basicConsume("queue2", true, null, consumerTag -> { }); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires",5000); channel.queueDeclare("queue3", false, false, false, arguments); }
queue1是獨佔隊列,queue2是自動刪除,queue3設置了5秒的過時時間。
運行後以下圖,五秒後queue3消失,中止程序運行,queue1和queue2消失。
須要注意的是,若是把queue2的basicConsume方法調用註釋掉,因爲沒有消費者,隊列並不會消失。
獨佔隊列只能由其聲明鏈接使用(從聲明鏈接使用、清除、刪除等)。其餘隊列若是想使用獨佔隊列將致使通道級異常RESOURCE_LOCKED,該異常帶有一條錯誤消息,代表沒法得到對鎖定隊列的獨佔訪問。rabbitmq
消費者經過兩種方式來接收消息:隊列
咱們先往隊列裏發送消息,其中queue1發送4條,queue2發送3條。這邊還沒講到如何發送消息,能夠經過http://127.0.0.1:15672/
的web控制檯來發送消息。
queue1是拉模式,queue2是推模式。
queue1的代碼:路由
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("queue1", false, false, false, null); GetResponse response = channel.basicGet("queue1", true); System.out.println("queue1 Received '" + new String(response.getBody()) + "'"); GetResponse response2 = channel.basicGet("queue1", true); System.out.println("queue1 Received '" + new String(response.getBody()) + "'"); }
運行結果以下,調用get兩次。
queue2的代碼:部署
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("queue2", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("queue2 Received '" + message + "'"); }; // 接收消息 channel.basicConsume("queue2", true, deliverCallback, consumerTag -> { }); }
調用consume一次。
最終結果以下:queue1由於get兩次,因此還有2條消息。queue2的3條消息都消費完。
在上面的例子中,咱們先把消息發送給隊列,此時沒有消費者,消息就會在隊列裏一直等。那若是有消費者,且有多個消費者,消息是如何發佈的呢?
咱們啓動上面的queue2應用程序兩次,再發送消息,能夠看到,兩個應用程序是交替消費數據的。整個步驟以下:
隊列的建立究竟是消費者仍是生產者呢?咱們上面的例子都是消費者建立的,可是若是隊列還沒建立,生產者就開始往不存在的隊列發送消息,消息就會丟失。因此爲了消息可以正確的到達隊列,須要生產者和消費者都要嘗試去建立隊列,除非消息不那麼重要,能夠消費者來建立。
隊列是AMQP是消息通訊的基礎模塊: