RabbitMQ是一個消息代理。它的核心原理很是簡單:接收和發送消息。你能夠把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、存儲和發送二進制的數據——消息。通常提到RabbitMQ和消息,都用到一些專有名詞。php
咱們的「Hello world」不會很複雜——僅僅發送一個消息,而後獲取它並輸出到屏幕。這樣以來咱們須要兩個程序,一個用做發送消息,另外一個接受消息並打印消息內容html
咱們大致的設計是這樣的:python
生產者(Producer)把消息發送到一個名爲「hello」的隊列中。消費者(consumer)從這個隊列中獲取消息。web
RabbitMQ庫
RabbitMQ使用的是AMQP協議。要使用她你就必須須要一個使用一樣協議的庫。幾乎全部的編程語言都有可選擇的庫。python也是同樣,能夠從如下幾個庫中選擇:編程
- py-amqplib
- txAMQP
- pika
在這一系列教程中,咱們打算使用PHP 的AMQP擴展。詳細教程請查看:服務器
mac os 下RabbitMq 以及 PHP amqp擴展安裝記錄
咱們第一個程序send.php會發送一個消息到隊列中。首先要作的事情就是創建一個到RabbitMQ服務器的鏈接。網絡
$connection = new AMQPConnection(array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest'));
如今咱們已經鏈接上服務器了,那麼,在發送消息以前咱們須要確認隊列是存在的。若是咱們把消息發送到一個不存在的隊列,RabbitMQ會丟棄這條消息。我門先建立一個名爲hello的隊列,而後把消息發送到這個隊列中。編程語言
$queue = new AMQPQueue($channel); $queue->setName($queueName);
這時候咱們就能夠發送消息了,咱們第一條消息只包含了 Hello World!字符串,咱們打算把它發送到咱們的hello隊列。函數
在RabbitMQ中,消息是不能直接發送到隊列,它須要發送到交換器(exchange)中。咱們不打算在這裏深刻討論它——你能夠經過教程的第三部分瞭解更多。如今咱們所須要瞭解的是如何使用默認的交換器(exchange),它使用一個空字符串來標識。交換器容許咱們指定某條消息須要投遞到哪一個隊列,$$routeKey參數必須指定爲隊列的名稱:工具
$exchange->publish($message, $routeKey); var_dump("[x] Sent 'Hello World!'");
在退出程序以前,咱們須要確認網絡緩衝已經被刷寫、消息已經投遞到RabbitMQ。完成這些事情(正確的關閉鏈接)是很簡單的。
$connection->disconnect();
咱們的第二個程序receive.php,將會從隊列中獲取消息並打印消息。
此次咱們仍是先要鏈接到RabbitMQ服務器。鏈接服務器的代碼和以前是同樣的。
下一步也和以前同樣,咱們須要確認隊列是存在的。使用$queue->declare()建立一個隊列——咱們能夠運行這個命令不少次,可是隻有一個隊列會建立。
$queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare();
你也許要問爲何重複聲明瞭隊列——咱們已經在前面的代碼中聲明瞭它。若是咱們肯定了隊列是已經存在的,那麼咱們能夠不這麼作。好比先運行send.php程序。但是咱們並不肯定哪一個程序先運行,這種狀況的話再程序中重複聲明是好的作法。
列出全部隊列
你也許但願查看RabbitMQ由哪些隊列、有多少消息在隊列中。你可使用rabbitmqctl工具(使用有權限的用戶):
``` $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.
```
(omit sudo on Windows)
(在Windows中不須要sudo命令)
從隊列中獲取消息相對來講稍顯複雜。須要爲隊列定義一個回調(callback)函數。當咱們獲取到消息的時候,Pika庫就會調用這個回調(callback)函數。咱們的這個回調函數將會但因消息的內容到屏幕上。
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); }
下一步,咱們須要告訴RabbitMQ這個回調函數將會從hello隊列中接收消息:
$queue->consume('callback');
要成功運行這些命令,咱們必須保證隊列是存在的,咱們已經可以保證——咱們以前已經使用建立了一個隊列queue_declare。
$queue->nack()//函數稍後會介紹。
最後,咱們輸入一個無限循環來等待消息數據並確運行回調函數。
var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); }
send.php的所有代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $message = 'Hello World!'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); try { $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $exchange->publish($message, $routeKey); var_dump("[x] Sent 'Hello World!'"); } catch (AMQPConnectionException $e) { var_dump($e); exit(); } $connection->disconnect();
receive.py的所有代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); }
如今就能夠在終端中運行咱們的程序了。首先,用send.php重續發送一條消息:
php send.php string(23) "[x] Sent 'Hello World!'"</pre>
生產者(producer)程序send.php每次運行以後就會中止。如今咱們就來接收消息:
php receive.php string(46) "[*] Waiting for messages. To exit press CTRL+C" string(26) " [x] Received:Hello World!"</pre>
成功了!咱們已經經過RabbitMQ發送第一條消息。你也許已經注意到了,receive.py程序並無退出。它一直在準備獲取消息,你能夠經過Ctrl-C來終端它。
試下在新的終端中再次運行send.php。
咱們已經學會如何發送消息到一個已知隊列中並接收消息。是時候移步到第二部分了,咱們將會創建一個簡單的工做隊列(work queue)。