咱們構建了一個簡單的日誌系統。接下來,咱們將豐富它:可以使用不一樣的severity來監聽不一樣等級的log。好比咱們但願只有error的log才保存到磁盤上。java
在上一篇博客中咱們已經使用過綁定。相似下面的代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定表示轉發器與隊列之間的關係。咱們也能夠簡單的認爲:隊列對該轉發器上的消息感興趣。
綁定能夠附帶一個額外的參數routingKey。爲了與避免basicPublish方法(發佈消息的方法)的參數混淆,咱們準備把它稱做綁定鍵(binding key)。下面展現如何使用綁定鍵(binding key)來建立一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵的意義依賴於轉發器的類型。對於fanout類型,忽略此參數。
web
上一篇的日誌系統廣播全部的消息給全部的消費者。咱們但願能夠對其擴展,來容許根據日誌的嚴重性進行過濾日誌。例如:咱們可能但願把致命類型的錯誤寫入硬盤,而不把硬盤空間浪費在警告或者消息類型的日誌上。
以前咱們使用fanout類型的轉發器,可是並無給咱們帶來更多的靈活性:僅僅能夠愚蠢的轉發。
咱們將會使用direct類型的轉發器進行替代。direct類型的轉發器背後的路由轉發算法很簡單:消息會被推送至綁定鍵(binding key)和消息發佈附帶的選擇鍵(routing key)徹底匹配的隊列。
圖解: 算法
上圖,咱們能夠看到direct類型的轉發器與兩個隊列綁定。第一個隊列與綁定鍵orange綁定,第二個隊列與轉發器間有兩個綁定,一個與綁定鍵black綁定,另外一個與green綁定鍵綁定。
這樣的話,當一個消息附帶一個選擇鍵(routing key) orange發佈至轉發器將會被導向到隊列Q1。消息附帶一個選擇鍵(routing key)black或者green將會被導向到Q2.全部的其餘的消息將會被丟棄。
dom
使用一個綁定鍵(binding key)綁定多個隊列是徹底合法的。如上圖,一個附帶選擇鍵(routing key)的消息將會被轉發到Q1和Q2。spa
咱們準備將這種模式用於咱們的日誌系統。咱們將消息發送到direct類型的轉發器而不是fanout類型。咱們將把日誌的嚴重性做爲選擇鍵(routing key)。這樣的話,接收程序能夠根據嚴重性來選擇接收。咱們首先關注發送日誌的代碼:.net
像之前同樣,咱們須要先建立一個轉發器:日誌
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, true, null);orm
而後咱們準備發送一條消息:server
channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());rabbitmq
爲了簡化代碼,咱們假定‘severity’是‘info’,‘warning’,‘error’中的一個。
接收消息的代碼和前面的博客的中相似,只有一點不一樣:咱們給咱們所感興趣的嚴重性類型的日誌建立一個綁定。
StringqueueName = channel.queueDeclare().getQueue();
for(Stringseverity : argv)
{
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
package event;
import java.util.Random;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect
{
private static final String EXCHANGE_NAME = "ex_logs_direct";
private static final String[] SEVERITIES = { "info", "warning", "error" };
public static void main(String[] argv) throws java.io.IOException
{
// 建立鏈接和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明轉發器的類型
//向全部綁定了相應routing key的隊列發送消息
//若是producer在發佈消息時沒有consumer在監聽,消息將被丟棄
//若是有多個consumer監聽了相同的routing key 則他們都會受到消息
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//發送6條消息
for (int i = 0; i < 6; i++)
{
String severity = getSeverity();
String message = severity + "_log :" + UUID.randomUUID().toString();
// 發佈消息至轉發器,指定routingkey
//向server發佈一條消息
//參數1:exchange名字,若爲空則使用默認的exchange
//參數2:routing key
//參數3:其餘的屬性
//參數4:消息體
//RabbitMQ默認有一個exchange,叫default exchange,它用一個空字符串表示,它是direct exchange類型,
//任何發往這個exchange的消息都會被路由到routing key的名字對應的隊列上,若是沒有對應的隊列,則消息會被丟棄
channel.basicPublish(EXCHANGE_NAME, severity, null, message
.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
/**
* 隨機產生一種日誌類型
*
* @return
*/
private static String getSeverity()
{
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
}
}
接收端:
package event;
import java.util.Random;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsDirect
{
private static final String EXCHANGE_NAME = "ex_logs_direct";
private static final String[] SEVERITIES = { "info", "warning", "error" };
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 建立鏈接和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明direct類型轉發器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
String severity = getSeverity();
System.out.println(severity);
// 指定binding_key
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
/**
* 隨機產生一種日誌類型
*
* @return */ private static String getSeverity() { Random random = new Random(); int ranVal = random.nextInt(3); return SEVERITIES[ranVal]; }}