工做隊列的應用場景一般爲一個生產者、一個隊列、多個消費者公平消費隊列。php
工做隊列一般配合如下設置共同使用:json
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } }
> composer.phar install
生產者鏈接到RabbitMQ,發送一條消息,而後退出。composer
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 建立隊列,已存在的不會重複建立,第三個參數爲開啓隊列持久化 $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } // 第二個參數 delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT 爲設置消息持久化 $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); // 經過默認的交換機發送消息到隊列 (消息內容, 默認交換機, 路由鍵); $channel->basic_publish($msg, '', 'task_queue'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
消費者監聽來自RabbitMQ的消息,一般須要一直保持運行狀態以監聽消息。函數
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 建立隊列,已存在的不會重複建立,第三個參數爲開啓隊列持久化 $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 定義回調函數 $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; // 手動消息確認 $msg->ack(); }; // 設置 prefetch_count = 1,開啓公平分發(默認爲循環分發) // 在處理並確認上一條消息以前,不要將新消息發送給消費者,而發送給其餘消費者 $channel->basic_qos(null, 1, null); // 第四個參數設爲false關閉自動消息確認,爲true打開自動消息確認即投遞消息後馬上標記爲刪除 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打開一個終端,運行消費者:fetch
php receive.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
打開另外一個終端,運行消費者:ui
php receive.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
打開另外一個終端,運行生產者:spa
php send.php First message. php send.php Second message.. php send.php Third message... php send.php Fourth message.... php send.php Fifth message.....
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged