RabbitMQ是使用erlang語言開發的開源消息隊列系統,完整的實現了AMPQ(高級抽象層消息通訊協議)。php
使用Homebrew安裝git
$ brew install rabbitmq
修改 ~/.bash_profile 配置環境變量:github
# RabbitMQ Config export PATH=$PATH:/usr/local/sbin
重啓配置json
$ source ~/.bash_profile
啓動mq服務(後臺啓動爲rabbitmq-server -detached)bash
$ rabbitmq-server
登陸管理界面 http://127.0.0.1:15672 帳號密碼爲:guestcomposer
RabbitMQ官方提供了三種PHP可用的擴展:php-amqp,php-rabbit,php-amqplibtcp
php的客戶端如今經常使用的是php-amqplib函數
$ git clone https://github.com/php-amqplib/php-amqplib.git
將composer.json文件添加到您的項目中ui
{spa
「require」:{ 「php-amqplib / php-amqplib」:「> = 2.6.1」 }
}
下載依賴
$ composer install
virtual vhosts是一個命名空間,能夠存在多個exchange和queue。實現了環境(用戶,用戶組,exchange,queue)隔離,是權限控制的最小粒度。默認的virtual host爲/。
接受producer發送的消息,並根據binding綁定規則轉發到對應的隊列。默認是無名交換使用空字符串標識。exchange type(交換機類型)包含四種類型:direct,topic,headers,fanout
轉發消息到routigKey指定的隊列
相似於direct類型,只不過routigKey爲一個句點號「.」分隔的字符串
*能夠替代一個字。
#能夠替換零個或多個單詞。
根據發送的消息內容中的headers屬性進行匹配。
將全部收到的消息廣播到全部已知的隊列。
queue是mq內部對象,用於存儲未被customer消費的消息。相同屬性的queue能夠重複定義,每一個消息都會被投入到一個或多個隊列。
binding是將exchange和queue按照路由規則綁定起來。能夠理解爲binding是exchange和queue之間的關係
消息tcp鏈接
每一個connection裏,可創建多個channel,每一個channel表明一個會話任務。作到儘可能共用connection
1.send.php:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立channel,多個channel能夠共用鏈接 $channel = $connection->channel(); // 建立交換機以及隊列(若是已經存在,不須要從新再次建立而且綁定) // 建立直連的交換機 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 建立隊列 $channel->queue_declare('hello', false, false, false, false); // 交換機跟隊列的綁定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 設置消息bady傳送字符串logs(消息只能爲字符串,建議消息均json格式) $msg = new AMQPMessage('logs'); // 發送數據到對應的交換機direct_logs並設置對應的routigKey $channel->basic_publish($msg, 'direct_logs', 'routigKey');
2.receive.php:
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立channel,多個channel能夠共用鏈接 $channel = $connection->channel(); // 可能會在數據發佈以前啓動消費者,因此咱們要確保隊列存在,而後再嘗試從中消費消息。 // 建立直連的交換機 $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 建立隊列 $channel->queue_declare('hello', false, false, false, false); // 交換機跟隊列的綁定, $channel->queue_bind('hello', 'direct_logs', 'routigKey'); // 回調函數 $callback = function ($msg) { echo $msg->body; }; // 啓動隊列消費者 $channel->basic_consume('hello3', '', false, true, false, false, $callback); // 判斷是否存在回調函數 while(count($channel->callbacks)) { // 此處爲執行回調函數 $channel->wait(); }
非持久化會致使,隊列重啓,數據丟失
exchange持久化,在聲明durable參數時指定爲true
queue持久化,在聲明durable參數時指定true
消息持久化,實例化AMQPMessage類時指定delivery_mode爲2
exchange和queue是否持久化須要一致才能綁定
消費者設置手動ack,在聲明no_ack參數時指定false
隊列消息異常須要將消息刪除並再次發送一樣的消息置於末尾並手動記錄日誌