RabbitMQ AMQP 消息模型攻略

近期對消息隊列比較感興趣, 所以特地看了一下 RabbitMQ 相關的知識, 不過在學 RabbitMQ 時, 對 AMQP 的消息模型老是理解的不透徹, 因而在官網上找了一篇介紹 AMQP 消息模型的文章, 詳細地看了一下.
仍是要感嘆一下啊, 官網的文章果真是最權威的, 看了之後有了不小的收穫.
下面是我學習 AMQP 消息模型時的記錄, 其內容大部分是翻譯自官網, 部分添加了本身的理解.html

原文

https://www.rabbitmq.com/tuto...算法

AMQP 消息模型簡介

AMQP 的消息模型以下圖所示:segmentfault

clipboard.png

經過此圖咱們能夠知道, 一個消息的發送流程有以下幾個步驟:緩存

  1. 消息生產者將消息發佈(Public)到 Exchange 中.多線程

  2. Exchange 根據隊列的綁定關係將消息分發到不一樣的 Queue 中.負載均衡

  3. AMQP broker 根據訂閱規則將消息發送給消費者 或 消費者自行根據須要從消息隊列中獲取消息.ide

Exchange 和 Exchange 類型

Exchange 的主要任務是接收消息並將消息路由到0個或多個 Queue 中, 而路由的算法受 Exchange 類型和綁定(binding) 關係的影響. AMQP 0-9-1 broker 提供以下四個 exchange 類型:學習

類型 默認預約義的名字
Direct Exchange 空字符串和 amq.direct
Fanout Exchange amq.fanout
Topic Exchange amq.topic
Headers Exchange amq.match(在 RabbitMQ 中, 額外提供amq.headers)

每一個 Exchange 都有以下幾個屬性:spa

  • Name, Exchange 的 名字線程

  • Durability, 是不是持久的 Exchange, 當爲真時, broker 重啓後也會保留此 Exchange

  • Auto-delete, 當爲真時, 若是全部綁定的的 Queue 都再也不使用時, 此 Exchange 會自動刪除

關於默認 Exchange

默認的 exchange 是一個由 broker 預建立的匿名的(即名字是空字符串) direct exchagne. 對於簡單的程序來講, 默認的 exchange 有一個實用的屬性: 若是沒有顯示地綁定 Exchnge, 那麼建立的每一個 queue 都會自動綁定到這個默認的 exchagne 中, 而且此時這個 queue 的 route key 就是這個queue 的名字.

例如當咱們聲明瞭一個名爲 "search-indexing-online" 的 queue, 那麼 AMQP broker 會以 "search-indexing-online" 做爲 route key 將此 queue 綁定到默認的 exchange 中. 所以當一個消息以 route key 爲 "search-indexing-online" 投遞到默認的 exchange 中時, 此消息就會被路由到這個 queue 中去. 換句話說, 因爲有默認的 exchagne 的存在, 咱們就好像能夠直接將消息投遞到指定的 queue 中去而不須要通過 exchange 同樣.
例如:
Send:

public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        
        channel.close();
        connection.close();
    }
}

Recv:

public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

在這個例子中, 咱們並無定義 exchange, 也沒有顯示地將 queue 綁定到 exchange 中, 所以 queue "hello" 就自動綁定到默認的 exchange 中了, 而且在默認的 exchange 中, 其 route key 和 queue 名一致, 即 "hello".
因爲這個緣由, 咱們就可使用:

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

來發送消息. 調用 channel.basicPublish 時, 第一個參數是 exchange 名, 爲空就是默認的 exchange, 第二個參數是 route key, 和 queue 名相同.

direct exchange

direct exchange 可使用以下圖表示:

clipboard.png

direct exchange 根據消息的 route key 來將消息分發到不一樣的 queue 中. direct exchange 適合用於消息的單播發送. direct exchange 的工做流程以下:

  • 一個 queue 使用 K 做爲 route key 綁定到 direct exchange 中.

  • 當direct exchange 收到一個 route key 爲 R 的消息時, 若是 R == K, 則此 exchange 會將此消息路由到此 queue 中.

direct exchange 常常用於在多個 worker 中分配任務(即一個 Master 和多個相同的 Slave). 當使用這個模型時, 須要注意的是:

When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.

即 AMQP 0-9-1 的負載均衡是以consumer爲單位的, 而不是以 queue 爲單位.

fanout exchange

一個 fanout exchange 會將消息分發給全部綁定到此 exchange 的 queue 中, 而不會考慮 queue 的 route key. 即若是有 N 個 Queue 綁定到一個 fanout exchange 時, 那麼當此 exchange 收到消息時, 會將此消息分發到這 N 個 queue 中. 因爲此性質, fanout exchange 也經常使用消息的廣播(broadcast).
fanout 可使用下圖表示:

clipboard.png

topic exchange

topic exchange 會根據 route key 將消息分發到與此消息的 route key 相匹配的而且綁定到此 exchagne 中的 queue 中(若是有多個 queue 使用了相同的 route key 綁定到此 exchange, 那麼這些 queue 都會收到消息). 根據此性質, topic exchange 常常用於實現 publish/subscribe 模型, 即消息的多播模型.

header exchange

header exchange 不使用 route key 做爲路由的依據, 而是使用消息頭屬性來路由消息.

Queue

AMQP 中的 隊列 的概念和其餘消息隊列中 隊列 的概念相似, 它有以下幾個重要的概念:

  • Name, 名字

  • Durable, 是不是持久的. 當爲真時, 即便 broker 重啓時, 此 queue 也不會被刪除.

  • Exclusive, 是不是獨佔的, 當爲真時, 表示此 queue 只能有一個消費者, 而且當此消費者的鏈接斷開時, 此 queue 會被刪除.

  • Auto-delete, 當爲真時, 此 隊列 會在最後一個消費者取消訂閱時被刪除.

