來了解RabbitMQ一個重要的概念:Exchange交換機java
藍色框:客戶端發送消息至交換機,經過路由鍵路由至指定的隊列。 黃色框:交換機和隊列經過路由鍵有一個綁定的關係。 綠色框:消費端經過監聽隊列來接收消息。git
Name
:交換機名稱 Type
:交換機類型——direct、topic、fanout、headers、sharding(此篇不講) Durability
:是否須要持久化,true爲持久化 Auto Delete
:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange Internal
:當前Exchange是否用於RabbitMQ內部使用,默認爲false Arguments
:擴展參數,用於擴展AMQP協議自定製化使用github
注意:Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。面試
重點:routing key與隊列queues 的key保持一致,便可以路由到對應的queue中。編程
生產端:服務器
/** * * @ClassName: Producer4DirectExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:15:52 * */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2建立Channel Channel channel = connection.createChannel(); //3 聲明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //4 發送 String msg = "Coder編程 Hello World RabbitMQ 4 Direct Exchange Message ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消費端:微信
/** * * @ClassName: Consumer4DirectExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:18:52 * */ public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //聲明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示聲明瞭一個交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示聲明瞭一個隊列 channel.queueDeclare(queueName, false, false, false, null); //創建一個綁定關係: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:app
注意須要routingKey保持一致。能夠本身嘗試修改routingkey,是否能收到消息。學習
注意:可使用通配符進行模糊匹配 符號 "#" 匹配一個或多個詞 符號 "" 匹配很少很多一個詞 例如:"log.#" 可以匹配到 "log.info.oa" "log." 只會匹配到 "log.error"測試
在一堆消息中,每一個不一樣的隊列只關心本身須要的消息。
生產端:
/** * * @ClassName: Producer4TopicExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:32:41 * */ public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2建立Channel Channel channel = connection.createChannel(); //3聲明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //4發送 String msg = "Coder編程 Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
消費端:
/** * * @ClassName: Consumer4TopicExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:37:12 * */ public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 聲明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; //String routingKey = "user.*"; String routingKey = "user.*"; // 1 聲明交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 聲明隊列 channel.queueDeclare(queueName, false, false, false, null); // 3 創建交換機和隊列的綁定關係: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:
注意一個問題:須要進行解綁
生產端:
/** * * @ClassName: Producer4FanoutExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午23:01:16 * */ public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 建立Channel Channel channel = connection.createChannel(); //3 聲明 String exchangeName = "test_fanout_exchange"; //4 發送 for(int i = 0; i < 10; i ++) { String msg = "Coder 編程 Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消費端:
/** * * @ClassName: Consumer4FanoutExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午23:21:18 * */ public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 聲明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不設置路由鍵 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
RabbitMQ的概念、安裝與使用、管控臺操做、結合RabbitMQ的特性、Exchange、Queue、Binding 、RoutingKey、Message進行覈銷API的講解,經過本章的學習,但願你們對RabbitMQ有一個初步的認識。
歡迎關注我的微信公衆號:Coder編程 獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識! 新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至 Github: https://github.com/CoderMerlin/coder-programming Gitee: https://gitee.com/573059382/coder-programming 歡迎關注並star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!