RabbitMQ 總結

Connection & Channel

Connection 表明一個 TCP 鏈接,Channel 是創建在 Connection 上的虛擬鏈接。RabbitMQ 每條指令都是經過 Channel 完成的。java

對於 OS 而言,建立和銷燬 TCP 鏈接的代價很是高,在高峯期很容易遇到瓶頸。程序中通常會有多個線程須要與 RabbitMQ創建通訊,消費或生產消息,經過 TCP 鏈接複用來減小性能開銷。node

Connection 能夠建立多個 Channel ,可是 Channel 不是線程安全的因此不能在線程間共享。web

Connection 在建立時能夠傳入一個 ExecutorService ,這個線程池時給該 Connection 上的 Consumer 用的。 正則表達式

Channel.isOpen 以及 Connection.isOpen 方法是同步的,所以若是在發送消息時頻繁調用會產生競爭。咱們能夠認爲在 createChannel 方法後 Channel 以及處於開啓狀態。若在使用過程當中 Channel 關閉了,那麼只要捕獲拋出的 ShutDownSignalException 就能夠了,同時建議捕獲 IOException 以及 SocketException 防止鏈接意外關閉。docker

Exchange & Queue

消費者和生產者均可以聲明一個已經存在的 Exchange 或者 Queue ,前提是參數徹底匹配現有的 Exchange 或者 Queue,不然會拋出異常。安全

QueueDeclare 參數:
exclusive: 排他隊列,只有同一個 ConnectionChannel 能夠訪問,且在 Connection 關閉或者客戶端退出後自動刪除,即便 durabletruebash

queuePurge(String queue):清空隊列服務器

Exchange 能夠綁定另外一個 ExchangeexchangeBind(String destination, String source, String routeKey), 從 sourcedestinationcookie

若業務容許,則最好預先建立好 Exchange 以及 Queue 並進行綁定(rabbitmqadmin),防止 Exchange 沒有綁定 Queue 或 綁定錯誤的 Queue 而致使消息丟失(關鍵信息應當使用 mandatory 參數)。

Alternate Exchange: 在 Channel.exchangeDeclare 時添加 alternate-exchange 參數或在 Policy 中聲明。mandatorytrue 時,未被路由的消息會被髮送到 Alternate Exchange 。建議 Exchange Type 設置爲 fanout ,不然當 RoutingKey 依然不匹配就會被返回 Producerapp

P.S. 有些書上講備份交換器和 mandatory 參數一塊兒使用 mandatory 參數失效是錯的,當 RoutingKey 不匹配 Alternate Exchange 依然會被返回 Producer
(rabbitmq v3.7 測試)
Map<String, Object> arg = new HashMap<String, Object>() {{
    put("alternate-exchange", "alt");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
channel.exchangeDeclare("alt", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("notSend", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "key");
channel.queueBind("notSend", "alt", "");

Publish & Consume

Publish Confirm

消息發送到服務器後可能還沒來的及刷到磁盤中,服務器就掛掉,從而形成消息丟失。 Publish Confirm 可以在消息確實到達服務器(開啓持久化的消息會在刷入磁盤以後)以後返回一個確認給 Publisher。

經過 channel.confirmSelectedChannel 設置爲 Confirm 模式,併爲 Channel 添加一個 ConfirmLister 來監聽返回的確認。

SortedSet<Long> unconfirmedSet = new TreeSet<>();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {

    System.out.println("handleAck: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
}, (deliveryTag, multiple) -> {

    System.out.println("handleNack: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
});
while (true) {
    long seq = channel.getNextPublishSeqNo();
    channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8));
    unconfirmedSet.add(seq);
    Thread.sleep(1000);
}

除了異步處理的方式以外還有批量確認以及事務的方法。批量確認的速度在大量連續發送的狀況下和異步的方法差很少。無論怎樣這兩種消息確認的方法都要比事務的方式快7倍左右。

Consumer

通常應當實現 Consumer 接口或者繼承 DefaultConsumer ,Consumer 經過 consumerTag 來進行區分。

消費消息有兩種方式,一種是 Push ,一種是 Get。

Push 是由 RabbitMQ 以輪詢的方式將消息推送到 Consumer ,方法爲 basicConsume 。通常一個 Channel 對應一個 Consumer

Get 由客戶端主動從 RabbitMQ 拉取一條消息,方法爲 basicGet 。__不能循環執行 basicGet 來代替 basicConsume ,否則會嚴重影響性能。__

消息確認:autoAckfalseRabbitMQ 會等待 basicAck 的顯式確認。除非 Consumer 鏈接斷開不然一直等待確認。當 Consumer 顯式調用 basicReject 或者 basicNack 並將 requeue 設爲 true 後會將消息從新入隊投遞。通常咱們在業務處理完以後再 ack .
mandatory : 當 Exchange 沒法匹配 QueueExchange 時,mandatorytrue 的消息會被返回給 Producer,不然會被丟棄。 經過 Channel.addReturnListener 來添加 ReturnListener 監視器。

TTL

  1. queueDeclare 時添加 x-message-ttl 參數,單位毫秒。

    Map<String, Object> arg = new HashMap<String, Object>() {{
        put("x-message-ttl", "1000000");
    }};
    channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
  2. 使用 AMQP.BasicProperties.Builder 建立 AMQP.BasicProperties 並設置 expiration 參數。

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.expiration("100000");
    channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));

