RabbitMQ+PHP 教程五(Topics)

(using php-amqplib)php

前提必讀

本教程假設RabbitMQ是安裝在標準端口上運行(5672)。若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。html

在哪裏獲得幫助git

若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。github

開始

在前面的教程中,咱們改進了日誌系統。咱們使用的是一種直接廣播方式,而不是隻使用一種直接(direct)廣播方式的fanout交換機,從而得到了有選擇地接收日誌的可能性。segmentfault

雖然使用直接direct交換機改進了咱們的系統,但它仍然有侷限性——它不能根據多個標準進行路由。工具

在咱們的日誌系統中,咱們可能但願訂閱基於嚴重性的日誌,但也要基於發出日誌的源。你可能從syslog UNIX工具知道這個概念,路由日誌基於嚴重性(info/warn/crit…)和設備(auth/cron/kern…)。ui

這會給咱們很大的靈活性,咱們可能要聽關鍵的錯誤來自kern, 全部日誌來自kern」。spa

爲了在日誌系統中實現這一點,咱們須要瞭解一個更復雜的主題topic交換機。翻譯

Topic exchange

發送到一個話題交換機(topic exchange)信息,不能是任意routing_key -它必須是一個單詞的列表,用逗號分隔。這些詞能夠是任何東西,但一般它們指定鏈接到消息的某些特性。一些有效的路由鍵的例子:stock.usd.nysenyse.vmw"quick.orange.rabbit"。在你喜歡的路由鍵中,最多能夠有255個字節的單詞。日誌

綁定鍵也必須是相同的形式。主題交換背後的邏輯相似於一個直接的消息,用特定的路由鍵發送的消息將被髮送到綁定到綁定鍵的全部隊列中。可是有兩個重要的綁定鍵的特殊狀況:

*(星號)能夠代替一個詞。
#(哈希)能夠代替零個或更多的單詞。

在一個例子中解釋這一點是最容易的:

clipboard.png

在這個示例中,咱們將發送全部描述動物的消息。消息將用一個包含三個單詞(兩個點)的路由鍵發送。路由鍵中的第一個字將描述速度,第二個顏色和第三個種:<speed>.<colour>.<species>

咱們建立三的綁定:Q1綁定綁定鍵*.orange.* 和 Q2 with *.*.rabbitlazy.#

這些綁定能夠歸納爲:

Q1對全部的橙色(orange)動物很感興趣。
Q2想聽關於兔子(rabbits)的一切,關於懶惰(lazy)動物的一切。

帶有quick.orange.rabbit的路由鍵的消息將傳送到兩個隊列中。信息lazy.orange.elephant也將去他們倆。另外一方面,quick.orange.fox只會進入第一排,而lazy.brown.fox只到第二個。lazy.pink.rabbit將被送到第二個隊列只有一次,即便它匹配兩個綁定。quick.brown.fox不匹配任何綁定,因此它將被丟棄。

若是咱們違背合同,用一個或四個詞,如orangequick.orange.male.rabbit?那麼,這些消息將不匹配任何綁定並將丟失。

另外一方面,lazy.orange.male.rabbit,即便它有四個詞,將匹配最後的綁定,並將交付給第二個隊列。

Topic exchange

主題交換(Topic exchange)功能強大,能夠像其餘交換機同樣。

當隊列綁定#(hash)綁定鍵-它將收到的全部郵件,無論路由關鍵同樣的fanout交換機。

當特殊字符*(star)和#(hash)中不使用綁定,主題交換機會表現的像一個direct交換機。

彙總(Putting it all together)

咱們將在日誌系統中使用主題交換機(topic exchange)。咱們將從一個工做假設開始,假設日誌的路由鍵有兩個詞:<facility>.<severity>

代碼與前面的教程幾乎相同。

emit_log_topic.php代碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>

receive_logs_topic.php代碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

接收全部日誌:

php receive_logs_topic.php "#"

接受全部的日誌來自kern

php receive_logs_topic.php "kern.*"

或者,若是您只想聽關於critical的日誌:

php receive_logs_topic.php "*.critical"

你能夠建立多個綁定:

php receive_logs_topic.php "kern.*" "*.critical"

觸發一個日誌來自路由鍵kern.critical類型

php emit_log_topic.php "kern.critical" "A critical kernel error"

這些程序讓咱們以爲很好玩。請注意,代碼對路由或綁定鍵不做任何假設,您可能但願使用兩個以上的路由鍵參數。

(所有源碼:emit_log_topic.phpreceive_logs_topic

接下來,瞭解如何經過一個遠程過程調用來執行往返消息,你能夠閱讀下一章節:RabbitMQ+PHP 教程六(RPC)

翻譯來自 RabbitMQ - RabbitMQ tutorial - Topics

相關文章
相關標籤/搜索