消息中間件之RabbitMQ

初識RabbitMQ

RabbitMQ簡述

  • RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其爲消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現
  • 消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合異步消息流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。
  • RabbitMQ做爲一個消息代理,主要和消息打交道,負責接收並轉發消息。
  • RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集羣和分佈式部署。活適用於排隊算法、秒殺動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景

RabbitMQ支持的協議

  • AMQP 0-9-10-90-8,**和擴展協議:**RabbitMQ最先開發就是爲了支持AMQP,因此該協議是Broker支持的最核心的協議。全部的版本基本上是相似的,但最新的版本描述比較不清晰或相比先前版本沒有多大改善。RabbitMQ用不一樣的方式擴展了AMQP 0-9-1。
  • STOMPSTOMP是一個很是簡單的基於文本的消息協議。它定義了不多的消息語法,它很是容易實現,而且實現一部分將會更容易(它是僅有的能夠手動使用telnet來操做的協議)。
  • MQTTMQTT是一個輕量級的 發佈/訂閱 消息機制的二進制協議,旨在用於低端設備的客戶端上。它很好的定義了 發佈/訂閱消息機制,介不支持其它消息機制。
  • HTTP:HTTP固然不是一個消息協議。RabbitMQ能夠經過如下三種方式來傳輸消息:
    • 管理插件支持一個簡單的HTTP API用於發送和接收消息。主要用於測試診斷的目 的,可是針對少許的消息來講仍是可靠的。
    • Web-STOMP插件使得,在瀏覽器上可 使用基於WebSockets、或者SockJS來控制消息。
    • JSON-RPC插件使瀏覽器經過 JSON-RPC和基於AMQP 0-9-1協議的消息進行通訊。注意JSON RPC是一個同步的協議, 基於異步傳輸的AMQP的一些功能將使用polling方式進行模擬。

RabbitMQ 特色

  • 可靠性(Reliability):RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。html

  • 靈活的路由(Flexible Routing):在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。java

  • 消息集羣(Clustering):多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。git

  • 高可用(Highly Available Queues):隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。github

  • 多種協議(Multi-protocol):RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。算法

  • 多語言客戶端(Many Clients):RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。數據庫

  • 管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。編程

  • 跟蹤機制(Tracing):若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。數組

  • 插件機制(Plugin System): RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。瀏覽器

AMQP協議

AMQP簡述

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件同產品,不一樣的開發語言等條件的限制。AMQP協議這種下降耦合的機制是基於與上層產品,語言無關的協議。是一種二進制協議,提供客戶端應用與消息中間件之間多通道、協商、異步、安全、中立和高效地交互。從總體來看,AMQP協議可劃分爲兩層:安全

  • Functional Layer(功能層)

    功能層,位於協議上層主要定義了一組命令(基於功能的邏輯分類),用於應用程序調用實現自身所需的業務邏輯。例如:應用程序能夠經過功能層定義隊列名稱,生產消息到指定隊列,消費指定隊列消息等基於(Message queues 模型)

    • AMQ 功能層設計驅動基於以下要求:
      • 使用二進制數據流壓縮和解壓,提升效率;
      • 能夠處理任意大小的消息,且不作任何限制;
      • 單個鏈接支持多個通訊通道;
      • 客戶端和服務端基於長連接實現,且無特殊限制;
      • 容許異步指令基於管道通訊;
      • 易擴展,基於新的需求和變化支持擴展;
      • 新版本向下兼容老版本;
      • 基於斷言模型,異常能夠快速定位修復;
      • 對編程語言保持中立;
      • 適應代碼發展演變;
  • Transport Layer(傳輸層)

    傳輸層,基於二進制數據流傳輸,用於將應用程序調用的指令傳回服務器,並返回結果,同時能夠處理信道複用,幀處理,內容編碼,心跳傳輸,數據傳輸和異常處理。傳輸層能夠被任意傳輸替換,只要不改變應用可見的功能層相關協議,也可使用相同的傳輸層,同時使用不一樣的高級協議

    • AMQP 傳輸層設計驅動給予以下要求:
      • 使用二進制數據流壓縮和解壓,提升效率;
      • 能夠處理任意大小的消息,且不作任何限制;
      • 單個鏈接支持多個通訊通道;
      • 客戶端和服務端基於長連接實現,且無特殊限制;
      • 容許異步指令基於管道通訊;
      • 易擴展,基於新的需求和變化支持擴展;
      • 新版本向下兼容老版本;
      • 基於斷言模型,異常能夠快速定位修復;
      • 對編程語言保持中立;
      • 適應代碼發展演變;

