本教程假設 RabbitMQ 是運行在標準端口上運行(5672).
若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。php
若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。html
在前面的教程中,咱們構建了一個簡單的日誌系統。咱們可以向許多接收者廣播日誌消息。git
在本教程中,咱們將爲它添加一個特性——咱們將只可能訂閱消息的一個子集。例如,咱們只可以將關鍵錯誤消息直接指向日誌文件(以節省磁盤空間),同時仍然可以打印控制檯上的全部日誌消息。github
在前面的示例中,咱們已經建立綁定。您可能還記得代碼:算法
$channel->queue_bind($queue_name, 'logs');
綁定是交換和隊列之間的一種關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。segmentfault
綁定能夠採起額外的routing_key參數。避免混淆和$channel::basic_publish參數咱們要叫它綁定key。這就是咱們如何用鍵建立綁定的緣由:學習
$binding_key = 'black'; $channel->queue_bind($queue_name, $exchange_name, $binding_key);
綁定鍵的含義取決於交換類型。咱們之前使用的fanout交換將忽略了它的值。ui
咱們之前的教程中的日誌系統將全部消息廣播給全部消費者。咱們但願擴展這一點,容許基於其必定嚴重性程度來過濾消息。例如,咱們可能但願將日誌消息寫入到磁盤的腳本只接收關鍵錯誤,而不會在警告或信息日誌消息上浪費磁盤空間。spa
咱們使用的是fanout交換機,這並不能給咱們帶來很大的靈活性——它只能進行無心識的廣播。翻譯
咱們將使用direct
交換機替代。direct
交換機背後的路由算法很簡單-消息傳遞到隊列,其綁定鍵徹底匹配消息的路由鍵。
爲了說明這一點,請考慮如下設置:
在這個設置中,咱們能夠看到兩個隊列綁定到它的direct
交換機X。第一個隊列與綁定鍵orange
綁定,第二個綁定有兩個綁定,一個綁定鍵black
,另外一個綁定green
。
在這樣的設置中,將路由消息發送到Exchange的路由密鑰orange
將被路由到隊列Q1
。帶有black
或green
路由鍵的消息將轉到Q2。全部其餘消息都將被丟棄。
用相同的綁定鍵綁定多個隊列是徹底合法的。在咱們的示例中,咱們能夠在綁定綁定鍵X
和Q1
之間添加一個綁定。在這種狀況下,direct
交換機將表現爲fanout交換機,並將消息發送到全部匹配隊列。將帶有路由鍵black
的消息發送給Q1
和Q2
。
咱們將使用這個模型做爲咱們的日誌系統。咱們將把消息發送給direct
交換機,而不是fanout
交換機。咱們將提供日誌嚴重性做爲路由鍵。這樣,接收腳本將可以選擇它想要接收的嚴重性。讓咱們先專一於發佈日誌。
和以往同樣,咱們須要首先建立一個交換:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
咱們已經準備好發送消息了:
$channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->basic_publish($msg, 'direct_logs', $severity);
爲了簡化事情,咱們會假設嚴重錯誤有能夠是info
, warning
, error
的一種。
接收消息將與前面的教程同樣,只有一個例外——咱們將爲咱們感興趣的每一個嚴重性建立一個新的綁定。
foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); }
代碼都放在一塊兒:
emit_log_direct.php源碼:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; $channel->close(); $connection->close(); ?>
receive_logs_direct.php源碼:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if(empty($severities )) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
若是您只想保存warning
和 error
(不包含info
)日誌消息到文件,只需打開控制檯並鍵入:
php receive_logs_direct.php warning error > logs_from_rabbit.log
若是您想查看屏幕上的全部日誌消息,請打開一個新的終端並執行:
php receive_logs_direct.php info warning error # => [*] Waiting for logs. To exit press CTRL+C
例如,觸發錯誤日誌消息:
php emit_log_direct.php error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.'
(所有源碼 emit_log_direct.php source and receive_logs_direct.php source)
學習如何基於模式偵聽消息, 你能夠閱讀下一章節:RabbitMQ+PHP 教程五(Topics)。