在以前的文章中咱們創建了一個簡單的日誌系統。咱們能夠經過這個系統將日誌message廣播給不少接收者。java
在本篇文章中,咱們在這之上,添加一個新的功能,即容許接收者訂閱message的一個子集。舉個例子,咱們將日誌分紅多個級別,一個接收者接收錯誤日誌將之保存到磁盤,另外一個接收者接收全部日誌將之打印到控制檯。算法
在前面的章節中,咱們已經接觸過binding了,像下面的代碼這樣:bash
channel.queueBind(queueName,EXCHANGE_NAME,"");
binding將exchange和queue關聯在了一塊兒。更形象的表示,如:queue對exchange中的message感興趣。spa
bindings能夠攜帶一個routingKey參數。爲了不和basic_publish的參數弄混,咱們稱之它爲binding_key.咱們像下面這樣建立一個binding日誌
channel.queueBind(queueName,EXCHANGE_NAME,"black");
binding key的做用要看exchange的類型,對於fanout類型的exchange,binding key是直接忽略的。code
在以前的日誌系統中,message會推送到全部的消費者去。咱們想讓系統依據message的日誌級別進行過濾。好比一個消費者只接收嚴重級別的日誌。rabbitmq
fanout沒法幫咱們實現這樣的功能,它只是無腦的進行廣播。圖片
咱們使用direct類型的exchange,它的路由算法是很是簡單的 - 只要message的routing_key和bind的binding_key相同即進行轉發。ip
爲了進行說明,像下圖這麼來設置
如圖,能夠看到有兩個queue綁到了類型爲direct的exchange上。第一個queue綁定用了orange這個binding key,第二個則用了black和green兩個binding key。路由
那麼結果就是有routing key爲orange的message路由到了Q1.而routing key爲black和green的message則路由到了Q2,其餘的消息則被丟棄了。
若使用相同的binding key將多個queue綁定到exchange上,就和fanout的行爲同樣了,message會廣播到binding key相同的queue去。如圖的設置中,一個routing key爲black的message就會同時發送到Q1和Q2。
咱們將在咱們的日誌系統上應用這個模型,使用direct類型的exchange去替代fanout類型的exchange。提供日誌的嚴重性做爲routing key。接收程序能夠選擇要接收日誌的嚴重性級別。
首先咱們建立exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
而後就是發送message
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
咱們先假設severity取值 info | warning | error
接收message和上一章沒什麼區別,只是須要給各個severity建立新的binding。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
EmitLogDirect.java代碼以下
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } //.. }
ReceiveLogsDirect.java代碼以下:
import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
編譯代碼
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
若是想把warning和error的日誌保存到文件去,那麼
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
若是想把全部的日誌打印到控制檯,那麼
java -cp $CP ReceiveLogsDirect info warning error
發送error日誌
java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"