RabbitMQ (五)主題(Topic)

轉載請標明出處:http://blog.csdn.net/lmj623565791/article/details/37706355 java

上一篇博客中,咱們進步改良了咱們的日誌系統。咱們使用direct類型轉發器,使得接收者有能力進行選擇性的接收日誌,,而非fanout那樣,只可以無腦的轉發,若是你還不瞭解:RabbitMQ (四) 路由選擇 (Routing)dom

雖然使用direct類型改良了咱們的系統,可是仍然存在一些侷限性:它不可以基於多重條件進行路由選擇。
在咱們的日誌系統中,咱們有可能但願不只根據日誌的級別並且想根據日誌的來源進行訂閱。這個概念相似unix工具:syslog,它轉發日誌基於嚴重性(info/warning/crit…)和設備(auth/cron/kern…)
這樣可能給咱們更多的靈活性:咱們可能只想訂閱來自’cron’的致命錯誤日誌,而不是來自’kern’的。
爲了在咱們的系統中實現上述的需求,咱們須要學習稍微複雜的主題類型的轉發器(topic exchange)。

一、 主題轉發(Topic Exchange)
發往主題類型的轉發器的消息不能隨意的設置選擇鍵(routing_key),必須是由點隔開的一系列的標識符組成。標識符能夠是任何東西,可是通常都與消息的某些特性相關。一些合法的選擇鍵的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能夠定義任何數量的標識符,上限爲255個字節。
綁定鍵和選擇鍵的形式同樣。主題類型的轉發器背後的邏輯和直接類型的轉發器很相似:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。須要注意的是:關於綁定鍵有兩種特殊的狀況。
*能夠匹配一個標識符。
#能夠匹配0個或多個標識符。
二、 圖解:
咱們準備發送關於動物的消息。消息會附加一個選擇鍵包含3個標識符(兩個點隔開)。第一個標識符描述動物的速度,第二個標識符描述動物的顏色,第三個標識符描述動物的物種:<speed>.<color>.<species>。
咱們建立3個綁定鍵:Q1與*.orange.*綁定Q2與*.*.rabbit和lazy.#綁定。
能夠簡單的認爲:
Q1對全部的橙色動物感興趣。
Q2想要知道關於兔子的一切以及關於懶洋洋的動物的一切。
一個附帶quick.orange.rabbit的選擇鍵的消息將會被轉發到兩個隊列。附帶lazy.orange.elephant的消息也會被轉發到兩個隊列。另外一方面quick.orange.fox只會被轉發到Q1,lazy.brown.fox將會被轉發到Q2。lazy.pink.rabbit雖然與兩個綁定鍵匹配,可是也只會被轉發到Q2一次。quick.brown.fox不能與任何綁定鍵匹配,因此會被丟棄。
若是咱們違法咱們的約定,發送一個或者四個標識符的選擇鍵,相似:orange,quick.orange.male.rabbit,這些選擇鍵不能與任何綁定鍵匹配,因此消息將會被丟棄。
另外一方面,lazy.orange.male.rabbit,雖然是四個標識符,也能夠與lazy.#匹配,從而轉發至Q2。
注:主題類型的轉發器很是強大,能夠實現其餘類型的轉發器。
當一個隊列與綁定鍵#綁定,將會收到全部的消息,相似fanout類型轉發器。
當綁定鍵中不包含任何#與*時,相似direct類型轉發器。
三、 完整的例子

發送端EmitLogTopic.java: 工具

[java]  view plain  copy
  在CODE上查看代碼片 派生到個人代碼片
  1. package com.zhy.rabbit._05_topic_exchange;  
  2.   
  3. import java.util.UUID;  
  4.   
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8.   
  9. public class EmitLogTopic  
  10. {  
  11.   
  12.     private static final String EXCHANGE_NAME = "topic_logs";  
  13.   
  14.     public static void main(String[] argv) throws Exception  
  15.     {  
  16.         // 建立鏈接和頻道  
  17.         ConnectionFactory factory = new ConnectionFactory();  
  18.         factory.setHost("localhost");  
  19.         Connection connection = factory.newConnection();  
  20.         Channel channel = connection.createChannel();  
  21.   
  22.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  23.   
  24.         String[] routing_keys = new String[] { "kernal.info""cron.warning",  
  25.                 "auth.info""kernel.critical" };  
  26.         for (String routing_key : routing_keys)  
  27.         {  
  28.             String msg = UUID.randomUUID().toString();  
  29.             channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg  
  30.                     .getBytes());  
  31.             System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");  
  32.         }  
  33.   
  34.         channel.close();  
  35.         connection.close();  
  36.     }  
  37. }  

