RabbitMQ官方中文入門教程(PHP版) 第一部分:Hello World

RabbitMQ是一個消息代理。它的核心原理很是簡單:接收和發送消息。你能夠把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、存儲和發送二進制的數據——消息。通常提到RabbitMQ和消息,都用到一些專有名詞。php

  • 生產(Producing)意思就是發送。發送消息的程序就是一個生產者(producer)。咱們通常用」P」來表示:
  • 隊列(queue)就是郵箱的名稱。消息經過你的應用程序和RabbitMQ進行傳輸,它們可以只存儲在一個隊列(queue)中。 隊列(queue)沒有任何限制,你要存儲多少消息均可以——基本上是一個無限的緩衝。多個生產者(producers)可以把消息發送給同一個隊列,一樣,多個消費者(consumers)也能攻從一個隊列(queue)中獲取數據。隊列能夠化城這樣(圖上是隊列的名稱):
  • 消費(Consuming)和獲取消息是同樣的意思。一個消費者(consumer)就是一個等待獲取消息的程序。咱們把它畫做」C」:

Hello World!

(使用pika 0.9.5 Python客戶端)

咱們的「Hello world」不會很複雜——僅僅發送一個消息,而後獲取它並輸出到屏幕。這樣以來咱們須要兩個程序,一個用做發送消息,另外一個接受消息並打印消息內容html

咱們大致的設計是這樣的:python

生產者(Producer)把消息發送到一個名爲「hello」的隊列中。消費者(consumer)從這個隊列中獲取消息。web

RabbitMQ庫

RabbitMQ使用的是AMQP協議。要使用她你就必須須要一個使用一樣協議的庫。幾乎全部的編程語言都有可選擇的庫。python也是同樣,能夠從如下幾個庫中選擇:編程

  • py-amqplib
  • txAMQP
  • pika

在這一系列教程中,咱們打算使用PHPAMQP擴展。詳細教程請查看:服務器

mac os 下RabbitMq 以及 PHP amqp擴展安裝記錄

發送消息

咱們第一個程序send.php會發送一個消息到隊列中。首先要作的事情就是創建一個到RabbitMQ服務器的鏈接。網絡

$connection = new AMQPConnection(array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest'));

如今咱們已經鏈接上服務器了,那麼,在發送消息以前咱們須要確認隊列是存在的。若是咱們把消息發送到一個不存在的隊列,RabbitMQ會丟棄這條消息。我門先建立一個名爲hello的隊列,而後把消息發送到這個隊列中。編程語言

$queue = new AMQPQueue($channel);
$queue->setName($queueName);

這時候咱們就能夠發送消息了,咱們第一條消息只包含了 Hello World!字符串,咱們打算把它發送到咱們的hello隊列。函數

在RabbitMQ中,消息是不能直接發送到隊列,它須要發送到交換器(exchange)中。咱們不打算在這裏深刻討論它——你能夠經過教程的第三部分瞭解更多。如今咱們所須要瞭解的是如何使用默認的交換器(exchange),它使用一個空字符串來標識。交換器容許咱們指定某條消息須要投遞到哪一個隊列,$$routeKey參數必須指定爲隊列的名稱:工具

$exchange->publish($message, $routeKey);
var_dump("[x] Sent 'Hello World!'");

在退出程序以前,咱們須要確認網絡緩衝已經被刷寫、消息已經投遞到RabbitMQ。完成這些事情(正確的關閉鏈接)是很簡單的。

$connection->disconnect();

獲取數據

咱們的第二個程序receive.php,將會從隊列中獲取消息並打印消息。

此次咱們仍是先要鏈接到RabbitMQ服務器。鏈接服務器的代碼和以前是同樣的。

下一步也和以前同樣,咱們須要確認隊列是存在的。使用$queue->declare()建立一個隊列——咱們能夠運行這個命令不少次,可是隻有一個隊列會建立。

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();

你也許要問爲何重複聲明瞭隊列——咱們已經在前面的代碼中聲明瞭它。若是咱們肯定了隊列是已經存在的,那麼咱們能夠不這麼作。好比先運行send.php程序。但是咱們並不肯定哪一個程序先運行,這種狀況的話再程序中重複聲明是好的作法。

列出全部隊列

你也許但願查看RabbitMQ由哪些隊列、有多少消息在隊列中。你可使用rabbitmqctl工具(使用有權限的用戶):

``` $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.

```

(omit sudo on Windows)

(在Windows中不須要sudo命令)

從隊列中獲取消息相對來講稍顯複雜。須要爲隊列定義一個回調(callback)函數。當咱們獲取到消息的時候,Pika庫就會調用這個回調(callback)函數。咱們的這個回調函數將會但因消息的內容到屏幕上。

function callback($envelope, $queue) {  
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->nack($envelope->getDeliveryTag());
}

下一步,咱們須要告訴RabbitMQ這個回調函數將會從hello隊列中接收消息:

$queue->consume('callback');

要成功運行這些命令,咱們必須保證隊列是存在的,咱們已經可以保證——咱們以前已經使用建立了一個隊列queue_declare。

$queue->nack()//函數稍後會介紹。

最後,咱們輸入一個無限循環來等待消息數據並確運行回調函數。

var_dump('[*] Waiting for messages. To exit press CTRL+C');  
while (TRUE) {  
        $queue->consume('callback');
}

整合

send.php的所有代碼:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir &amp;lt;yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
$message = 'Hello World!';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

try {  
        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName($exchangeName);
        $queue = new AMQPQueue($channel);
        $queue->setName($queueName);
        $exchange->publish($message, $routeKey);
        var_dump("[x] Sent 'Hello World!'");
} catch (AMQPConnectionException $e) {
        var_dump($e);
        exit();
}
$connection->disconnect();

receive.py的所有代碼:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir &amp;lt;yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');  
while (TRUE) {  
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {  
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->nack($envelope->getDeliveryTag());
}

如今就能夠在終端中運行咱們的程序了。首先,用send.php重續發送一條消息:

php send.php  
string(23) "[x] Sent 'Hello World!'"</pre>

生產者(producer)程序send.php每次運行以後就會中止。如今咱們就來接收消息:

php receive.php  
string(46) "[*] Waiting for messages. To exit press CTRL+C"  
string(26) " [x] Received:Hello World!"</pre>

成功了!咱們已經經過RabbitMQ發送第一條消息。你也許已經注意到了,receive.py程序並無退出。它一直在準備獲取消息,你能夠經過Ctrl-C來終端它。

試下在新的終端中再次運行send.php。

咱們已經學會如何發送消息到一個已知隊列中並接收消息。是時候移步到第二部分了,咱們將會創建一個簡單的工做隊列(work queue)

相關文章
相關標籤/搜索