在前面的教程中,咱們實現了一個簡單的日誌系統。能夠把日誌消息廣播給多個接收者。php
本篇教程中咱們打算新增一個功能——使得它可以只訂閱消息的一個字集。例如,咱們只須要把嚴重的錯誤日誌信息寫入日誌文件(存儲到磁盤),但同時仍然把全部的日誌信息輸出到控制檯中web
前面的例子,咱們已經建立過綁定(bindings),代碼以下:算法
$exchange->publish($message, '');
綁定(binding)是指交換器(exchange)和隊列(queue)的關係。能夠簡單理解爲:這個隊列(queue)對這個交換器(exchange)的消息感興趣。spa
綁定的時候能夠帶上一個額外的routingkey參數。爲了不與basicpublish的參數混淆,咱們把它叫作binding key。如下是如何建立一個帶binding key的綁定。日誌
$exchange->publish($message, $routeKey);
binding key的含義取決於交換器(exchange)的類型。咱們以前使用過的fanout類型會忽略這個值。code
咱們的日誌系統廣播全部的消息給全部的消費者(consumers)。咱們打算擴展它,使其能夠可以精確的過濾消息。例如咱們也許值是但願當接收到一個嚴重的錯誤的時候才把消息寫入磁盤,以避免浪費磁盤空間。教程
咱們使用的fanout類型的交換器(exchange)擴展性不夠——它能作的僅僅是廣播。隊列
咱們將會使用direct類型的交換器(exchange)來代替。路由的算法很簡單——交換器將會對binding key和routing key進行精確匹配,從而肯定消息該分發到哪一個隊列。ip
下圖可以很好的描述這個場景:路由
在這個場景中,咱們能夠看到direct exchange X和兩個隊列綁定了。第一個隊列使用orange做爲binding key,第二個隊列有兩個綁定,一個使用black做爲binding key,另一個是green。
這樣以來,當routing key爲orange的消息發佈到交換器(exchange),就會被路由到隊列Q1。routing key爲black或者green的消息就會路由到Q2。其餘的全部消息都將會被丟棄。
多個隊列使用相同的binding key是合法的。咱們的這個例子,咱們能夠添加一個X和Q1之間的綁定,使用blackbinding key。這樣一來,direct交換器就和fanout交換器的行爲同樣,將會廣播消息到全部匹配的隊列。帶有routing key爲black的消息都會發送到Q1和Q2。
咱們將會發送消息到一個direct exchange,把日誌級別做爲routing key。這樣子負責處理接收的腳本就能夠選擇它要處理的日誌級別。咱們先看看觸發日誌。
咱們須要建立一個交換器(exchange):
$exchange->setName('direct_logs');
而後咱們發送一則消息:
$exchange->publish($message, $severity);
咱們先假設「severity」的值是info、warning、error中的一個。
處理接收消息的方式和以前差很少,可是咱們爲每個日誌級別建立了一個新的綁定。
foreach ($severities as $item) { $queue->bind($exchangeName, $item); }
emitlogdirect.py的代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-4 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $severity = count($argv) > 2 ? $argv[1] : 'info'; $message = empty($argv[2]) ? 'Hello World!' : ' ' . $argv[2]; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName('direct_logs'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $exchange->publish($message, $severity); var_dump("[x] Sent $message"); $connection->disconnect();
receivelogsdirect.py的代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-4 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'direct_logs'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare(); $severities = $argv; $file = $severities[0]; unset($severities[0]); if (!$severities) { var_dump("Usage:$file [info] [warning] [error]"); exit(); } else { foreach ($severities as $item) { $queue->bind($exchangeName, $item); } } var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump('[x]' . $envelope->getRoutingKey() . ':' . $msg); $queue->nack($envelope->getDeliveryTag()); }
若是你但願只是保存warning和error級別的日誌到磁盤,只須要打開控制檯並輸入:
$ php receive_logs_direct.php warning error > logs_from_rabbit.log
若是你但願全部的日誌信息都輸出到屏幕中,打開一個新的終端,而後輸入:
$ php receive_logs_direct.php info warning error [*] Waiting for logs. To exit press CTRL+C
若是要觸發一個error級別的日誌,只須要輸入:
$ php emit_log_direct.php error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'