RabbitMQ (四) 路由選擇 (Routing)

上一篇博客咱們創建了一個簡單的日誌系統,咱們可以廣播日誌消息給全部你的接收者,若是你不瞭解,請查看:RabbitMQ (三) 發佈/訂閱。本篇博客咱們準備給日誌系統添加新的特性,讓日誌接收者可以訂閱部分消息。例如,咱們能夠僅僅將致命的錯誤寫入日誌文件,然而仍然在控制面板上打印出全部的其餘類型的日誌消息。 java

一、綁定(Bindings)
在上一篇博客中咱們已經使用過綁定。相似下面的代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定表示轉發器與隊列之間的關係。咱們也能夠簡單的認爲:隊列對該轉發器上的消息感興趣。
綁定能夠附帶一個額外的參數routingKey。爲了與避免basicPublish方法(發佈消息的方法)的參數混淆,咱們準備把它稱做綁定鍵(binding key)。下面展現如何使用綁定鍵(binding key)來建立一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵的意義依賴於轉發器的類型。對於fanout類型,忽略此參數。
二、直接轉發(Direct exchange)
上一篇的日誌系統廣播全部的消息給全部的消費者。咱們但願能夠對其擴展,來容許根據日誌的嚴重性進行過濾日誌。例如:咱們可能但願把致命類型的錯誤寫入硬盤,而不把硬盤空間浪費在警告或者消息類型的日誌上。
以前咱們使用fanout類型的轉發器,可是並無給咱們帶來更多的靈活性:僅僅能夠愚蠢的轉發。
咱們將會使用direct類型的轉發器進行替代。direct類型的轉發器背後的路由轉發算法很簡單:消息會被推送至綁定鍵(binding key)和消息發佈附帶的選擇鍵(routing key)徹底匹配的隊列。
圖解:

上圖,咱們能夠看到direct類型的轉發器與兩個隊列綁定。第一個隊列與綁定鍵orange綁定,第二個隊列與轉發器間有兩個綁定,一個與綁定鍵black綁定,另外一個與green綁定鍵綁定。
這樣的話,當一個消息附帶一個選擇鍵(routing key) orange發佈至轉發器將會被導向到隊列Q1。消息附帶一個選擇鍵(routing key)black或者green將會被導向到Q2.全部的其餘的消息將會被丟棄。 算法

三、多重綁定(multiple bindings)

使用一個綁定鍵(binding key)綁定多個隊列是徹底合法的。如上圖,一個附帶選擇鍵(routing key)的消息將會被轉發到Q1和Q2。 dom

四、發送日誌(Emittinglogs)

咱們準備將這種模式用於咱們的日誌系統。咱們將消息發送到direct類型的轉發器而不是fanout類型。咱們將把日誌的嚴重性做爲選擇鍵(routing key)。這樣的話,接收程序能夠根據嚴重性來選擇接收。咱們首先關注發送日誌的代碼: spa

像之前同樣,咱們須要先建立一個轉發器: .net

channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 日誌

而後咱們準備發送一條消息: blog

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes()); rabbitmq

爲了簡化代碼,咱們假定‘severity’是‘info’,‘warning’,‘error’中的一個。 隊列

五、訂閱

接收消息的代碼和前面的博客的中相似,只有一點不一樣:咱們給咱們所感興趣的嚴重性類型的日誌建立一個綁定。 ip

StringqueueName = channel.queueDeclare().getQueue();

for(Stringseverity : argv)

