RabbitMQ+PHP 教程四(Routing)

using php-amqplib

前提必讀

本教程假設 RabbitMQ 是運行在標準端口上運行(5672).
若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。php

若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。html

在前面的教程中,咱們構建了一個簡單的日誌系統。咱們可以向許多接收者廣播日誌消息。git

開始

在本教程中,咱們將爲它添加一個特性——咱們將只可能訂閱消息的一個子集。例如,咱們只可以將關鍵錯誤消息直接指向日誌文件(以節省磁盤空間),同時仍然可以打印控制檯上的全部日誌消息。github

綁定(Bindings)

在前面的示例中,咱們已經建立綁定。您可能還記得代碼:算法

$channel->queue_bind($queue_name, 'logs');

綁定是交換和隊列之間的一種關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。segmentfault

綁定能夠採起額外的routing_key參數。避免混淆和$channel::basic_publish參數咱們要叫它綁定key。這就是咱們如何用鍵建立綁定的緣由:學習

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

綁定鍵的含義取決於交換類型。咱們之前使用的fanout交換將忽略了它的值。ui

Direct exchange

咱們之前的教程中的日誌系統將全部消息廣播給全部消費者。咱們但願擴展這一點,容許基於其必定嚴重性程度來過濾消息。例如,咱們可能但願將日誌消息寫入到磁盤的腳本只接收關鍵錯誤,而不會在警告或信息日誌消息上浪費磁盤空間。spa

咱們使用的是fanout交換機,這並不能給咱們帶來很大的靈活性——它只能進行無心識的廣播。翻譯

咱們將使用direct交換機替代。direct交換機背後的路由算法很簡單-消息傳遞到隊列,其綁定鍵徹底匹配消息的路由鍵。

爲了說明這一點,請考慮如下設置:

clipboard.png

在這個設置中,咱們能夠看到兩個隊列綁定到它的direct交換機X。第一個隊列與綁定鍵orange綁定,第二個綁定有兩個綁定,一個綁定鍵black,另外一個綁定green

在這樣的設置中,將路由消息發送到Exchange的路由密鑰orange將被路由到隊列Q1。帶有blackgreen路由鍵的消息將轉到Q2。全部其餘消息都將被丟棄。

多個綁定 (Multiple bindings)

clipboard.png

用相同的綁定鍵綁定多個隊列是徹底合法的。在咱們的示例中,咱們能夠在綁定綁定鍵XQ1之間添加一個綁定。在這種狀況下,direct交換機將表現爲fanout交換機,並將消息發送到全部匹配隊列。將帶有路由鍵black的消息發送給Q1Q2

Emitting logs

咱們將使用這個模型做爲咱們的日誌系統。咱們將把消息發送給direct交換機,而不是fanout交換機。咱們將提供日誌嚴重性做爲路由鍵。這樣,接收腳本將可以選擇它想要接收的嚴重性。讓咱們先專一於發佈日誌。

和以往同樣,咱們須要首先建立一個交換:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

咱們已經準備好發送消息了:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

爲了簡化事情,咱們會假設嚴重錯誤有能夠是info, warning, error的一種。

訂閱 (Subscribing)

接收消息將與前面的教程同樣,只有一個例外——咱們將爲咱們感興趣的每一個嚴重性建立一個新的綁定。

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

代碼都放在一塊兒:

clipboard.png

emit_log_direct.php源碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>

receive_logs_direct.php源碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if(empty($severities )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

若是您只想保存warningerror(不包含info)日誌消息到文件,只需打開控制檯並鍵入:

php receive_logs_direct.php warning error > logs_from_rabbit.log

若是您想查看屏幕上的全部日誌消息,請打開一個新的終端並執行:

php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,觸發錯誤日誌消息:

php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(所有源碼 emit_log_direct.php source and receive_logs_direct.php source)

學習如何基於模式偵聽消息, 你能夠閱讀下一章節:RabbitMQ+PHP 教程五(Topics)

翻譯來自 RabbitMQ - RabbitMQ tutorial - Routing

相關文章
相關標籤/搜索