來了解RabbitMQ一個重要的概念:Exchange交換機java
藍色框:客戶端發送消息至交換機,經過路由鍵路由至指定的隊列。 黃色框:交換機和隊列經過路由鍵有一個綁定的關係。 綠色框:消費端經過監聽隊列來接收消息。git
Name
:交換機名稱 Type
:交換機類型——direct、topic、fanout、headers、sharding(此篇不講) Durability
:是否須要持久化,true爲持久化 Auto Delete
:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange Internal
:當前Exchange是否用於RabbitMQ內部使用,默認爲false Arguments
:擴展參數,用於擴展AMQP協議自定製化使用github
注意:Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。面試
重點:routing key與隊列queues 的key保持一致,便可以路由到對應的queue中。編程
生產端:服務器
/** * * @ClassName: Producer4DirectExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:15:52 * */
public class Producer4DirectExchange {
public static void main(String[] args) throws Exception {
//1建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2建立Channel
Channel channel = connection.createChannel();
//3 聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//4 發送
String msg = "Coder編程 Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
複製代碼
消費端:微信
/** * * @ClassName: Consumer4DirectExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:18:52 * */
public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明瞭一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明瞭一個隊列
channel.queueDeclare(queueName, false, false, false, null);
//創建一個綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
測試結果:app
注意須要routingKey保持一致。能夠本身嘗試修改routingkey,是否能收到消息。學習
注意:可使用通配符進行模糊匹配 符號 "#" 匹配一個或多個詞 符號 "" 匹配很少很多一個詞 例如:"log.#" 可以匹配到 "log.info.oa" "log." 只會匹配到 "log.error"測試
在一堆消息中,每一個不一樣的隊列只關心本身須要的消息。
生產端:
/** * * @ClassName: Producer4TopicExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:32:41 * */
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2建立Channel
Channel channel = connection.createChannel();
//3聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//4發送
String msg = "Coder編程 Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
複製代碼
消費端:
/** * * @ClassName: Consumer4TopicExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:37:12 * */
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.*";
// 1 聲明交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 聲明隊列
channel.queueDeclare(queueName, false, false, false, null);
// 3 創建交換機和隊列的綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
測試結果:
注意一個問題:須要進行解綁
生產端:
/** * * @ClassName: Producer4FanoutExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午23:01:16 * */
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2 建立Channel
Channel channel = connection.createChannel();
//3 聲明
String exchangeName = "test_fanout_exchange";
//4 發送
for(int i = 0; i < 10; i ++) {
String msg = "Coder 編程 Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
複製代碼
消費端:
/** * * @ClassName: Consumer4FanoutExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午23:21:18 * */
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設置路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
測試結果:
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
RabbitMQ的概念、安裝與使用、管控臺操做、結合RabbitMQ的特性、Exchange、Queue、Binding 、RoutingKey、Message進行覈銷API的講解,經過本章的學習,但願你們對RabbitMQ有一個初步的認識。
歡迎關注我的微信公衆號:Coder編程 獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識! 新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至 Github: github.com/CoderMerlin… Gitee: gitee.com/573059382/c… 歡迎關注並star~
參考文章: ![]()
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!