原文博客地址: pjmike的博客html
RabbitMQ是採用 Erlang語言實現AMQP協議的消息中間件,AMQP全稱是 Advanced Message Queue Protocolg,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開放語言等條件的限制。java
全部MQ(消息中間件)通常有兩種傳遞模式:點對點模式和發佈/訂閱模式。git
點對點模式是基於隊列的,消息生產者建立消息,而後發送消息給隊列,消費者訂閱隊列,並從隊列中獲取消息。模型以下圖所示:github
點對點模型的特色:segmentfault
發佈訂閱模式定義瞭如何向一個內容節點發送和訂閱消息,消息發送者將消息發送到某一主題(Topic)上,消息訂閱者從主題中訂閱消息。發佈/訂閱在一對多廣播時使用。模型如圖所示:緩存
發佈/訂閱模型的特色:網絡
RabbitMQ是AMQP協議的一個開源實現,其內部模型實際上也是 AMQP的內部模型,以下圖所示:多線程
AMQP模型的工做流程以下:消息(Message) 被髮布者 (publisher) 發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱,而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue),最後AMQP代理會將消息投遞給訂閱此隊列的消費者,或者消費者按照需求從隊列中拉取消息。app
因爲網絡的不可靠,接收消息的應用也有可能在處理消息的時候失敗,基於此緣由,AMQP模型中有一個消息確認的概念:當一個消息從隊列中投遞給消費者後,消息者會通知一下消息代理(Broker),這個能夠是自動的也能夠是手動的。當"消息確認"被啓用的時候,消息代理不會徹底將消息從隊列中刪除,直到它收到來自消費者的確認回執(ACK)。負載均衡
在AMQP中,爲何不直接將消息傳到隊列中,而是先經過 Exchange轉發呢?在網上看到一則還不錯的回答:
AMQP協議中的核心思想就是生產者和消息者隔離,生產者從不直接將消息發送給隊列。生產者一般不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由 Exchange 來接收,而後 Exchange 按照特定的路由規則轉發到 Queue 進行存儲。
交換器,生產者將消息發送到交換器,交換器根據路由規則將消息路由一個或多個隊列中。而路由規則受 Exchange 的類型和綁定(binding) 關係的影響。AMQP 0-9-1 broker 提供了以下 四個 exchange 類型:
類型 | 默認預約義的名字 |
---|---|
Direct Exchange | 空字符串和 amq.direct |
Fanout Exchange | amq.fanout |
Topic Exchange | amq.topic |
Headers Exchange | amq.match (在 RabbitMQ 中,額外提供 amq.headers) |
每一個Exchange 都有以下幾個屬性:
默認的 exchange 是一個由 broker 預建立的 匿名的 (即名字爲空字符串) direct Exchange,對於簡單的程序來講,默認的 exchange 有一個實用的屬性: 若是沒有顯示的綁定 Exchange ,那麼 建立 的每一個 queue 都會自動綁定到這個默認的 exchange中,而且此時這個 queue的route key 就是這個 queue的名字。
下面舉個例子來講明:
發送端:
public class RabbitmqProducer0 {
private static final String QUEUE_NAME = "hello";
private static final String IP_ADDRESS = "127.0.0.1";
/** * RabbitMQ服務端默認端口號爲5672 */
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置ip
factory.setHost(IP_ADDRESS);
//設置端口
factory.setPort(PORT);
//設置帳號
factory.setUsername("root");
//設置密碼
factory.setPassword("root");
//建立鏈接
Connection connection = factory.newConnection();
//建立信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world ";
channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("root").build(), message.getBytes()
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//關閉資源
channel.close();
connection.close();
}
}
複製代碼
接收端:
public class RabbitmqConsumer0 {
private static final String QUEUE_NAME = "hello";
private static final String IP_ADDRESS = "39.106.63.214";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);
//建立信道
final Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消費者顯示調用Basic.Ack命令
//deliveryTag能夠看作是消息的編號,它是一個位的長整型值
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//能夠指定autoAck爲false,RabbitMQ會等待消費者顯式地回覆確認信號後才從內存中移去消息
channel.basicConsume(QUEUE_NAME, consumer);
TimeUnit.SECONDS.sleep(5);
}
}
複製代碼
在上面的例子中,咱們沒有定義Exchange,也沒有顯示地將 queue 綁定到 exchange 中,所以 名爲 "hello" 的queue 會自動綁定到默認的 exchange,即名字爲空字符串的 exchange中。而且在這個默認的 exchange中,其 route key 和 queue名字一致。簡言之,消息就以 route key爲 "hello" 投遞到默認的 exchange中,並被路由到 "hello" 這個queue中。
direct exchange 根據消息攜帶的 routing key 將消息投遞到不一樣的 queue中,direct exchange 適用於消息的單播發送。工做流程以下:
direct exchange 常常用於在 多個 worker 中分配任務,當這樣作時,需注意,在AMQP 0-9-1中,消息的負載均衡發生在 consumer之間,而不是在 queue之間。
direct exchange 圖示:
一個 fanout exchange 會將消息分發給全部綁定到此 exchange 的queue中,無論 queue中的 route key。若是有 N 個 Queue 綁定到 一個 fanout exchange 時,那麼此時 exchange 收到消息時,會將此消息分發到 這 N 個 queue中,因爲此性質, fanout exchange 也經常使用消息的廣播。
fanout exchange圖示:
topic exchange 會根據 route key 將消息分發到與此消息的 route key 相匹配的而且綁定此exchange的一個或多個 queue。這裏的**"相匹配"**與 direct exchange的徹底匹配的路由規則不同,topic exchange 在匹配規則上進行了擴展,規則以下:
topic exchange 常常用於實現 publish/subscribe模型,即消息的多播模型。前面介紹消息模型也曾提到過消息中間件通常有兩種模式:點對點模式和發佈/訂閱模式。這裏的Topic Exchange就適用於發佈/訂閱模型。RabbitMQ的一個原則就是,消息不能直接投遞到 Queue中,必須先將消息投遞到 Exchange中,而後由Exchange 按照路由規則將消息投遞到對應的 Queue中。至於點對點模型就能夠用 Direct Exchange來實現,利用徹底匹配的路由規則。
topic exchange 圖示:
header exchange 不依賴於路由器的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
Queue: 隊列,是RabbitMQ的內部對象,用於存儲消息,RabbitMQ中的消息只能存儲在隊列中。它有幾個重要的屬性:
Name
: 名字Durable
: 是不是持久的,當爲真時,即便消息代理 重啓時,此 queue 也不會被刪除Exclusive
: 是不是獨佔的,當爲真時,表示此 queue只能有一個消費者,而且當此消費者的鏈接斷開時,此 queue 會被刪除Auto-delete
: 當爲真時,此隊列會在最後一個消費者取消訂閱時被刪除Arguments
: 可選屬性,由插件和消息代理的特定功能使用,例如消息TTL、隊列長度限制等在使用一個隊列時,須要先進行聲明,若是咱們聲明的隊列不存在,那麼 broker 會自動建立它,可是若是隊列已經存在,咱們須要注意的是咱們聲明的隊列的屬性和已存在的的隊列的屬性是否一致,若是一致,則不會有任何問題,若是先後不一致,那就會 PRECONDITION_FAILED
錯誤(錯誤碼 406)
AMQP的隊列名 不能爲 "amq." 開頭,由於這樣的隊列名是 AMQP broker 內部所使用的,當咱們使用了這樣的隊列名時,那麼會有一個 ACCESS_REFUSED
錯誤 (錯誤碼爲 403)
持久隊列會被持久化到磁盤中去,所以即便 broker 重啓了,持久隊列依然存在。持久隊列和消息的持久化不一樣,當broker 重啓時,持久隊列會自動從新聲明,而只有隊列中的持久化消息(persistent message) 纔會被恢復
隊列的綁定關係是 exchange 用於消息路由的規則,即一個 exchange 可以將消息路由到某個隊列的前提是隊列已經綁定到這個 exchange中了,當隊列綁定到一個 exchange中時,咱們設置了一個 route key,或者叫作綁定鍵,這個key 會被 direct exchange 和 topic exchange 做爲額外的路由信息使用。
當exchange 沒有任何的 queue 綁定時,那麼此時會根據消息的屬性來決定 是將此消息丟棄仍是返回給生產者。
AMQP 0-9-1 支持兩種消息分發模式:
在push模式中,消費者訂閱一個消息主題,當有消息傳遞到消息主題時,broker主動將消息推送給訂閱該主題的全部消費者。每一個消費者都有一個惟一的標識符,即 consumer tag。咱們也能夠用這個 tag來取消一個消費者對某個主題的訂閱。push模式下通常使用 Channel類的 basicConsume
方法:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
...
複製代碼
該方法有幾種重載形式,更多的信息能夠參閱 API文檔。
而pull(拉)模式,消費者主動從 broker 中拉取消息,經過 channel.basicGet
方法能夠單條地獲取消息,其返回值是 GetResponse
:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
複製代碼
其中 queue 表明隊列的名稱,若是設置 autoAck 爲 true,即消息自動ACK模式,爲false,則爲消息手動確認模式,一樣須要調用 channel.basicAck
來確認消息已被成功接收。下面將仔細闡述消息的ACK.
AMQP 0-9-1 有兩種消息 ACK 模式:
在自動 ACK 模式下,當 broker 發送消息成功後,會當即將此消息 從消息隊列中 刪除,而不會消費者的 ACK回覆。示例程序以下:
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);
//建立信道
final Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
複製代碼
在 channel.basicConsume(String queue, boolean autoAck, Consumer callback)
設置 autoAck參數。
而在手動的 ACK模式下,當 broker 發送消息給消費者時,不會當即將此消息刪除,而是須要等待消息的消費者的ACK回覆後纔會刪除消息,所以在手動 ACK模式下,當消費者收到消息並處理完成後,須要向 broker 顯示地發送 ACK指令。示例程序以下:
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);
//建立信道
final Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消費者顯示調用Basic.Ack命令
//deliveryTag能夠看作是消息的編號,它是一個位的長整型值
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//這裏還能夠指定autoAck爲false,RabbitMQ會等待消費者顯式地回覆確認信號後才從內存中移去消息
channel.basicConsume(QUEUE_NAME,consumer);
複製代碼
在手動 ACK模式下,若是消費者 由於意外的 crash 而沒有發送 ACK 給 broker,那麼此時 broker會將消息轉發給其餘的消費者 (若是此時沒有消費者了,那麼 broker 會緩存 此消息,直到有新的消費者註冊)
當一個 消費者處理消息失敗或者此時不能處理消息時,那麼能夠給 broker 發送一個拒接消息的指令,而且能夠要求 broker 丟棄或者從新分發此消息。不過須要的注意的是,若是此時只有一個消費者,那麼此時消費者拒收消息並要求 broker 從新分發此消息時,那麼就會形成此消息不斷的分發和拒收,造成了死循環。拒收的方法以下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
複製代碼
經過調用 channel.basicReject
方法來告訴 RabbitMQ拒絕某個消息。其中 deliveryTag
能夠看作是消息的編號,它是一個 64位的長整型值。若是 requeue
爲 true, broker會從新將這條消息存入 隊列,以便發送給下一個訂閱的消費者。若是爲false,則 broker會當即把消息從隊列中移除,而不會把它發送給新的消費者。
經過 預讀消息機制,消費者能夠一次性批量取出消息,而後在處理後對這些消息進行統一的 ACK,這樣能夠提升消息的吞吐量。不過須要注意的是,RabbitMQ 僅支持 channel級別的預讀消息的數量配置,不支持基於鏈接的預讀消息數量配置。
AMQP的鏈接是長鏈接,它是一個使用 TCP做爲可靠傳輸的應用層協議。AMQP使用認證機制而且體用 TLS(SSL) 保護。當一個應用再也不須要鏈接到 AMQP代理的時候,須要優雅的釋放掉AMQP鏈接,而不是直接將TCP鏈接關閉。
有些應用須要與 AMQP代理(Broker)創建多個鏈接,不管怎樣,同時開啓多個 TCP鏈接都是不合適的,由於這樣作會消耗掉過多的系統資源而且使得防火牆的配置更加困難。AMQP 0-9-1提供了 Channel 來處理多個鏈接,能夠把Channel理解爲 共享 一個TCP鏈接的多個輕量化鏈接。(PS:這裏讓我想到了多路複用模型,原理類似)
在涉及多線程/進程的應用中,爲每一個線程/進程開啓一個通道 (channel) 是很常見的,而且這些通道不能被線程/進程共享。
爲了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列等),AMQP提供了一個 虛擬主機 (virtual hosts -vhosts)的概念。這與 虛擬機的概念類似,這爲AMQP提供了徹底隔離的環境。當鏈接被創建時,AMQP客戶端指定使用哪一個虛擬主機。
AMQP模型中的消息 (Message)對象是帶有 屬性(Attributes)的。有些屬性很是常見,例如:
Content type
: 內容類型Content encoding
: 內容編碼Routing Key
: 路由鍵Delivery mode
: 投遞方式(持久化 or 非持久化)Message priority
: 消息優先權Message publishing timestamp
: 消息發佈的時間戳Expiration period
: 消息的有效期Publisher application id
: 發佈應用的id有些屬性是被 AMQP代理所使用的,好比 Routing Key
,可是大多數是對給接收消息的消費者使用的,有些屬性是可選爲作消息頭的。它們與HTTP協議的 X-headers
很類似,好比 Content type
、Content encoding
。
AMQP的消息除屬性外,還含有一個消息體,即消息實際攜帶的數據,它對AMQP代理不透明。broker 不會檢查或修改消息體,可是消息能夠只包含屬性而不攜帶消息體。
本文參考了 AMQP 0-9-1 Model Explained官方文檔及其相關譯本,果真,官方文檔是最權威的,也解決了我以前對於RabbitMQ的不少疑惑。這裏參照網上資料整理出來一篇文章,算是對 RabbiMQ及AMQP模型有一個大體的認識。光看文檔還不夠,還需多去實戰才能加深對RabbitMQ的理解和認識。