簡單隊列一般爲一個生產者、一個消費者、一個隊列的結構。php
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } }
> composer.phar install
生產者鏈接到RabbitMQ,發送一條消息,而後退出。json
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 建立隊列,已存在的隊列不會重複建立 $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); // 經過默認的交換機發送消息到隊列 (消息內容, 默認交換機, 路由鍵) $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
消費者監聽來自 RabbitMQ 的消息,一般須要一直保持運行狀態以監聽消息。composer
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 建立隊列,已存在的不會重複建立 $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 定義消息處理回調函數 $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打開一個終端,運行消費者:函數
php receive.php
打開另外一個終端,運行生產者:工具
php send.php
# Linux sudo rabbitmqctl list_queues
# Windows rabbitmqctl.bat list_queues