在使用一個 隊列 時, 須要先進行聲明. 若是咱們聲明的隊列不存在, 那麼 broker 就會自動建立它. 不過若是此隊列已經存在時, 咱們就須要注意了, 若咱們聲明的隊列的屬性和已存在的隊列的屬性一致, 則不會有任何的問題, 可是若是前後兩次聲明的隊列的屬性不一致, 則會有 PRECONDITION_FAILED 錯誤(錯誤碼爲406).

關於隊列名

AMQP 的隊列名不能以 "amq." 開頭, 由於這樣的隊列名是 AMQP broker 內部所使用的. 當咱們使用了這樣的隊列名時, 那麼會有一個 ACCESS_REFUSED 錯誤(錯誤碼爲 403)

關於持久隊列

持久隊列會被持久化到磁盤中, 所以即便 broker 重啓了, 持久隊列也依然存在.
不過須要注意的是, 不要將持久隊列和消息的持久化混淆. 當 broker 重啓時, 持久隊列會自動從新聲明, 然而只有隊列中的持久化消息(persistent message)纔會被恢復.

隊列的綁定

隊列的綁定關係是 exchagne 用於消息路由的規則, 即一個 exchange 可以將消息路由到某個隊列的前提是此隊列已經綁定到這個 exchange 中了. 當隊列綁定到一個 exchange 中時, 咱們還能夠設置一個額外的參數, 即 route key, 這個 key 會被 direct exchange 和 topic exchange 做爲額外的路由信息而使用, 換句話說, route key 扮演着過濾器的角色.
當一個消息沒有被路由到任意的隊列時(例如此 exchange 沒有任何的 queue 綁定着), 那麼此時會根據消息的屬性來決定是將此消息丟棄仍是返回給生產者.

消費者

AMQP 0-9-1 支持兩種消息分發模式:

  • push 模式, 即 broker 主動推送消息給消費者

  • pull 模式, 即消費者主動從 broker 中拉取消息.

在 push 模式時, 應用程序須要告知 broker 它對哪些消息感興趣, 即也就是咱們所說的訂閱一個消息主題. 每一個消費者都有一個唯一的標識符, 即consumer tag, 咱們能夠用這個 tag 來取消一個消費者對某個主題的訂閱(unsubscribe).

消息的 ACK

AMQP 0-9-1 有兩種消息 ACK 模式:

  • 自動 ACK 模式

  • 手動 ACK 模式

在自動 ACK 模式下, 當 broker 發送消息成功後, 會當即將此消息從消息隊列中刪除, 而不會等待消費者的 ACK 回覆. 而在手動 ACK 模式下, 當 broker 發送消息給消費者時, 不會當即將此消息刪除, 而是須要等待消費者的 ACK 回覆後纔會刪除消息. 所以在手動 ACK 模式下, 當消費者收到消息並處理完成後, 須要向 broker 顯示地發送 ACK 指令.
在手動 ACK 模式下, 若是消費者由於意外的 crash 而沒有發送 ACK 給 broker, 那麼此時 broker 會將此消息轉發給其餘的消費者(若是此時沒有消費者了, 那麼 broker 會緩存此消息, 直到有新的消費者註冊).

拒絕消息

當一個消費者處理消息失敗或此時不能處理消息時, 那麼能夠給 broker 發送一個拒絕消息的指令, 而且能夠要求 broker 丟棄或從新分發此消息.
不過須要注意的是, 若是此時只有一個消費者, 那麼當此消費者拒收消息並要求 broker 從新分發此消息時, 那麼就會形成了此消息不斷地分發和拒收, 造成了死循環.

預取消息

經過預取消息機制, 消費者能夠一次性批量取出消息, 而後在處理後對這些批量消息進行統一的 ACK 回覆, 這樣能夠提升消息的吞吐量.
不過, 須要注意的時, RabbitMQ 僅支持 channel 級別的預取消息的數量配置, 不支持基於鏈接的預取消息數量配置.

鏈接

AMQP 的鏈接是長鏈接, 它是一個使用 TCP 做爲可靠傳輸的應用層協議.

通道(Channel)

AMQP 不推薦一個應用程序發起多個對 broker 的鏈接, 由於這樣會消耗系統資源而且也不利於防火牆的配置. 可是若是應用程序確實須要有多個不互相干擾的鏈接來進行不一樣的操做時該怎麼辦呢? 爲了解決這個問題, AMQP 引入了 Channel 的 概念. 在 AMQP 0-9-1 中, 一個與 broker 的鏈接是被多個 Channel 複用的, 所以咱們能夠將 channel 理解爲: 一個共享同一個 TCP 鏈接的輕量級的鏈接.

基於同一個 TCP 鏈接的兩個不一樣的 channel 直接是不會有任何的干擾的(在邏輯上能夠等效地理解爲兩個獨立的鏈接), 所以客戶端和 broker 之間交互時, 須要附帶上 channel id.
一般來講, 在一個多線程消費消息的模型中, 每一個線程單獨打開一個 channel 是一個推薦的作法, 而最好不要在各個線程中共享一個 channel.

Virtual host

爲了在一個 broker 中實現不一樣的相互隔離的環境(例如每一個環境中有不一樣的用戶, 不一樣的 exchange, 不一樣的隊列等), AMQP 引入了一個叫作 virtual host(vhost) 的概念. 在鏈接 broker 時, 客戶端能夠指定須要使用哪一個 vhost.

本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題爲: RabbitMQ AMQP 消息模型攻略
本文連接爲: http://www.javashuo.com/article/p-dehffbkq-kd.html

相關文章
相關標籤/搜索