安裝php
rabbitmq 在 mac 下能夠直接用 brew 安裝
默認安裝在 /usr/local/Cellar/下
命令被軟鏈接加入到了/usr/local/sbin 下,所以能夠把此目錄放到環境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啓服務
端口5672 默認
端口15672 web 端登陸管理端口127.0.0.1:15672
rabbitmq 默認提供的用戶 guest 密碼 guesthtml
中止服務
rabbitmqctl stop
開啓應用 [服務依舊運行]
rabbitmqctl start_app
中止應用 [服務依舊運行]
rabbitmqctl stop_appweb
添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登陸
sudo rabbitmqctl clear_password <username>
列出全部用戶
sudo rabbitmqctl list_users
設置用戶角色
rabbitmqctl set_user_tags username tagbash
virtual host只是起到一個命名空間的做用,因此能夠多個user共同使用一個virtual host,'/'這個是系統默認的vhost,就是說當咱們建立一個到rabbitmq的connection時候,它的命名空間是'/',須要注意的是不一樣的命名空間之間的資源是不能訪問的,好比 exchang,queue ,bingding等
建立虛擬主機
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機
sudo rabbitmqctl delete_vhost vhostpath
列出全部虛擬主機
sudo rabbitmqctl list_vhosts
列出某個 vhost 的全部用戶和權限
list_permissions [-p vhostpath]
列出某個用戶的全部權限。
list_user_permissions {username}
清除用戶對某個 vhost 的權限。
clear_permissions [-p vhostpath] {username}
設置用戶對某個 virtual host 的權限,若是不指定 vhost,則默認爲「/」 vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang 「." "." ".*"服務器
添加一個管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*」
分配給用戶指定虛擬主機的權限,雖然是administrator角色,但不對全部虛擬主機都有權限,同樣須要對每一個虛擬主機都受權app
顯示信息
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]
列出某個 vhost 的全部 queue。
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
列出某個 vhost 的全部 exchange。
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
列出某個 vhost 的全部 binding。
rabbitmqctl list_connections [<connectioninfoitem> ...]
列出 RabbitMQ broker 的全部 connection。
rabbitmqctl list_channels [<channelinfoitem> ...]
列出 RabbitMQ broker 的全部 channel
rabbitmqcrl list_consumers [-p <vhostpath>]
列出某個 vhost 的全部 consumer。composer
1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。函數
2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Hostui
3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。spa
4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。
5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。
6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。
7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。
8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。
9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
php端 rabbitmq 客戶端可使用 composer 下的庫
{ "require": { "php-amqplib/php-amqplib": "2.5.*" } }
使用時,用到了這兩個東西
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
開始發送消息
public function handle() { //鏈接到 test_host 虛擬主機,每一個虛擬主機有本身的隊列,交換機... $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host'); //建立一個 channel $channel = $connection->channel(); //聲明 hello 隊列 $channel->queue_declare('hello', false, false, false, false); //建立一個消息 $msg = new AMQPMessage(time()); //把消息推送到默認的交換機中,而且告訴交換機要把消息交給 hello 隊列 $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent ".time()."\n"; }
重要概念,消息是保存在交換機中的,當消息存放時指定的隊列存在,交換機會把消息推送到該隊列
消息隊列發送消息給消費者,一個消息發給一個消費者
public function handle() { //鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host"); //建立一個 channel $channel = $connection->channel(); //能夠運行這個命令不少次,可是隻有一個隊列會被建立, 在程序中重複將隊列重複聲明一下是種值得推薦的作法,保證隊列存在 $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; sleep($msg->body); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //默認狀況下,隊列會把消息公平的分配給各個消費者 //若是某個消費者腳本處理完成分配給他的消息任務後,會一直空閒 //另一個消費者腳本處理的消息都很是耗時,這就容易致使消費者腳本得不到合理利用, //加入此句話,是告訴隊列,取消把消息公平分配到各個腳本,而是那個腳本空閒,就交給它一個消息任務 //這樣,合理利用到每個空閒的消費者腳本 $channel->basic_qos(null, 1, null); /** * basic_consume 方法 從隊列中讀取數據 * @param string $queue 指定隊列 * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack 消費者處理完消息後,是否不須要告訴隊列已經處理完成,true 不須要 false 須要, * true 默認狀況下,隊列會把消息公平分配到各個消費者中,而後一次性把消息交給消費者,若是消費者處理了一半掛了,那麼消息就丟失了 * false 默認狀況下,隊列會把消息公平的分配給各個消費者,而後一個一個的把消息分配到消費者腳本中,腳本處理完成後,告訴隊列,隊列會刪除這個消息,而且接着給下一個消息, 當腳本掛掉,不會丟失消息,隊列會把未完成的消息分配給其餘消費者 在 callback 函數中須要加入這句話,處理完後通知隊列能夠刪除消息了 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 未加入這句話,隊列不會刪除已處理完的消息,當腳本掛掉時,會把分配給當前隊列的全部消息再次從新分配給其餘隊列,會致使消息會重複處理 */ $channel->basic_consume('hello', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
一個消息發送給多個消費者
扇形交換機 fanout
發佈訂閱模式,科院實現一個消息發送到多個隊列中
在發佈消息腳本中,建立一個扇形交換機,把消息推送到交換機,不須要推進到指定的隊列中,隊列在消費者腳本中建立
消費腳本定義個臨時隊列,並綁定這個臨時隊列到交換機中,扇形交換機會把接收到的消息推進到每個綁定的隊列中
生產者腳本
public function handle() { //鏈接到 test_host 虛擬主機,每一個虛擬主機有本身的隊列,交換機... $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host'); //建立一個 channel $channel = $connection->channel(); //聲明 log 隊列 //$channel->queue_declare('log', false, false, false, false); //建立一個fanout類型交換機 $channel->exchange_declare("logs","fanout",false,false,false); //建立一個消息 $msg = new AMQPMessage( time() ); $channel->basic_publish ( $msg , 'logs' ); echo " [x] Sent ".time()."\n"; $channel->close(); $connection->close(); }
消費者腳本
public function handle() { //鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host"); //建立一個 channel $channel = $connection->channel(); //能夠運行這個命令不少次,可是隻有一個隊列會被建立, 在程序中重複將隊列重複聲明一下是種值得推薦的作法,保證隊列存在 //$channel->queue_declare('hello', false, false, false, false); //建立一個fanout類型交換機 $channel->exchange_declare('logs', 'fanout', false, false, false); //系統建立一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上 $channel->queue_bind($queue_name, 'logs'); $callback = function($msg){ echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
直連交換機 direct
交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列
生產者,建立基本上和扇形交換機同樣,不一樣的是
$channel->exchange_declare("direct_logs","direct",false,false,false); //建立一個消息 $msg = new AMQPMessage( time() ); //把消息推進到direct_logs交換機,並給消息加上路由 key,讓消費者隊列來根據 key 接收消息 $channel->basic_publish ( $msg , 'direct_logs', "warning" );
消費者
$channel->exchange_declare("direct_logs","direct",false,false,false); //系統建立一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上,並指定消息的路由 key $channel->queue_bind($queue_name, 'direct_logs', 'error'); $channel->queue_bind($queue_name, 'direct_logs', 'warning');
主題交換機 topic
直連交換機中路由 key 匹配模式
(星號*) 用來表示一個單詞.
(井號#) 用來表示任意數量(零個或多個)單詞。
Q1會接收到 a.orange.b 等key 值中間爲 orange 的消息
Q2會接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。
生產者,指定路由 key
$channel->exchange_declare("topic_logs","topic",false,false,false); //建立一個消息 $msg = new AMQPMessage( time() ); //把消息推進到direct_logs交換機,並給消息加上路由 key,讓消費者隊列來根據 key 接收消息 $channel->basic_publish ( $msg , 'topic_logs', "baidu.warning" );
消費者1
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'topic_logs', "baidu.#");
消費者2
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'topic_logs', 'ali.#');
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/