AMQP 模型

  • Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker。
  • Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,相似於網絡中的namespace概念。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange/queue等。
  • Connection: publisher/consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。
  • Channel: 若是每一次訪問RabbitMQ都創建一個Connection,在消息量大的時候創建TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。
  • Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最終被送到這裏等待consumer取走。一個message能夠被同時拷貝到多個queue中。
  • Binding: exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。

工做流程

  • 發佈者(Publisher)發佈消息(Message),交給交換機(Exchange)。
  • 交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。
  • 最後 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。

交換機類型

  • 交換機能夠有兩個狀態:持久(durable)、暫存(transient)。
  • 持久化的交換機會在消息代理(broker)重啓後依舊存在,而暫存的交換機則不會(它們須要在代理再次上線後從新被聲明)。
  • 並非全部的應用場景都須要持久化的交換機。
默認交換機

默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。

它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

舉個栗子:當你聲明瞭一個名爲 「search-indexing-online」 的隊列,AMQP 代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是爲 「search-indexing-online」。所以,當攜帶着名爲 「search-indexing-online」 的路由鍵的消息被髮送到默認交換機的時候,此消息會被默認交換機路由至名爲 「search-indexing-online」 的隊列中。換句話說,默認交換機看起來貌似可以直接將消息投遞給隊列,儘管技術上並無作相關的操做。

直連型交換機

當生產者(P)發送消息 Rotuing key=booking 時,這時候將消息傳送給 Exchange,Exchange 獲取到生產者發送過來消息後,會根據自身的規則進行與匹配相應的 Queue,這時發現 Queue1 和 Queue2 都符合,就會將消息傳送給這兩個隊列。

若是咱們以 Rotuing key=create 和 Rotuing key=confirm 發送消息時,這時消息只會被推送到 Queue2 隊列中,其餘 Routing Key 的消息將會被丟棄

Rotuing key爲booking時的示意圖

Rotuing key爲create時的示意圖

client代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 16:54
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置鏈接
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 聲明隊列(隊列屬性可看下面)
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = "hello";

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        } catch (Exception e){
            System.out.println("連接異常、、、、");
        }
    }

}
複製代碼
server代碼
/**
 * @Author: Young
 * @Description: 模擬一個隊列同時綁定兩個binding
 * @Create: 2019-09-23 17:44
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Work {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        final Channel channel1 = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        channel1.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 會告訴RabbitMQ不要同時給一個消費者推送多於N個消息
        channel.basicQos(1);
        channel1.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x1] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x1] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 同一個會話, consumerTag 是固定的 能夠作此會話的名字, deliveryTag 每次接收消息+1,能夠作此消息處理通道的名字。
        // 所以 deliveryTag 能夠用來回傳告訴 rabbitmq 這個消息處理成功 清除此消息(basicAck方法)。
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        channel1.basicConsume(TASK_QUEUE_NAME, false, deliverCallback1, consumerTag -> { });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

複製代碼
扇型交換機

扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是 N 個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的 N 個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。

由於扇型交換機投遞消息的拷貝到全部綁定到它的隊列,因此他的應用案例都極其類似:

大規模多用戶在線(MMO)遊戲可使用它來處理排行榜更新等全局事件

體育新聞網站能夠用它來近乎實時地將比分更新分發給移動客戶端

分發系統使用它來廣播各類狀態和配置更新

在羣聊的時候,它被用來分發消息給參與羣聊的用戶。(AMQP 沒有內置 presence 的概念,所以 XMPP 可能會是個更好的選擇)

示意圖

client 代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 18:16
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明交換機及他的類型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
            //
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

複製代碼
server 代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 19:12
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        // 建立連接
        Connection connection = factory.newConnection();
        // 建立信道
        Channel channel = connection.createChannel();
        // 生命交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 將隊列與交換機綁定
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 回調函數
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 開始等待消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
複製代碼
主題交換機

前面提到的 direct 規則是嚴格意義上的匹配,換言之 Routing Key 必須與 Binding Key 相匹配的時候纔將消息傳送給 Queue.

而Topic 的路由規則是一種模糊匹配,能夠經過通配符知足一部分規則就能夠傳送。

它的約定是: 1)binding key 中能夠存在兩種特殊字符 「 與「#」,用於作模糊匹配,其中 「」 用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)。 2)routing key 爲一個句點號 「.」 分隔的字符串(咱們將被句點號 「. 」 分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」 binding key 與 routing key 同樣也是句點號 「.」 分隔的字符串。

當生產者發送消息 Routing Key=F.C.E 的時候,這時候只知足 Queue1,因此會被路由到 Queue1 中,若是 Routing Key=A.C.E 這時候會被同是路由到 Queue1 和 Queue2 中,若是 Routing Key=A.F.B 時,這裏只會發送一條消息到 Queue2 中。

binding key 分別爲
  • A.B.C

  • *.B.*

  • #.*.C

Rotuing key爲 A.B.C 時的示意圖

Rotuing key爲 E.B.C 時的示意圖

Rotuing key爲 B.C 時的示意圖

client代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:26
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
//        String[] strings={"A.B.C", "ABC"};
        String[] strings={"E.B.G", "ABC"};
//        String[] strings={"A.B", "AB"};
//        String[] strings={"B", "B"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明交換機及其類型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = strings[0];
            String message = strings[1];

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

複製代碼
server 代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:33
 **/

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        String[] strings = {"A.#", "*.*.C"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.3");
        factory.setPort(5672);
        factory.setUsername("young");
        factory.setPassword("young");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();


        // 同一個通道綁定多個 bindingKey
        for (String bindingKey : strings) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
複製代碼
頭交換機

headers 類型的 Exchange 不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。

頭交換機能夠視爲直連交換機的另外一種表現形式。但直連交換機的路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至能夠是整數或者哈希值(字典)等。靈活性更強(但實際上咱們不多用到頭交換機)。工做流程:

1)、綁定一個隊列到頭交換機上時,會同時綁定多個用於匹配的頭(header)。

