發佈訂閱的應用場景一般爲一個生產者、一個交換機,多個消費者各自建立隊列綁定到交換機上訂閱消息並消費。php
發佈訂閱一般配合如下設置共同使用:json
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } } > composer.phar install
在 RabbitMQ 消息傳遞模型中,生產者是不會向隊列直接發送消息的,只能將消息發送給交換機。composer
交換機接收來自生產者的消息,而後將它們推送到隊列中。函數
發佈訂閱使用的是fanout
交換機,這個交換機很是簡單,將它收到的全部消息廣播到它綁定的全部隊列。ui
生產者鏈接到RabbitMQ,發送一條消息,而後退出。spa
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定義一個名爲 logs 的 fanout 廣播交換機 $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } // $msg = new AMQPMessage($data); // 將消息發送到名爲 logs 的 fanout 廣播交換機 (消息內容, 交換機, 路由鍵); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
消費者監聽來自RabbitMQ的消息,一般須要一直保持運行狀態以監聽消息。日誌
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定義一個名爲 logs 的 fanout 廣播交換機 $channel->exchange_declare('logs', 'fanout', false, false, false); // 建立一個隨機命名的新隊列,第三個參數爲關閉隊列持久化,第四個參數爲當聲明它的鏈接關閉時隊列會被自動刪除 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); // 將隨機命名的隊列綁定到 fanout 廣播交換機,生產者向交換機發送消息將被廣播到綁定的隊列中 $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 定義回調函數 $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; }; // 第四個參數設爲true開啓自動消息確認,即投遞消息後馬上標記爲刪除 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打開一個終端,運行消費者,將日誌放到文件中:code
php receive.php > logs_from_rabbit.log
打開另外一個終端,運行消費者,將日誌輸出到終端:rabbitmq
php receive.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
打開另外一個終端,運行生產者:隊列
php send.php
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.