RabbitMQ+PHP 教程一(Hello World)

介紹

RabbitMQ是一個消息代理器:它接受和轉發消息。你能夠把它看成一個郵局:當你把郵件放在信箱裏時,你能夠確定郵差先生最終會把郵件送到你的收件人那裏。在這個比喻中,RabbitMQ就是這裏的郵箱,郵局和郵差。php

RabbitMQ和郵局之間的主要區別是,它不處理紙張,而是接受、存儲和轉發二進制數據‒消息。html

RabbitMQ,和通常的消息傳遞,使用專業術語。json

生產者的工做就是發送消息。發送消息的程序是生產者:segmentfault

clipboard.png

隊列類比一個郵箱,存在於RabbitMQ, 然而信息流經過RabbitMQ和您的應用程序,他們只能存儲在一個隊列。隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩衝區。會有許多生產者能夠發送到一個隊列的消息,許多消費者能夠嘗試從一個隊列接收數據。這就是咱們如何表示隊列的方式:數組

clipboard.png

消費者和生產者有着類似的意義. 消費者無非就是等待消息而後處理的程序:服務器

clipboard.png

請注意,生產者、消費者和代理沒必要同一主機上;事實上,在大多數應用程序中它們沒有這樣作。composer

"Hello World"

(使用PHP amqplib客戶端)異步

在本教程的這一部分中,咱們將用PHP編寫兩個程序;一個生產者發送一條消息,一個用戶接收消息並將它們打印出來。咱們會PHP amqplib API的忽略一些細節,集中在這個很是簡單的事情剛剛開始。這是一個「Hello World」的消息傳遞。socket

在下圖中,「p」是咱們的生產商,「C」是咱們的消費者。在中間的框是一個隊列的消息緩衝區,RabbitMQ保持表明的消費。ide

clipboard.png

PHP amqplib客戶端庫

RabbitMQ有不少協議。本教程介紹AMQP 0-9-1,這是一個開放的、通用的協議消息。有許多不一樣的語言RabbitMQ一批客戶。咱們將在本教程中使用PHP amqplib,composer解決依賴管理。

添加composer.json:

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}
composer install

# 或者 直接運行包引入
composer require php-amqplib/php-amqplib

如今咱們能夠開始咱們的hello world

生產者(消息發送方)

clipboard.png

咱們命令咱們的消息發佈者(發送者)send.php和消息接收receive.php。發送者將鏈接到RabbitMQ,發送一條消息,而後退出。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

如今咱們能建立一個鏈接服務器的Connection:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

該鏈接抽象套接字(socket)鏈接,併爲咱們負責協議版本協商和認證等。這裏,咱們鏈接到一個rabbitmq代理器在本地機器上-使用localhost。若是咱們想在不一樣的機器上鍊接到一個代理,咱們只需在這裏指定它的名稱或IP地址。

接下來,咱們建立一個通道,這是處理事情的大部分API的地方。

發送消息前,咱們必須聲明一個隊列爲咱們發送作準備;而後咱們能夠向隊列發佈消息:

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

聲明隊列是冪等的(原句:Declaring a queue is idempotent,這裏的idempotent不知道是什麼意思) - 只有在它不存在時纔會建立隊列。消息內容是一個字節數組,所以您能夠在那裏編碼用你喜歡的方式。

最後,咱們關閉通道和鏈接;

$channel->close();
$connection->close();

上面咱們完成了send.php.

接下來咱們完成消費方的代碼

消費者(接收方,任務處理方)

消費者從RabbitMQ接收推來的消息,咱們會保持運行監聽消息並打印出來。

clipboard.png

引入lib

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

設置與發佈程序相同;咱們打開一個鏈接和一個通道,並聲明將要消耗的隊列。注意,這與發送發佈的隊列匹配。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,咱們也在這裏聲明隊列。由於咱們可能在發佈以前啓動消費者,咱們但願在咱們嘗試從它那裏消費消息以前肯定隊列的存在。

咱們將告訴服務器從隊列中發送消息。咱們將定義一個PHP可調用,它將接收服務器發送的消息。請記住,消息是從服務器異步發送到客戶機的。

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

當調用basic_consume,咱們的代碼會阻塞。當咱們收到消息時,咱們的回調函數將經過接收到返回的消息傳遞。

以上是咱們receive.php的代碼

運行測試

運行消費者

php receive.php

運行消息發送方

php send.php

列出隊列

rabbitmqctl list_queues

完整源碼(調整過)

config.php

<?php
return [
    'vendor' => [
        'path' => dirname(dirname(__DIR__)) . '/vendor'
    ],
    'rabbitmq' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'qkl',
        'password' => '123456',
        'vhost' => '/'
    ]
];
?>

receive.php

<?php
$config = require "../config.php";

require_once $config['vendor']['path'] . '/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'],
    $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']);
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
 
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

send.php

<?php
$config = require "../config.php";

require_once $config['vendor']['path'] . '/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'],
    $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']);
$channel = $connection->channel();

//發送方其實不須要設置隊列, 不過對於持久化有關,建議執行該行
$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
?>

瞭解如何構建一個簡單的工做隊列, 你能夠閱讀下一章節: RabbitMQ+PHP 教程二(Work Queues)

翻譯來自 RabbitMQ - RabbitMQ tutorial - "Hello World!"

相關文章
相關標籤/搜索