2)、傳來的消息會攜帶header,以及會有一個 「x-match」 參數。當 「x-match」 設置爲 「any」 時,消息頭的任意一個值被匹配就能夠知足條件,而當 「x-match」 設置爲 「all」 的時候,就須要消息頭的全部值都匹配成功。

交換機小結

Queue 隊列

AMQP 中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。

隊列屬性

隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。

  • Name
  • Durable(消息代理重啓後,隊列依舊存在)
  • Exclusive(只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除)
  • Auto-delete(當最後一個消費者退訂後即被刪除)
  • Arguments(一些消息代理用他來完成相似與 TTL 的某些額外功能)
隊列建立

隊列在聲明(declare)後才能被使用。若是一個隊列尚不存在,聲明一個隊列會建立它。若是聲明的隊列已經存在,而且屬性徹底相同,那麼這次聲明不會對原有隊列產生任何影響。若是聲明中的屬性與已存在隊列的屬性有差別,那麼一個錯誤代碼爲 406 的通道級異常就會被拋出。

隊列持久化

持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啓的時候,它依舊存在。沒有被持久化的隊列稱做暫存隊列(Transient queues)。並非全部的場景和案例都須要將隊列持久化。

持久化的隊列並不會使得路由到它的消息也具備持久性。假若消息代理掛掉了,從新啓動,那麼在重啓的過程當中持久化隊列會被從新聲明,不管怎樣,只有通過持久化的消息才能被從新恢復。

消息機制

消息確認

消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。並且網絡緣由也有可能引發各類問題。這就給咱們出了個難題,AMQP 代理在何時刪除消息纔是正確的?AMQP 0-9-1 規範給咱們兩種建議:

  • 自動確認模式:當消息代理(broker)將消息發送給應用後當即刪除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
  • 顯式確認模式:待應用(application)發送一個確認回執(acknowledgement)後再刪除消息。(使用 AMQP 方法:basic.ack)

若是一個消費者在還沒有發送確認回執的狀況下掛掉了,那麼AMQP代理會將消息從新投遞給另外一個消費者。若是當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,而後再次嘗試投遞。

拒絕消息

當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用能夠向消息代理代表,本條消息因爲 「拒絕消息(Rejecting Messages)」 的緣由處理失敗了(或者未能在此時完成)。

當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。

當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。

在 AMQP 中,basic.reject 方法用來執行拒絕消息的操做。但 basic.reject 有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。可是若是你使用的是 RabbitMQ,那麼你可使用被稱做 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 擴展來解決這個問題。

預取消息

在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息是很是有用的。這能夠在試圖批量發佈消息的時候起到簡單的負載均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生產應用每分鐘才發送一條消息,這說明處理工做尚在運行。)

注意,RabbitMQ 只支持通道級的預取計數,而不是鏈接級的或者基於大小的預取。

