Connection
表明一個 TCP 鏈接,Channel
是創建在 Connection
上的虛擬鏈接。RabbitMQ 每條指令都是經過 Channel
完成的。java
對於 OS 而言,建立和銷燬 TCP 鏈接的代價很是高,在高峯期很容易遇到瓶頸。程序中通常會有多個線程須要與 RabbitMQ創建通訊,消費或生產消息,經過 TCP 鏈接複用來減小性能開銷。node
Connection
能夠建立多個 Channel
,可是 Channel
不是線程安全的因此不能在線程間共享。web
Connection
在建立時能夠傳入一個 ExecutorService
,這個線程池時給該 Connection
上的 Consumer
用的。 正則表達式
Channel.isOpen
以及 Connection.isOpen
方法是同步的,所以若是在發送消息時頻繁調用會產生競爭。咱們能夠認爲在 createChannel
方法後 Channel
以及處於開啓狀態。若在使用過程當中 Channel
關閉了,那麼只要捕獲拋出的 ShutDownSignalException
就能夠了,同時建議捕獲 IOException
以及 SocketException
防止鏈接意外關閉。docker
消費者和生產者均可以聲明一個已經存在的 Exchange
或者 Queue
,前提是參數徹底匹配現有的 Exchange
或者 Queue,不然會拋出異常。
安全
QueueDeclare
參數:exclusive
: 排他隊列,只有同一個 Connection
的 Channel
能夠訪問,且在 Connection
關閉或者客戶端退出後自動刪除,即便 durable
爲 true
。bash
queuePurge(String queue)
:清空隊列服務器
Exchange
能夠綁定另外一個 Exchange
:exchangeBind(String destination, String source, String routeKey)
, 從 source
到 destination
cookie
若業務容許,則最好預先建立好Exchange
以及Queue
並進行綁定(rabbitmqadmin),防止 Exchange 沒有綁定Queue
或 綁定錯誤的Queue
而致使消息丟失(關鍵信息應當使用mandatory
參數)。
Alternate Exchange
: 在 Channel.exchangeDeclare
時添加 alternate-exchange
參數或在 Policy
中聲明。mandatory
爲 true
時,未被路由的消息會被髮送到 Alternate Exchange
。建議 Exchange Type
設置爲 fanout
,不然當 RoutingKey
依然不匹配就會被返回 Producer
。app
P.S. 有些書上講備份交換器和mandatory
參數一塊兒使用mandatory
參數失效是錯的,當RoutingKey
不匹配Alternate Exchange
依然會被返回Producer
。
(rabbitmq v3.7 測試)
Map<String, Object> arg = new HashMap<String, Object>() {{ put("alternate-exchange", "alt"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg); channel.exchangeDeclare("alt", "fanout", true, false, null); channel.queueDeclare("normalQueue", true, false, false, null); channel.queueDeclare("notSend", true, false, false, null); channel.queueBind("normalQueue", "normalExchange", "key"); channel.queueBind("notSend", "alt", "");
消息發送到服務器後可能還沒來的及刷到磁盤中,服務器就掛掉,從而形成消息丟失。 Publish Confirm 可以在消息確實到達服務器(開啓持久化的消息會在刷入磁盤以後)以後返回一個確認給 Publisher。
經過 channel.confirmSelected
把 Channel
設置爲 Confirm
模式,併爲 Channel
添加一個 ConfirmLister
來監聽返回的確認。
SortedSet<Long> unconfirmedSet = new TreeSet<>(); channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { System.out.println("handleAck: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }, (deliveryTag, multiple) -> { System.out.println("handleNack: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }); while (true) { long seq = channel.getNextPublishSeqNo(); channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8)); unconfirmedSet.add(seq); Thread.sleep(1000); }
除了異步處理的方式以外還有批量確認以及事務的方法。批量確認的速度在大量連續發送的狀況下和異步的方法差很少。無論怎樣這兩種消息確認的方法都要比事務的方式快7倍左右。
通常應當實現 Consumer
接口或者繼承 DefaultConsumer
,Consumer
經過 consumerTag
來進行區分。
消費消息有兩種方式,一種是 Push
,一種是 Get。
Push
是由 RabbitMQ
以輪詢的方式將消息推送到 Consumer
,方法爲 basicConsume
。通常一個 Channel
對應一個 Consumer
。
Get
由客戶端主動從 RabbitMQ
拉取一條消息,方法爲 basicGet
。__不能循環執行 basicGet
來代替 basicConsume
,否則會嚴重影響性能。__
消息確認:autoAck
爲 false
,RabbitMQ
會等待 basicAck
的顯式確認。除非 Consumer
鏈接斷開不然一直等待確認。當 Consumer
顯式調用 basicReject
或者 basicNack
並將 requeue
設爲 true
後會將消息從新入隊投遞。通常咱們在業務處理完以後再 ack
.mandatory
: 當 Exchange
沒法匹配 Queue
或 Exchange
時,mandatory
爲 true
的消息會被返回給 Producer
,不然會被丟棄。 經過 Channel.addReturnListener
來添加 ReturnListener
監視器。
queueDeclare
時添加 x-message-ttl
參數,單位毫秒。
Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
使用 AMQP.BasicProperties.Builder
建立 AMQP.BasicProperties
並設置 expiration
參數。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));
Dead Letter(死信):
Basic.Reject
/ Basic.Nack
而且 requeue
爲 true
當消息成爲 Dead Letter 以後, RabbitMQ 會自動把這個消息發到 DLX 上。
// 當發送到 normalQueue 中的消息成爲 Dead Letter 以後會自動以 // dead-letter 爲 routingKey 發送到 dlxQueue Exchange Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-dead-letter-exchange", "dlx"); put("x-dead-letter-routing-key", "dead-letter"); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("dlx", "direct", true, false, false, null); channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其餘用法:延遲隊列,消息 發送到一個暫存的、沒有 Consumer
的 Queue
並設置 TTL,Consumer
消費 DLX 綁定的 Queue
的消息,建議給暫存的 Queue
設置一個最大的 TTL,防止消息沒有設置 TTL 而一直堆積在 Queue
中。
消息的消費能夠有優先級,Queue
的最大優先級能夠經過 x-max-priority
進行設置。
Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-max-priority", 5); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("normalExchange", "direct", true, false, null); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(2); channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Exchange
, Queue
, 消息均可以進行持久化。在消息發送到 Exchange
以後會馬上路由到 Queue
中,所以未持久化的 Exchange
在重啓後會丟失 Exchange
元數據以及綁定,對 Queue
和消息的持久化無影響。
未持久化的 Queue
在重啓後會丟失,包括 Queue
中的消息,無論消息是否設置了持久化。
未持久化的消息在重啓後會丟失,即便所在的 Queue
已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化 channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化 channel.queueBind("normalQueue", "normalExchange", "key"); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 消息持久化 channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Qos 的做用時負載均衡。當一個隊列有兩個 Consumer ,一個性能很好 A,另外一個不那麼好 B,RabbitMQ 會輪詢,將消息平均地分給這兩個 Consumer。可見 B 上的堆積的消息會愈來愈多,而 A 上的線程可能會空閒。 Qos 的做用就是防止一個 Consumer 堆積了過多的消息,把這些消息分給其餘 Consumer。
global 參數:
channel.basicQos(3, false); // each Consumer limit 3 channel.basicQos(5, true); // this channel limit to 5
global 參數會讓 RabbitMQ 調用更多資源,儘可能不要設置(默認值爲 false)。
RabbitMQ 支持最少一次和最多一次。
最少一次:
- 啓用 Publisher Confirm 或者 事務保證消息可以到達服務器。 - 啓用 mandatory 參數保證消息不回被 Exchange 丟掉。 - 消息和 Queue 開啓持久化。 - Consumer autoAck off, 並確保消息在處理完以後再 ack
Policy 能夠很方便的批量設置 Exchange 以及 Queue 的屬性,可是 Policy 的優先級較低,請注意。
Policy 能夠經過 HTTP API, web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
vhost
: 指定 vhostproiority
: 若是一個 Queue 或者 Exchange 有多個 Policy 的狀況下,只有 priority 最大的那個 Policy 纔會生效。apply-to
: 應用到
name
: Policy 的名字pattern
: Exchange 或者 Queue 名字的正則表達式defination
: 屬性值,能夠經過 management > Admin > Policies 的查看。RabbitMQ 會把全部的元數據存儲到全部的節點上,可是隊列是分散在集羣中全部的節點上的。
咱們嘗試使用 Docker Compose 建立一個由 3 個服務組成的集羣
version: "3" services: node1: image: rabbitmq:3.7-management-alpine container_name: node1 hostname: node1 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5673:5672" - "15673:15672" node2: image: rabbitmq:3.7-management-alpine container_name: node2 hostname: node2 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5674:5672" - "15674:15672" node3: image: rabbitmq:3.7-management-alpine container_name: node3 hostname: node3 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5675:5672" - "15675:15672"
經過設置 hostname ,容器內部的 rabbitmq 的 nodename 就變成相似 rabbitmq@node1。同時集羣中的 RabbitMQ 須要相同的 RABBITMQ_ERLANG_COOKIE 來進行互相認證。
啓動服務:
docker-compose up -d
而後將 node2 , node3 加入 node1 ,注意,加入集羣以前 RabbitMQ 必須中止:
# 中止 rabbitmq docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node3 rabbitmqctl stop_app # 加入 node1 docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1 docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1 # 從新啓動 docker-compose exec node2 rabbitmqctl start_app docker-compose exec node3 rabbitmqctl start_app
在任意一個節點上查詢集羣狀態:
docker-compose exec node2 rabbitmqctl cluster_status
能夠看到以下狀態:
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]
將節點從在線狀態下線, 首先中止節點,而後重置節點。
docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node2 rabbitmqctl reset docker-compose exec node2 rabbitmqctl stop_app
在從新啓動服務器以後能夠發現該節點已經脫離了集羣。
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node2]}]}, {running_nodes,[rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node2,[]}]}]
RabbitMQ 的節點類型有兩種,一種是 disc , 第二種是 ram。 RabbitMQ 要求集羣中至少要有一個磁盤節點,儲存了全部的元數據。當集羣中的惟一一個磁盤節點崩潰後,集羣能夠繼續收發消息,可是不能建立隊列等操做。
RabbitMQ 在加入集羣時默認爲磁盤模式,若是要之內存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改節點類型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type desc
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 機制。請注意,開啓 Publisher Confirmed 或者事務的狀況下,只有全部的 Slave 都 ACK 以後纔會返回 ACK 給客戶端。
開啓 Mirror Queue 主要經過設置 Policy 其中最主要的是 defination
:
ha-mode
: Mirror Queue 的模式
all
: 默認的模式,表示在集羣中的全部節點上進行鏡像exactly
: 在指定數量的節點上進行鏡像,數量由 ha-params
指定。nodes
: 在指定的節點上進行鏡像,節點名稱由 ha-params
指定。ha-params
: 如上所述ha-sync-mode
: 消息的同步模式
automatic
: 當新的 Slave 加入集羣以後會自動同步消息。manual
: 默認,當加入新的 Slave 以後不會自動把消息同步到新的 Slave 上。指導調用命令顯式同步。ha-promote-on-shutdown
:
when-synced
: 默認,若是主動中止 master ,那麼 slave 不會自動接管。也就是說會指望 master 會重啓啓動,這能夠保證消息不會丟失。always
: 無論 master 是由於什麼緣由中止的,slave 會馬上接管,有可能有一部分數據沒有從 master 同步到 slave.ha-promote-on-failure
: 默認 always
,不推薦設置爲 when-synced