在你的項目中添加一個 composer.json文件:
{
"require": {
"php-amqplib/php-amqplib": "2.6.*"
}
}
只要你已經安裝Composer功能,你能夠運行如下:
$ composer install
已經存在的項目則執行
$ composer update
這時在verdor目錄就已經下載完畢
具體能夠參考官方文檔:https://github.com/php-amqplib/php-amqplib
英文文檔:http://www.rabbitmq.com/getstarted.html
中文文檔:https://rabbitmq.shujuwajue.com/tutorials_with_php/[2]Work_Queues.md.html
總結:官方文檔看起來有點亂,總結了一下,基本都是互通的,如下爲簡單基本步驟,僅供參考,NO BB。
生產者:
1、建立鏈接
主要參數說明:
$host: RabbitMQ服務器主機IP地址
$port: RabbitMQ服務器端口
$user: 鏈接RabbitMQ服務器的用戶名
$password: 鏈接RabbitMQ服務器的用戶密碼
$vhost: 鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost)
$connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
2、獲取信道
// $channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids
$channel = $connection->channel($channel_id);
3、在信道里建立交換器
# $exhcange_name 交換器名字
# $type 交換器類型:
’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型
fanout 扇形交換器 會發送消息到它所知道的全部隊列,每一個消費者獲取的消息都是一致的
headers 頭部交換器
direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配
topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key
rpc
#$passive false
#durable false
#auto_detlete false
$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);
//經常使用設置 $passive=>false $durable=>false $auto_delete->false
4、建立要發送的信息 ,能夠建立多個消息
#$data string類型 要發送的消息
#$properties array類型 設置的屬性,好比設置該消息持久化[‘delivery_mode’=>2]
$msg = new AMQPMessage($data,$properties)
5、發送消息
#$msg object AMQPMessage對象
#$exchange string 交換機名字
#$routing_key string 路由鍵 若是交換機類型
fanout: 該值會被忽略,由於該類型的交換機會把全部它知道的隊列發消息,無差異區別
direct 只有精確匹配該路由鍵的隊列,纔會發送消息到該隊列
topic 只有正則匹配到的路由鍵的隊列,纔會發送到該隊列
$channel->basic_publish($msg,$exchange,$routing_key);
6、關閉信道和連接
$channel->close();
$connection->close();
消費者:
1、建立鏈接
主要參數說明:
$host: RabbitMQ服務器主機IP地址
$port: RabbitMQ服務器端口
$user: 鏈接RabbitMQ服務器的用戶名
$password: 鏈接RabbitMQ服務器的用戶密碼
$vhost: 鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost)
$connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
2、獲取信道
// $channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids
$channel = $connection->channel($channel_id);
3、在信道里建立交換器,未顯式聲明交換機都是使用匿名交換機
# $exhcange_name 交換器名字
# $type 交換器類型:
’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型
fanout 扇形交換器 會發送消息到它所知道的全部隊列,每一個消費者獲取的消息都是一致的
headers 頭部交換器
direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配
topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key
rpc
#$passive false
#durable false
#auto_detlete false
$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);
//經常使用設置 $passive=>false $durable=>false $auto_delete->false
4、聲明消費者隊列
(1) 非持久化隊列,RabbitMQ退出或者崩潰時,該隊列就不存在
list($queue_name, ,) = $channel->queue_declare("", false, false, false, false)
(2) 持久化隊列(須要顯示聲明,第三個參數要設置爲true),保存到磁盤,但不必定徹底保證不丟失信息,由於保存老是要有時間的。
list($queue_name, ,) = $channel->queue_declare("ex_queue", false, false, true, false)
5、綁定交換機,當未顯示綁定交換機時,默認是綁定匿名交換機
#綁定:交換機與隊列的關係,以下面這句代碼意思是,$queue_name隊列對logs交換機數據感興趣,該隊列就消費該交換機傳過來的數據:這個隊列(queue)對這個交換機(exchange)的消息感興趣. $binding_key默認爲空,表示對該交換機全部消息感興趣,若是值不爲空,則該隊列只對該類型的消息感興趣(除了fanout交換機之外)
$channel->queue_bind($queue_name, 'logs', $binding_key);
6、消費消息
#該代碼表示使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker),輪詢、負載均衡配置
#$channel->basic_qos(null, 1, null);
#第四個參數 no_ack = false 時,表示進行ack應答,確保消息已經處理
#$callback 表示回調函數,傳入消息參數
$channel->basic_consume('ex_queue', '', false, false, false, false, $callback);
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
#當no_ack=false時, 須要寫下行代碼,不然可能出現內存不足狀況#$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
#監聽消息,一有消息,立馬就處理
while(count($channel->callbacks)) {
$channel->wait();
}