消息屬性

AMQP 模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以致於 AMQP 0-9-1 明確的定義了它們,而且應用開發者們無需費心思思考這些屬性名字所表明的具體含義。例如:

  • Content type(內容類型)
  • Content encoding(內容編碼)
  • Routing key(路由鍵)
  • Delivery mode (persistent or not) 投遞模式(持久化 或 非持久化)
  • Message priority(消息優先權)
  • Message publishing timestamp(消息發佈的時間戳)
  • Expiration period(消息有效期)
  • Publisher application id(發佈應用的 ID)

有些屬性是被 AMQP 代理所使用的,可是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱做消息頭(headers)。他們跟 HTTP 協議的 X-Headers 很類似。消息屬性須要在消息被髮布的時候定義。

消息主體

AMQP 的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被 AMQP 代理看成不透明的字節數組來對待。

消息代理不會檢查或者修改有效載荷。消息能夠只包含屬性而不攜帶有效載荷。它一般會使用相似 JSON 這種序列化的格式數據,爲了節省,協議緩衝器和 MessagePack 將結構化數據序列化,以便以消息的有效載荷的形式發佈。AMQP 及其同行者們一般使用 「content-type」 和 「content-encoding」 這兩個字段來與消息溝通進行有效載荷的辨識工做,但這僅僅是基於約定而已。

消息持久化

消息可以以持久化的方式發佈,AMQP代理會將此消息存儲在磁盤上。若是服務器重啓,系統會確認收到的持久化消息未丟失。

簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具備持久化性質:它徹底取決與消息自己的持久模式(persistence mode)。將消息以持久化方式發佈時,會對性能形成必定的影響(就像數據庫操做同樣,健壯性的存在一定形成一些性能犧牲)。

簡單應用

RabbitMQ RPC

RPC(Remote Procedure Call Protocol,遠程過程調用協議),通常都稱爲「遠程過程調用」。關於RPC協議自己,很少介紹,這裏只介紹Openstack如何利用AMQP來實現RPC。以下圖所示。

示例圖

RPC是一種Client/Server通訊模型。圖中上半部分是RPC的一種表現形式,表面上看起來好像是Client調用了Server的一個函數(f1),實際上,Client與Server之間是有一來(request)一往(response)兩個消息(圖中的下半部分)。 在request消息中,RPC Client擔任Producer的角色,RPC Server擔任Consumer的角色。當RPC Server接到RPC Client發送過來的request消息時,它會作相應的處理,而後,發送response消息給RPC Client,這個時候,RPC Server將擔任Producer的角色,而RPC Client擔任Consumer的角色。 所以,基於AMQP實現RPC的原理,以下圖:

代碼
client代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:45
 **/

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        // 創建connection和channel。
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
        //  求0-32的斐波那契數列之和
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    // call方法來發送RPC請求
    public String call(String message) throws IOException, InterruptedException {
        // 生成correlationId
        final String corrId = UUID.randomUUID().toString();

        // 生成默認名字的queue用於reply,並訂閱它
        String replyQueueName = channel.queueDeclare().getQueue();

        // 發送request message,設置參數replyTo和correlationId.
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // 由於消費者發送response是在另外一個線程中,咱們須要讓main線程阻塞,在這裏咱們使用BlockingQueue
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // 消費者進行簡單的處理,爲每個response message檢查其correlationId,若是是,則將response添加進阻塞隊列
        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });
        // 在隊列爲空時,獲取元素的線程會等待隊列變爲非空
        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

複製代碼
server代碼
/**
 * @Author: Young
 * @Description: young.thrift.test_thrift.test
 * @Create: 2019-09-23 20:46
 **/

import com.rabbitmq.client.*;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    
    // 斐波那契函數
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置鏈接參數
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // 清空隊列
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");
            
            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(delivery.getProperties().getCorrelationId())
                                .build();

                        String response = "";

                        try {
                            String message = new String(delivery.getBody(), "UTF-8");
                            int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    // 對消息進行應答
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // 喚醒正在消費的進程
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // 在收到消息前,本線程進入等待狀態
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

複製代碼

未完待續 。。。


參考:

RabbitMQ 官網

深刻理解AMQP協議

AMQP 協議詳解

AMQP的幾種通訊模式

消息隊列之 RabbitMQ

RabbitMQ與AMQP協議詳解


若有不當之處,歡迎留言(手動滑稽)。。。

相關文章
相關標籤/搜索