咱們發送了4條消息,分別設置了不一樣的選擇鍵。

接收端1,ReceiveLogsTopicForKernel.java 學習

[java]  view plain  copy
  在CODE上查看代碼片 派生到個人代碼片
  1. package com.zhy.rabbit._05_topic_exchange;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7.   
  8. public class ReceiveLogsTopicForKernel  
  9. {  
  10.   
  11.     private static final String EXCHANGE_NAME = "topic_logs";  
  12.   
  13.     public static void main(String[] argv) throws Exception  
  14.     {  
  15.         // 建立鏈接和頻道  
  16.         ConnectionFactory factory = new ConnectionFactory();  
  17.         factory.setHost("localhost");  
  18.         Connection connection = factory.newConnection();  
  19.         Channel channel = connection.createChannel();  
  20.         // 聲明轉發器  
  21.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  22.         // 隨機生成一個隊列  
  23.         String queueName = channel.queueDeclare().getQueue();  
  24.           
  25.         //接收全部與kernel相關的消息  
  26.         channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");  
  27.   
  28.         System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");  
  29.   
  30.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  31.         channel.basicConsume(queueName, true, consumer);  
  32.   
  33.         while (true)  
  34.         {  
  35.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  36.             String message = new String(delivery.getBody());  
  37.             String routingKey = delivery.getEnvelope().getRoutingKey();  
  38.   
  39.             System.out.println(" [x] Received routingKey = " + routingKey  
  40.                     + ",msg = " + message + ".");  
  41.         }  
  42.     }  
  43. }  

直接收和Kernel相關的日誌消息。

接收端2,ReceiveLogsTopicForCritical.java ui

[java]  view plain  copy
  在CODE上查看代碼片 派生到個人代碼片
  1. package com.zhy.rabbit._05_topic_exchange;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7.   
  8. public class ReceiveLogsTopicForCritical  
  9. {  
  10.   
  11.     private static final String EXCHANGE_NAME = "topic_logs";  
  12.   
  13.     public static void main(String[] argv) throws Exception  
  14.     {  
  15.         // 建立鏈接和頻道  
  16.         ConnectionFactory factory = new ConnectionFactory();  
  17.         factory.setHost("localhost");  
  18.         Connection connection = factory.newConnection();  
  19.         Channel channel = connection.createChannel();  
  20.         // 聲明轉發器  
  21.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  22.         // 隨機生成一個隊列  
  23.         String queueName = channel.queueDeclare().getQueue();  
  24.   
  25.         // 接收全部與kernel相關的消息  
  26.         channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");  
  27.   
  28.         System.out  
  29.                 .println(" [*] Waiting for critical messages. To exit press CTRL+C");  
  30.   
  31.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  32.         channel.basicConsume(queueName, true, consumer);  
  33.   
  34.         while (true)  
  35.         {  
  36.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  37.             String message = new String(delivery.getBody());  
  38.             String routingKey = delivery.getEnvelope().getRoutingKey();  
  39.   
  40.             System.out.println(" [x] Received routingKey = " + routingKey  
  41.                     + ",msg = " + message + ".");  
  42.         }  
  43.     }  
  44. }  

只接收致命錯誤的日誌消息。

運行結果: spa

 [x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
 [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
 [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
 [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9. .net

-------------------------------------------------------------------------------------------------------------------- 3d

 [*] Waiting for messages about kernel. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9. unix

-------------------------------------------------------------------------------------------------------------------- 日誌

 [*] Waiting for critical messages. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

能夠看到,咱們經過使用topic類型的轉發器,成功實現了多重條件選擇的訂閱。

相關文章
相關標籤/搜索