RabbitMQ】三種Exchange模式——訂閱、路由、通配符模式

這篇博客介紹訂閱、路由和通配符模式,之因此放在一塊兒介紹,是由於這三種模式都是用了Exchange交換機,消息沒有直接發送到隊列,而是發送到了交換機,通過隊列綁定交換機到達隊列。java

性能排序:fanout > direct >> topic。比例大約爲11:10:6     
服務器

1、訂閱模式(Fanout Exchange):

   一個生產者,多個消費者,每個消費者都有本身的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每一個隊列綁定交換機,生產者發送的消息通過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。須要注意的是,若是將消息發送到一個沒有隊列綁定的exchange上面,那麼該消息將會丟失,這是由於在rabbitMQ中exchange不具有存儲消息的能力,只有隊列具有存儲消息的能力。ide

      

         

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上。性能

1.能夠理解爲路由表的模式url

2.這種模式不須要RouteKeyspa

3.這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。.net

4.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。3d

示例代碼:
orm

生產者:blog

[java] view plain copy

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_fanout";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 獲取到鏈接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         //從鏈接中建立通道  

  9.         Channel channel = connection.createChannel();  

  10.   

  11.         // 聲明exchange  

  12.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  

  13.   

  14.         // 消息內容  

  15.         String message = "商品已經新增,id = 1000";  

  16.         channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes());  

  17.           

  18.         System.out.println(" [x] Sent '" + message + "'");  

  19.   

  20.         channel.close();  

  21.         connection.close();  

  22.     }  

  23. }  


消費者1:

[java] view plain copy

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_fanout_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_fanout";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 獲取到鏈接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 聲明隊列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 綁定隊列到交換機  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");  

  18.   

  19.         // 同一時刻服務器只會發一條消息給消費者  

  20.         channel.basicQos(1);  

  21.   

  22.         // 定義隊列的消費者  

  23.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  24.         // 監聽隊列,手動返回完成  

  25.         channel.basicConsume(QUEUE_NAME, true, consumer);  

  26.   

  27.         // 獲取消息  

  28.         while (true) {  

  29.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  30.             String message = new String(delivery.getBody());  

  31.             System.out.println(" 前臺系統: '" + message + "'");  

  32.             Thread.sleep(10);  

  33.   

  34.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  35.         }  

  36.     }  

  37. }  


  消費者2的代碼和消費者1的代碼大體相同,只是隊列的名稱不同,這樣兩個消費者有本身的隊列,均可以接收到生產者發送的消息


  可是若是生產者有新增商品,修改商品,刪除商品的消息,消費者包快前臺系統和搜索系統,要求前臺系統接收修改和刪除商品的消息,搜索系統接收新增商品、修改商品和刪除商品的消息。因此使用這種訂閱模式實現商品數據的同步並不合理。所以咱們介紹下一種模式:路由模式。


2、路由模式(Direct Exchange)

  這種模式添加了一個路由鍵,生產者發佈消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,這樣就能夠接收到須要接收的消息。

       


任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue

1.通常狀況可使用rabbitMQ自帶的Exchange:」"(該Exchange的名字爲空字符串,下文稱其爲default Exchange)。

2.這種模式下不須要將Exchange進行任何綁定(binding)操做

3.消息傳遞時須要一個「RouteKey」,能夠簡單的理解爲要發送到的隊列名字。

4.若是vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。

示例代碼:

生產者:

[java] view plain copy

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 獲取到鏈接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         Channel channel = connection.createChannel();  

  9.   

  10.         // 聲明exchange  

  11.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

  12.   

  13.         // 消息內容  

  14.         String message = "刪除商品, id = 1001";  

  15.         channel.basicPublish(EXCHANGE_NAME, "delete"null, message.getBytes());  

  16.         System.out.println(" [x] Sent '" + message + "'");  

  17.   

  18.         channel.close();  

  19.         connection.close();  

  20.     }  

  21. }  


消費者1:接收更新和刪除消息

[java] view plain copy

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_direct_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 獲取到鏈接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 聲明隊列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 綁定隊列到交換機  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");  

  19.   

  20.         // 同一時刻服務器只會發一條消息給消費者  

  21.         channel.basicQos(1);  

  22.   

  23.         // 定義隊列的消費者  

  24.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  25.         // 監聽隊列,手動返回完成  

  26.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  27.   

  28.         // 獲取消息  

  29.         while (true) {  

  30.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  31.             String message = new String(delivery.getBody());  

  32.             System.out.println(" 前臺系統: '" + message + "'");  

  33.             Thread.sleep(10);  

  34.   

  35.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  36.         }  

  37.     }  

  38. }  


消費者2:接收insert,update,delete的消息