Dead Letter Exchange (DLX)

Dead Letter(死信):

  • Basic.Reject / Basic.Nack 而且 requeuetrue
  • 消息 TTL 過時
  • 隊列達到最大長度

當消息成爲 Dead Letter 以後, RabbitMQ 會自動把這個消息發到 DLX 上。

// 當發送到 normalQueue 中的消息成爲 Dead Letter 以後會自動以
// dead-letter 爲 routingKey 發送到 dlxQueue Exchange
Map<String, Object> arg = new HashMap<String, Object>() {{
    put("x-dead-letter-exchange", "dlx");
    put("x-dead-letter-routing-key", "dead-letter");
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("dlx", "direct", true, false, false, null);
channel.queueDeclare("dlxQueue", true, false, false, null);

DLX 其餘用法:延遲隊列,消息 發送到一個暫存的、沒有 ConsumerQueue 並設置 TTL,Consumer 消費 DLX 綁定的 Queue 的消息,建議給暫存的 Queue 設置一個最大的 TTL,防止消息沒有設置 TTL 而一直堆積在 Queue 中。

Priority

消息的消費能夠有優先級,Queue 的最大優先級能夠經過 x-max-priority 進行設置。

Map<String, Object> arg = new HashMap<String, Object>() {{
    put("x-max-priority", 5);
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("normalExchange", "direct", true, false, null);

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(2);
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Durability

Exchange , Queue , 消息均可以進行持久化。在消息發送到 Exchange 以後會馬上路由到 Queue 中,所以未持久化的 Exchange 在重啓後會丟失 Exchange 元數據以及綁定,對 Queue 和消息的持久化無影響。

未持久化的 Queue 在重啓後會丟失,包括 Queue 中的消息,無論消息是否設置了持久化。

未持久化的消息在重啓後會丟失,即便所在的 Queue 已持久化。

channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化
channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化
channel.queueBind("normalQueue", "normalExchange", "key");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);    // 消息持久化
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Qos

Qos 的做用時負載均衡。當一個隊列有兩個 Consumer ,一個性能很好 A,另外一個不那麼好 B,RabbitMQ 會輪詢,將消息平均地分給這兩個 Consumer。可見 B 上的堆積的消息會愈來愈多,而 A 上的線程可能會空閒。 Qos 的做用就是防止一個 Consumer 堆積了過多的消息,把這些消息分給其餘 Consumer。

global 參數:

channel.basicQos(3, false);  // each Consumer limit 3
channel.basicQos(5, true);   // this channel limit to 5

global 參數會讓 RabbitMQ 調用更多資源,儘可能不要設置(默認值爲 false)。

Relibility

RabbitMQ 支持最少一次和最多一次。
最少一次:

- 啓用 Publisher Confirm 或者 事務保證消息可以到達服務器。
- 啓用 mandatory 參數保證消息不回被 Exchange 丟掉。
- 消息和 Queue 開啓持久化。
- Consumer autoAck off, 並確保消息在處理完以後再 ack

Policy

Policy 能夠很方便的批量設置 Exchange 以及 Queue 的屬性,可是 Policy 的優先級較低,請注意。

Policy 能夠經過 HTTP API, web console,以及 cli 的方式。

rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
  • vhost : 指定 vhost
  • proiority : 若是一個 Queue 或者 Exchange 有多個 Policy 的狀況下,只有 priority 最大的那個 Policy 纔會生效。
  • apply-to : 應用到

    • Exchange and Queue
    • Exchange
    • Queue
  • name : Policy 的名字
  • pattern : Exchange 或者 Queue 名字的正則表達式
  • defination : 屬性值,能夠經過 management > Admin > Policies 的查看。

Cluster

RabbitMQ 會把全部的元數據存儲到全部的節點上,可是隊列是分散在集羣中全部的節點上的。

Build A Cluster with docker

咱們嘗試使用 Docker Compose 建立一個由 3 個服務組成的集羣

version: "3"
services:
  node1:
    image: rabbitmq:3.7-management-alpine
    container_name: node1
    hostname: node1
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5673:5672"
      - "15673:15672"
  node2:
    image: rabbitmq:3.7-management-alpine
    container_name: node2
    hostname: node2
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5674:5672"
      - "15674:15672"
  node3:
    image: rabbitmq:3.7-management-alpine
    container_name: node3
    hostname: node3
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5675:5672"
      - "15675:15672"

經過設置 hostname ,容器內部的 rabbitmq 的 nodename 就變成相似 rabbitmq@node1。同時集羣中的 RabbitMQ 須要相同的 RABBITMQ_ERLANG_COOKIE 來進行互相認證。

啓動服務:

docker-compose up -d

而後將 node2 , node3 加入 node1 ,注意,加入集羣以前 RabbitMQ 必須中止:

# 中止 rabbitmq
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node3 rabbitmqctl stop_app
# 加入 node1
docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1
docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1
# 從新啓動 
docker-compose exec node2 rabbitmqctl start_app
docker-compose exec node3 rabbitmqctl start_app

在任意一個節點上查詢集羣狀態:

docker-compose exec node2 rabbitmqctl cluster_status

能夠看到以下狀態:

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]

手動下線節點

將節點從在線狀態下線, 首先中止節點,而後重置節點。

docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node2 rabbitmqctl reset
docker-compose exec node2 rabbitmqctl stop_app

在從新啓動服務器以後能夠發現該節點已經脫離了集羣。

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]

節點類型

RabbitMQ 的節點類型有兩種,一種是 disc , 第二種是 ram。 RabbitMQ 要求集羣中至少要有一個磁盤節點,儲存了全部的元數據。當集羣中的惟一一個磁盤節點崩潰後,集羣能夠繼續收發消息,可是不能建立隊列等操做。

RabbitMQ 在加入集羣時默認爲磁盤模式,若是要之內存模式加入:

docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram

更改節點類型:

docker-compose exec node 2 rabbitmqctl change cluster_node_type desc

Mirror Queue

RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 機制。請注意,開啓 Publisher Confirmed 或者事務的狀況下,只有全部的 Slave 都 ACK 以後纔會返回 ACK 給客戶端。

開啓 Mirror Queue 主要經過設置 Policy 其中最主要的是 defination

  • ha-mode: Mirror Queue 的模式

    • all : 默認的模式,表示在集羣中的全部節點上進行鏡像
    • exactly : 在指定數量的節點上進行鏡像,數量由 ha-params 指定。
    • nodes : 在指定的節點上進行鏡像,節點名稱由 ha-params 指定。
  • ha-params : 如上所述
  • ha-sync-mode : 消息的同步模式

    • automatic : 當新的 Slave 加入集羣以後會自動同步消息。
    • manual: 默認,當加入新的 Slave 以後不會自動把消息同步到新的 Slave 上。指導調用命令顯式同步。
  • ha-promote-on-shutdown:

    • when-synced: 默認,若是主動中止 master ,那麼 slave 不會自動接管。也就是說會指望 master 會重啓啓動,這能夠保證消息不會丟失。
    • always: 無論 master 是由於什麼緣由中止的,slave 會馬上接管,有可能有一部分數據沒有從 master 同步到 slave.
  • ha-promote-on-failure: 默認 always ,不推薦設置爲 when-synced
相關文章
相關標籤/搜索