{

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

六、完整的實例
發送端:EmitLogDirect.java
[java]  view plain  copy
  1. package com.zhy.rabbit._04_binding_key;  
  2.   
  3. import java.util.Random;  
  4. import java.util.UUID;  
  5.   
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9.   
  10. public class EmitLogDirect  
  11. {  
  12.   
  13.     private static final String EXCHANGE_NAME = "ex_logs_direct";  
  14.     private static final String[] SEVERITIES = { "info""warning""error" };  
  15.   
  16.     public static void main(String[] argv) throws java.io.IOException  
  17.     {  
  18.         // 建立鏈接和頻道  
  19.         ConnectionFactory factory = new ConnectionFactory();  
  20.         factory.setHost("localhost");  
  21.         Connection connection = factory.newConnection();  
  22.         Channel channel = connection.createChannel();  
  23.         // 聲明轉發器的類型  
  24.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  25.   
  26.         //發送6條消息  
  27.         for (int i = 0; i < 6; i++)  
  28.         {  
  29.             String severity = getSeverity();  
  30.             String message = severity + "_log :" + UUID.randomUUID().toString();  
  31.             // 發佈消息至轉發器,指定routingkey  
  32.             channel.basicPublish(EXCHANGE_NAME, severity, null, message  
  33.                     .getBytes());  
  34.             System.out.println(" [x] Sent '" + message + "'");  
  35.         }  
  36.   
  37.         channel.close();  
  38.         connection.close();  
  39.     }  
  40.   
  41.     /** 
  42.      * 隨機產生一種日誌類型 
  43.      *  
  44.      * @return  
  45.      */  
  46.     private static String getSeverity()  
  47.     {  
  48.         Random random = new Random();  
  49.         int ranVal = random.nextInt(3);  
  50.         return SEVERITIES[ranVal];  
  51.     }  
  52. }  

隨機發送6條隨機類型(routing key)的日誌給轉發器~~
接收端:ReceiveLogsDirect.java
[java]  view plain  copy
  1. package com.zhy.rabbit._04_binding_key;  
  2.   
  3. import java.util.Random;  
  4.   
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.QueueingConsumer;  
  9.   
  10. public class ReceiveLogsDirect  
  11. {  
  12.   
  13.     private static final String EXCHANGE_NAME = "ex_logs_direct";  
  14.     private static final String[] SEVERITIES = { "info""warning""error" };  
  15.   
  16.     public static void main(String[] argv) throws java.io.IOException,  
  17.             java.lang.InterruptedException  
  18.     {  
  19.         // 建立鏈接和頻道  
  20.         ConnectionFactory factory = new ConnectionFactory();  
  21.         factory.setHost("localhost");  
  22.         Connection connection = factory.newConnection();  
  23.         Channel channel = connection.createChannel();  
  24.         // 聲明direct類型轉發器  
  25.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  26.   
  27.         String queueName = channel.queueDeclare().getQueue();  
  28.         String severity = getSeverity();  
  29.         // 指定binding_key  
  30.         channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  31.         System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");  
  32.   
  33.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  34.         channel.basicConsume(queueName, true, consumer);  
  35.   
  36.         while (true)  
  37.         {  
  38.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  39.             String message = new String(delivery.getBody());  
  40.   
  41.             System.out.println(" [x] Received '" + message + "'");  
  42.         }  
  43.     }  
  44.   
  45.     /** 
  46.      * 隨機產生一種日誌類型 
  47.      *  
  48.      * @return  
  49.      */  
  50.     private static String getSeverity()  
  51.     {  
  52.         Random random = new Random();  
  53.         int ranVal = random.nextInt(3);  
  54.         return SEVERITIES[ranVal];  
  55.     }  
  56. }  

接收端隨機設置一個日誌嚴重級別(binding_key)。。。
我開啓了3個接收端程序,兩個準備接收error類型日誌,一個接收info類型日誌,而後運行發送端程序
運行結果:
 [x] Sent 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Sent 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Sent 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Sent 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
 [x] Sent 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Sent 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'
------------------------------------------------------------------------------------
 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error_log :d142b096-46c0-4380-a1d2-d8b2ac136a9c'
 [x] Received 'error_log :55ee1fc4-c87c-4e5e-81ba-49433890b9ce'
 [x] Received 'error_log :d01877d6-87c7-4e0a-a109-697d122bc4c9'
 [x] Received 'error_log :b42471b1-875c-43f1-b1ea-0dd5b49863f3'
------------------------------------------------------------------------------------
 [*] Waiting for info logs. To exit press CTRL+C
 [x] Received 'info_log :a6c1bc87-efb0-43eb-8314-8a74c345ed05'
 [x] Received 'info_log :b6a84b6a-353e-4e88-8c23-c791d93b44be'

能夠看到咱們實現了博文開頭所描述的特性,接收者能夠自定義本身感興趣類型的日誌。
其實文章這麼長就在說:發送消息時能夠設置routing_key,接收隊列與轉發器間能夠設置binding_key,接收者接收與binding_key與routing_key相同的消息。
相關文章
相關標籤/搜索