[java] view plain copy

  1. public class Recv2 {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_direct_2";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_direct";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 獲取到鏈接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 聲明隊列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 綁定隊列到交換機  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");  

  19.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");  

  20.   

  21.         // 同一時刻服務器只會發一條消息給消費者  

  22.         channel.basicQos(1);  

  23.   

  24.         // 定義隊列的消費者  

  25.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  26.         // 監聽隊列,手動返回完成  

  27.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  28.   

  29.         // 獲取消息  

  30.         while (true) {  

  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  32.             String message = new String(delivery.getBody());  

  33.             System.out.println(" 搜索系統: '" + message + "'");  

  34.             Thread.sleep(10);  

  35.   

  36.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  37.         }  

  38.     }  

  39. }  

  若是生產者發佈了insert消息,那麼消費者2能夠收到,消費者 1收不到,若是發佈了update或者delete消息,兩個消費者均可以收到。若是發佈ABC消息兩個消費者都收不到,由於沒有綁定這個鍵值。這種模式基本知足了咱們的需求,可是還不夠靈活,下面介紹另一個模式。


3、通配符模式(Topic Exchange)

   基本思想和路由模式是同樣的,只不過路由鍵支持模糊匹配,符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞

       

任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上

1.這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(RouteKey),Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。

2.這種模式須要RouteKey,也許要提早綁定Exchange與Queue。

3.在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個RouteKey爲」MQ.log.error」的消息會被轉發到該隊列)。

4.「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。

5.一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。

示例代碼:

生產者:

[java] view plain copy

  1. public class Send {  

  2.   

  3.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  4.   

  5.     public static void main(String[] argv) throws Exception {  

  6.         // 獲取到鏈接以及mq通道  

  7.         Connection connection = ConnectionUtil.getConnection();  

  8.         Channel channel = connection.createChannel();  

  9.   

  10.         // 聲明exchange  

  11.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  

  12.   

  13.         // 消息內容  

  14.         String message = "刪除商品,id = 1001";  

  15.         channel.basicPublish(EXCHANGE_NAME, "item.delete"null, message.getBytes());  

  16.         System.out.println(" [x] Sent '" + message + "'");  

  17.   

  18.         channel.close();  

  19.         connection.close();  

  20.     }  

  21. }  


消費者1:

[java] view plain copy

  1. public class Recv {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_topic_1";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 獲取到鏈接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 聲明隊列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 綁定隊列到交換機  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");  

  18.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");  

  19.   

  20.         // 同一時刻服務器只會發一條消息給消費者  

  21.         channel.basicQos(1);  

  22.   

  23.         // 定義隊列的消費者  

  24.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  25.         // 監聽隊列,手動返回完成  

  26.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  27.   

  28.         // 獲取消息  

  29.         while (true) {  

  30.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  31.             String message = new String(delivery.getBody());  

  32.             System.out.println(" 前臺系統: '" + message + "'");  

  33.             Thread.sleep(10);  

  34.   

  35.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  36.         }  

  37.     }  

  38. }  



消費者2:

[java] view plain copy

  1. public class Recv2 {  

  2.   

  3.     private final static String QUEUE_NAME = "test_queue_topic_2";  

  4.   

  5.     private final static String EXCHANGE_NAME = "test_exchange_topic";  

  6.   

  7.     public static void main(String[] argv) throws Exception {  

  8.   

  9.         // 獲取到鏈接以及mq通道  

  10.         Connection connection = ConnectionUtil.getConnection();  

  11.         Channel channel = connection.createChannel();  

  12.   

  13.         // 聲明隊列  

  14.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  

  15.   

  16.         // 綁定隊列到交換機  

  17.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");  

  18.   

  19.         // 同一時刻服務器只會發一條消息給消費者  

  20.         channel.basicQos(1);  

  21.   

  22.         // 定義隊列的消費者  

  23.         QueueingConsumer consumer = new QueueingConsumer(channel);  

  24.         // 監聽隊列,手動返回完成  

  25.         channel.basicConsume(QUEUE_NAME, false, consumer);  

  26.   

  27.         // 獲取消息  

  28.         while (true) {  

  29.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  30.             String message = new String(delivery.getBody());  

  31.             System.out.println(" 搜索系統: '" + message + "'");  

  32.             Thread.sleep(10);  

  33.   

  34.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

  35.         }  

  36.     }  

  37. }  


  消費者1是按需索取,並無使用通配符模式,而是用的徹底匹配,消費者2使用通配符模式,這樣以item.開頭的消息都會所有接收。


小結:

  1.與簡單模式和work模式對比,前面兩種同一個消息只能被一個消費者獲取,而今天的這三種模式,能夠實現一個消息被多個消費者 獲取。

  2.fanout這種模式沒有加入路由器,隊列與exchange綁定後,就會接收到全部的消息,其他兩種增長了路由鍵,而且第三種增長通配符,更加便利。

本文出自https://blog.csdn.net/ww130929/article/details/72842234

相關文章
相關標籤/搜索