可靠性(Reliability):RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。html
靈活的路由(Flexible Routing):在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。java
消息集羣(Clustering):多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。git
高可用(Highly Available Queues):隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。github
多種協議(Multi-protocol):RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。算法
多語言客戶端(Many Clients):RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。數據庫
管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。編程
跟蹤機制(Tracing):若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。數組
插件機制(Plugin System): RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。瀏覽器
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件同產品,不一樣的開發語言等條件的限制。AMQP協議這種下降耦合的機制是基於與上層產品,語言無關的協議。是一種二進制協議,提供客戶端應用與消息中間件之間多通道、協商、異步、安全、中立和高效地交互。從總體來看,AMQP協議可劃分爲兩層:安全
Functional Layer(功能層)
功能層,位於協議上層主要定義了一組命令(基於功能的邏輯分類),用於應用程序調用實現自身所需的業務邏輯。例如:應用程序能夠經過功能層定義隊列名稱,生產消息到指定隊列,消費指定隊列消息等基於(Message queues 模型)
Transport Layer(傳輸層)
傳輸層,基於二進制數據流傳輸,用於將應用程序調用的指令傳回服務器,並返回結果,同時能夠處理信道複用,幀處理,內容編碼,心跳傳輸,數據傳輸和異常處理。傳輸層能夠被任意傳輸替換,只要不改變應用可見的功能層相關協議,也可使用相同的傳輸層,同時使用不一樣的高級協議
默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。
它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
舉個栗子:當你聲明瞭一個名爲 「search-indexing-online」 的隊列,AMQP 代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是爲 「search-indexing-online」。所以,當攜帶着名爲 「search-indexing-online」 的路由鍵的消息被髮送到默認交換機的時候,此消息會被默認交換機路由至名爲 「search-indexing-online」 的隊列中。換句話說,默認交換機看起來貌似可以直接將消息投遞給隊列,儘管技術上並無作相關的操做。
若是咱們以 Rotuing key=create 和 Rotuing key=confirm 發送消息時,這時消息只會被推送到 Queue2 隊列中,其餘 Routing Key 的消息將會被丟棄
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 16:54
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設置鏈接
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊列(隊列屬性可看下面)
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "hello";
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
} catch (Exception e){
System.out.println("連接異常、、、、");
}
}
}
複製代碼
/**
* @Author: Young
* @Description: 模擬一個隊列同時綁定兩個binding
* @Create: 2019-09-23 17:44
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Work {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
final Channel channel1 = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel1.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 會告訴RabbitMQ不要同時給一個消費者推送多於N個消息
channel.basicQos(1);
channel1.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x1] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x1] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 同一個會話, consumerTag 是固定的 能夠作此會話的名字, deliveryTag 每次接收消息+1,能夠作此消息處理通道的名字。
// 所以 deliveryTag 能夠用來回傳告訴 rabbitmq 這個消息處理成功 清除此消息(basicAck方法)。
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
channel1.basicConsume(TASK_QUEUE_NAME, false, deliverCallback1, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
複製代碼
由於扇型交換機投遞消息的拷貝到全部綁定到它的隊列,因此他的應用案例都極其類似:
大規模多用戶在線(MMO)遊戲可使用它來處理排行榜更新等全局事件
體育新聞網站能夠用它來近乎實時地將比分更新分發給移動客戶端
分發系統使用它來廣播各類狀態和配置更新
在羣聊的時候,它被用來分發消息給參與羣聊的用戶。(AMQP 沒有內置 presence 的概念,所以 XMPP 可能會是個更好的選擇)
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 18:16
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 聲明交換機及他的類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
//
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
複製代碼
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 19:12
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
// 建立連接
Connection connection = factory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
// 生命交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 將隊列與交換機綁定
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 回調函數
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 開始等待消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
複製代碼
而Topic 的路由規則是一種模糊匹配,能夠經過通配符知足一部分規則就能夠傳送。
它的約定是: 1)binding key 中能夠存在兩種特殊字符 「」 與「#」,用於作模糊匹配,其中 「」 用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)。 2)routing key 爲一個句點號 「.」 分隔的字符串(咱們將被句點號 「. 」 分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」 binding key 與 routing key 同樣也是句點號 「.」 分隔的字符串。
當生產者發送消息 Routing Key=F.C.E 的時候,這時候只知足 Queue1,因此會被路由到 Queue1 中,若是 Routing Key=A.C.E 這時候會被同是路由到 Queue1 和 Queue2 中,若是 Routing Key=A.F.B 時,這裏只會發送一條消息到 Queue2 中。
A.B.C
*.B.*
#.*.C
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 20:26
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
// String[] strings={"A.B.C", "ABC"};
String[] strings={"E.B.G", "ABC"};
// String[] strings={"A.B", "AB"};
// String[] strings={"B", "B"};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 聲明交換機及其類型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = strings[0];
String message = strings[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
複製代碼
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 20:33
**/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
String[] strings = {"A.#", "*.*.C"};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.3");
factory.setPort(5672);
factory.setUsername("young");
factory.setPassword("young");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 同一個通道綁定多個 bindingKey
for (String bindingKey : strings) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
複製代碼
headers 類型的 Exchange 不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
頭交換機能夠視爲直連交換機的另外一種表現形式。但直連交換機的路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至能夠是整數或者哈希值(字典)等。靈活性更強(但實際上咱們不多用到頭交換機)。工做流程:
1)、綁定一個隊列到頭交換機上時,會同時綁定多個用於匹配的頭(header)。
2)、傳來的消息會攜帶header,以及會有一個 「x-match」 參數。當 「x-match」 設置爲 「any」 時,消息頭的任意一個值被匹配就能夠知足條件,而當 「x-match」 設置爲 「all」 的時候,就須要消息頭的全部值都匹配成功。
AMQP 中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。
隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。
隊列在聲明(declare)後才能被使用。若是一個隊列尚不存在,聲明一個隊列會建立它。若是聲明的隊列已經存在,而且屬性徹底相同,那麼這次聲明不會對原有隊列產生任何影響。若是聲明中的屬性與已存在隊列的屬性有差別,那麼一個錯誤代碼爲 406 的通道級異常就會被拋出。
持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啓的時候,它依舊存在。沒有被持久化的隊列稱做暫存隊列(Transient queues)。並非全部的場景和案例都須要將隊列持久化。
持久化的隊列並不會使得路由到它的消息也具備持久性。假若消息代理掛掉了,從新啓動,那麼在重啓的過程當中持久化隊列會被從新聲明,不管怎樣,只有通過持久化的消息才能被從新恢復。
消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。並且網絡緣由也有可能引發各類問題。這就給咱們出了個難題,AMQP 代理在何時刪除消息纔是正確的?AMQP 0-9-1 規範給咱們兩種建議:
若是一個消費者在還沒有發送確認回執的狀況下掛掉了,那麼AMQP代理會將消息從新投遞給另外一個消費者。若是當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,而後再次嘗試投遞。
當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用能夠向消息代理代表,本條消息因爲 「拒絕消息(Rejecting Messages)」 的緣由處理失敗了(或者未能在此時完成)。
當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。
當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。
在 AMQP 中,basic.reject 方法用來執行拒絕消息的操做。但 basic.reject 有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。可是若是你使用的是 RabbitMQ,那麼你可使用被稱做 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 擴展來解決這個問題。
在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息是很是有用的。這能夠在試圖批量發佈消息的時候起到簡單的負載均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生產應用每分鐘才發送一條消息,這說明處理工做尚在運行。)
注意,RabbitMQ 只支持通道級的預取計數,而不是鏈接級的或者基於大小的預取。
AMQP 模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以致於 AMQP 0-9-1 明確的定義了它們,而且應用開發者們無需費心思思考這些屬性名字所表明的具體含義。例如:
有些屬性是被 AMQP 代理所使用的,可是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱做消息頭(headers)。他們跟 HTTP 協議的 X-Headers 很類似。消息屬性須要在消息被髮布的時候定義。
AMQP 的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被 AMQP 代理看成不透明的字節數組來對待。
消息代理不會檢查或者修改有效載荷。消息能夠只包含屬性而不攜帶有效載荷。它一般會使用相似 JSON 這種序列化的格式數據,爲了節省,協議緩衝器和 MessagePack 將結構化數據序列化,以便以消息的有效載荷的形式發佈。AMQP 及其同行者們一般使用 「content-type」 和 「content-encoding」 這兩個字段來與消息溝通進行有效載荷的辨識工做,但這僅僅是基於約定而已。
消息可以以持久化的方式發佈,AMQP代理會將此消息存儲在磁盤上。若是服務器重啓,系統會確認收到的持久化消息未丟失。
簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具備持久化性質:它徹底取決與消息自己的持久模式(persistence mode)。將消息以持久化方式發佈時,會對性能形成必定的影響(就像數據庫操做同樣,健壯性的存在一定形成一些性能犧牲)。
RPC(Remote Procedure Call Protocol,遠程過程調用協議),通常都稱爲「遠程過程調用」。關於RPC協議自己,很少介紹,這裏只介紹Openstack如何利用AMQP來實現RPC。以下圖所示。
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 20:45
**/
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
// 創建connection和channel。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
// 求0-32的斐波那契數列之和
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
// call方法來發送RPC請求
public String call(String message) throws IOException, InterruptedException {
// 生成correlationId
final String corrId = UUID.randomUUID().toString();
// 生成默認名字的queue用於reply,並訂閱它
String replyQueueName = channel.queueDeclare().getQueue();
// 發送request message,設置參數replyTo和correlationId.
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
// 由於消費者發送response是在另外一個線程中,咱們須要讓main線程阻塞,在這裏咱們使用BlockingQueue
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
// 消費者進行簡單的處理,爲每個response message檢查其correlationId,若是是,則將response添加進阻塞隊列
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
// 在隊列爲空時,獲取元素的線程會等待隊列變爲非空
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
複製代碼
/**
* @Author: Young
* @Description: young.thrift.test_thrift.test
* @Create: 2019-09-23 20:46
**/
import com.rabbitmq.client.*;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
// 斐波那契函數
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設置鏈接參數
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("***");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
// 清空隊列
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 對消息進行應答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 喚醒正在消費的進程
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// 在收到消息前,本線程進入等待狀態
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
複製代碼
參考:
若有不當之處,歡迎留言(手動滑稽)。。。