RabbitMQ+PHP 教程三(Publish/Subscribe)

使用 php-amqplib

介紹

在前面的教程中,咱們建立了一個工做隊列。工做隊列背後的假設是每一個任務都交付給一個工做人員處理。在這一部分中,咱們將作一些徹底不一樣的事情——咱們將向多個消費者發送消息。此模式稱爲「發佈/訂閱」。php

爲了說明這個模式,咱們將構建一個簡單的日誌系統。它將由兩個程序組成,第一個程序將發出日誌消息,第二個程序將接收並打印它們。html

在咱們的日誌系統中,接收程序的每一個運行副本都會收到消息。這樣咱們就能夠運行一個接收器,並將日誌引導到磁盤;同時,咱們還能夠運行另外一個接收器,並在屏幕上看到日誌。git

本質上,已發佈的日誌消息將被廣播到全部接收器。github

交換機(Exchanges)

在本教程的前幾部分中,咱們從隊列中發送和接收消息。如今是在Rabbit中引入完整消息傳遞模型的時候了。segmentfault

讓咱們快速瀏覽一下前面教程中介紹的內容:安全

  1. 生產者是發送消息的用戶應用程序。
  2. 隊列是存儲消息的緩衝區。
  3. 消費者是接收消息的用戶應用程序。

RabbitMQ消息傳遞模型的核心思想是,生產者不發送任何信息直接到隊列。事實上,生產者甚至不知道消息是否會發送到任何隊列。服務器

相反,生產商只能向交換機(Exchange)發送消息。交換機作的事情很簡單。一方面,它接收來自生產者的信息,另外一邊則推他們排隊。Exchange必須知道如何處理接收到的消息。應該附加到特定隊列嗎?它應該被添加到多個隊列?仍是應該被拋棄?。這個規則是由交換類型定義的。ui

clipboard.png

有幾種交換類型可用:direct, topic, headers 和 fanout。咱們將集中討論最後一個——fanout。讓咱們建立這種類型的交換,並稱之爲日誌:spa

$channel->exchange_declare('logs', 'fanout', false, false, false);

fanout交換很是簡單。正如你可能從這個名字猜到的,它只廣播它收到的全部消息給它所知道的全部隊列。這正是咱們須要的記錄器。翻譯

Listing exchanges

列出服務器上的交換機,你能夠運行rabbitmqctl:

sudo rabbitmqctl list_exchanges

在這個列表中會有一些amq. *交流和默認(未命名)交換。默認狀況下建立這些>,但目前不太可能使用它們。

默認的交換機

在本教程的前幾部分中,咱們對交換機一無所知,但仍然可以將消息發送到隊列中。這是可能的,由於咱們使用的是默認的交換,咱們經過空字符串(「」)來標識它們。

回想一下咱們以前如何發佈消息:

$channel->basic_publish($msg, '', 'hello');

咱們在這裏使用默認的或無名的交換:消息路由到指定的routing_key名稱的隊列,若是它存在的話。路由鍵是第三個參數:basic_publish

如今,咱們能夠將其發佈到咱們命名的Exchange中:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

臨時隊列(Temporary queues)

也許你還記得之前咱們使用的隊列所指定的名稱(記得hellotask_queue?). 可以說出一個隊列對咱們來講相當重要 -- 咱們須要把工人指向同一個隊列。當你想在生產者和消費者之間共享一個隊列時,給隊列一個名字是很重要的。

但咱們的記錄器不是這樣的。咱們想了解全部日誌消息,而不單單是其中的一個子集。咱們也只對當前流動的消息感興趣,而不是舊消息。爲了解決這個問題,咱們須要兩件事。

首先,每當咱們與Rabbit鏈接時,咱們須要一個新的空隊列。爲此,咱們能夠建立一個帶有隨機名稱的隊列,或者更好 - 讓服務器爲咱們選擇一個隨機隊列名。

第二,一旦斷開消費者,隊列應該自動刪除。

在php客戶端中,當咱們將隊列名稱做爲空字符串提供時,咱們建立一個帶有生成名稱的非持久隊列:

list($queue_name, ,) = $channel->queue_declare("");

方法返回時,queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,它可能看起來像amq.gen-jzty20brgko-hjmujj0wlg

當聲明它關閉的鏈接時,隊列將被刪除,由於它被聲明爲獨佔。

綁定(Bindings)

clipboard.png

咱們已經建立了fanout交換機和隊列。如今咱們須要告訴Exchange發送消息到咱們的隊列中。交換和隊列之間的關係稱爲綁定。

$channel->queue_bind($queue_name, 'logs');

從如今開始,日誌交換將向隊列添加消息。

列出綁定列表(Listing bindings)

您能夠使用現有的綁定列表,使用下面命令:

rabbitmqctl list_bindings

讓咱們把全部整理在一塊兒(Putting it all together)

clipboard.png

生成日誌消息的生成程序與前面的教程沒有多大區別。最重要的變化是,咱們如今但願把消息發佈到咱們的日誌交換,而不是無名的。這裏給出emit_log.php代碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>

emit_log.php源碼

如您所見,在創建鏈接以後,咱們聲明交換。這一步是必要的,由於發佈到一個不存在的交換機是禁止的。

若是沒有隊列綁定到Exchange,消息將丟失,但這對咱們來講是好的;若是沒有用戶正在監聽,咱們能夠安全地丟棄消息。

receive_logs.php代碼:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

receive_logs.php

若是要將日誌保存到文件中,只需打開控制檯並鍵入:

php receive_logs.php > logs_from_rabbit.log

若是您但願看到屏幕上的日誌,生成一個新的終端並運行:

php receive_logs.php

固然,而後觸發日誌類型:

php emit_log.php

使用rabbitmqctl list_bindings能夠驗證代碼其實是建立綁定和隊列是咱們想要的。兩receive_logs.php程序運行你應該看到:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

對結果的解釋很簡單:來自Exchange日誌的數據使用服務器分配的名稱到兩個隊列中。這正是咱們想要的。

要了解如何偵聽一個消息的子集,讓咱們轉到RabbitMQ+PHP 教程四(Routing)

翻譯來自 RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

相關文章
相關標籤/搜索