RabbitMQ消息隊列:Routing 消息路由

咱們構建了一個簡單的日誌系統。接下來,咱們將豐富它:可以使用不一樣的severity來監聽不一樣等級的log。好比咱們但願只有error的log才保存到磁盤上。java

一、綁定(Bindings)

在上一篇博客中咱們已經使用過綁定。相似下面的代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定表示轉發器與隊列之間的關係。咱們也能夠簡單的認爲:隊列對該轉發器上的消息感興趣。
綁定能夠附帶一個額外的參數routingKey。爲了與避免basicPublish方法(發佈消息的方法)的參數混淆,咱們準備把它稱做綁定鍵(binding key)。下面展現如何使用綁定鍵(binding key)來建立一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵的意義依賴於轉發器的類型。對於fanout類型,忽略此參數。
web

二、直接轉發(Direct exchange)

上一篇的日誌系統廣播全部的消息給全部的消費者。咱們但願能夠對其擴展,來容許根據日誌的嚴重性進行過濾日誌。例如:咱們可能但願把致命類型的錯誤寫入硬盤,而不把硬盤空間浪費在警告或者消息類型的日誌上。
以前咱們使用fanout類型的轉發器,可是並無給咱們帶來更多的靈活性:僅僅能夠愚蠢的轉發。
咱們將會使用direct類型的轉發器進行替代。direct類型的轉發器背後的路由轉發算法很簡單:消息會被推送至綁定鍵(binding key)和消息發佈附帶的選擇鍵(routing key)徹底匹配的隊列。
圖解: 算法

上圖,咱們能夠看到direct類型的轉發器與兩個隊列綁定。第一個隊列與綁定鍵orange綁定,第二個隊列與轉發器間有兩個綁定,一個與綁定鍵black綁定,另外一個與green綁定鍵綁定。
這樣的話,當一個消息附帶一個選擇鍵(routing key) orange發佈至轉發器將會被導向到隊列Q1。消息附帶一個選擇鍵(routing key)black或者green將會被導向到Q2.全部的其餘的消息將會被丟棄。
dom

三、多重綁定(multiple bindings)

使用一個綁定鍵(binding key)綁定多個隊列是徹底合法的。如上圖,一個附帶選擇鍵(routing key)的消息將會被轉發到Q1和Q2。spa

四、發送日誌(Emittinglogs)

咱們準備將這種模式用於咱們的日誌系統。咱們將消息發送到direct類型的轉發器而不是fanout類型。咱們將把日誌的嚴重性做爲選擇鍵(routing key)。這樣的話,接收程序能夠根據嚴重性來選擇接收。咱們首先關注發送日誌的代碼:.net

像之前同樣,咱們須要先建立一個轉發器:日誌

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, true, null);orm

而後咱們準備發送一條消息:server

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());rabbitmq

爲了簡化代碼,咱們假定‘severity’是‘info’,‘warning’,‘error’中的一個。

五、訂閱

接收消息的代碼和前面的博客的中相似,只有一點不一樣:咱們給咱們所感興趣的嚴重性類型的日誌建立一個綁定。

StringqueueName = channel.queueDeclare().getQueue();

for(Stringseverity : argv)

{

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

六、完整的實例

package event;

import java.util.Random;
import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect
{

 private static final String EXCHANGE_NAME = "ex_logs_direct";
 private static final String[] SEVERITIES = { "info", "warning", "error" };

 public static void main(String[] argv) throws java.io.IOException
 {
  // 建立鏈接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 聲明轉發器的類型
  //向全部綁定了相應routing key的隊列發送消息
        //若是producer在發佈消息時沒有consumer在監聽,消息將被丟棄
        //若是有多個consumer監聽了相同的routing key  則他們都會受到消息
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");

  
       
  //發送6條消息
  for (int i = 0; i < 6; i++)
  {
   String severity = getSeverity();
   String message = severity + "_log :" + UUID.randomUUID().toString();
   // 發佈消息至轉發器,指定routingkey
   //向server發佈一條消息
            //參數1:exchange名字,若爲空則使用默認的exchange
            //參數2:routing key
            //參數3:其餘的屬性
            //參數4:消息體
            //RabbitMQ默認有一個exchange,叫default exchange,它用一個空字符串表示,它是direct exchange類型,
            //任何發往這個exchange的消息都會被路由到routing key的名字對應的隊列上,若是沒有對應的隊列,則消息會被丟棄
   channel.basicPublish(EXCHANGE_NAME, severity, null, message
     .getBytes());
   System.out.println(" [x] Sent '" + message + "'");
  }

  channel.close();
  connection.close();
 }

 /**
  * 隨機產生一種日誌類型
  *
  * @return
  */
 private static String getSeverity()
 {
  Random random = new Random();
  int ranVal = random.nextInt(3);
  return SEVERITIES[ranVal];
 }
}

接收端:

package event;

import java.util.Random;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect
{

 private static final String EXCHANGE_NAME = "ex_logs_direct";
 private static final String[] SEVERITIES = { "info", "warning", "error" };

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 建立鏈接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 聲明direct類型轉發器
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");

  String queueName = channel.queueDeclare().getQueue();
  String severity = getSeverity();
  System.out.println(severity);
  // 指定binding_key
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
  System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");

  QueueingConsumer consumer = new QueueingConsumer(channel);
  channel.basicConsume(queueName, true, consumer);

  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

   System.out.println(" [x] Received '" + message + "'");
  }
 }

 /**
  * 隨機產生一種日誌類型
  *
  * @return   */ private static String getSeverity() {  Random random = new Random();  int ranVal = random.nextInt(3);  return SEVERITIES[ranVal]; }}

相關文章
相關標籤/搜索