RabbitMQ是一個消息代理器:它接受和轉發消息。你能夠把它看成一個郵局:當你把郵件放在信箱裏時,你能夠確定郵差先生最終會把郵件送到你的收件人那裏。在這個比喻中,RabbitMQ就是這裏的郵箱,郵局和郵差。php
RabbitMQ和郵局之間的主要區別是,它不處理紙張,而是接受、存儲和轉發二進制數據‒消息。html
RabbitMQ,和通常的消息傳遞,使用專業術語。json
生產者的工做就是發送消息。發送消息的程序是生產者:segmentfault
隊列類比一個郵箱,存在於RabbitMQ, 然而信息流經過RabbitMQ和您的應用程序,他們只能存儲在一個隊列。隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩衝區。會有許多生產者能夠發送到一個隊列的消息,許多消費者能夠嘗試從一個隊列接收數據。這就是咱們如何表示隊列的方式:數組
消費者和生產者有着類似的意義. 消費者無非就是等待消息而後處理的程序:服務器
請注意,生產者、消費者和代理沒必要同一主機上;事實上,在大多數應用程序中它們沒有這樣作。composer
(使用PHP amqplib客戶端)異步
在本教程的這一部分中,咱們將用PHP編寫兩個程序;一個生產者發送一條消息,一個用戶接收消息並將它們打印出來。咱們會PHP amqplib API的忽略一些細節,集中在這個很是簡單的事情剛剛開始。這是一個「Hello World」的消息傳遞。socket
在下圖中,「p」是咱們的生產商,「C」是咱們的消費者。在中間的框是一個隊列的消息緩衝區,RabbitMQ保持表明的消費。ide
RabbitMQ有不少協議。本教程介紹AMQP 0-9-1,這是一個開放的、通用的協議消息。有許多不一樣的語言RabbitMQ一批客戶。咱們將在本教程中使用PHP amqplib,composer解決依賴管理。
添加composer.json:
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接運行包引入 composer require php-amqplib/php-amqplib
生產者(消息發送方)
咱們命令咱們的消息發佈者(發送者)send.php和消息接收receive.php。發送者將鏈接到RabbitMQ,發送一條消息,而後退出。
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
如今咱們能建立一個鏈接服務器的Connection:
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();
該鏈接抽象套接字(socket)鏈接,併爲咱們負責協議版本協商和認證等。這裏,咱們鏈接到一個rabbitmq代理器在本地機器上-使用localhost。若是咱們想在不一樣的機器上鍊接到一個代理,咱們只需在這裏指定它的名稱或IP地址。
接下來,咱們建立一個通道,這是處理事情的大部分API的地方。
發送消息前,咱們必須聲明一個隊列爲咱們發送作準備;而後咱們能夠向隊列發佈消息:
$channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n";
聲明隊列是冪等的(原句:Declaring a queue is idempotent,這裏的idempotent不知道是什麼意思) - 只有在它不存在時纔會建立隊列。消息內容是一個字節數組,所以您能夠在那裏編碼用你喜歡的方式。
最後,咱們關閉通道和鏈接;
$channel->close(); $connection->close();
上面咱們完成了send.php.
接下來咱們完成消費方的代碼
消費者從RabbitMQ接收推來的消息,咱們會保持運行監聽消息並打印出來。
引入lib
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection;
設置與發佈程序相同;咱們打開一個鏈接和一個通道,並聲明將要消耗的隊列。注意,這與發送發佈的隊列匹配。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
注意,咱們也在這裏聲明隊列。由於咱們可能在發佈以前啓動消費者,咱們但願在咱們嘗試從它那裏消費消息以前肯定隊列的存在。
咱們將告訴服務器從隊列中發送消息。咱們將定義一個PHP可調用,它將接收服務器發送的消息。請記住,消息是從服務器異步發送到客戶機的。
$callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
當調用basic_consume,咱們的代碼會阻塞。當咱們收到消息時,咱們的回調函數將經過接收到返回的消息傳遞。
以上是咱們receive.php的代碼
php receive.php
php send.php
rabbitmqctl list_queues
<?php return [ 'vendor' => [ 'path' => dirname(dirname(__DIR__)) . '/vendor' ], 'rabbitmq' => [ 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'qkl', 'password' => '123456', 'vhost' => '/' ] ]; ?>
<?php $config = require "../config.php"; require_once $config['vendor']['path'] . '/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'], $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
<?php $config = require "../config.php"; require_once $config['vendor']['path'] . '/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'], $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']); $channel = $connection->channel(); //發送方其實不須要設置隊列, 不過對於持久化有關,建議執行該行 $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); ?>
瞭解如何構建一個簡單的工做隊列, 你能夠閱讀下一章節: RabbitMQ+PHP 教程二(Work Queues)