使用php-amqplib鏈接rabbitMQ 學習筆記及總結

一、使用composer安裝php-amqplib

在你的項目中添加一個 composer.json文件:
  {
     "require": {
      "php-amqplib/php-amqplib": "2.6.*"
     }
 }
只要你已經安裝Composer功能,你能夠運行如下:
   $ composer install
已經存在的項目則執行 
   $ composer update
這時在verdor目錄就已經下載完畢

具體能夠參考官方文檔:https://github.com/php-amqplib/php-amqplib

二、使用php-amqplib

英文文檔:http://www.rabbitmq.com/getstarted.html
中文文檔:https://rabbitmq.shujuwajue.com/tutorials_with_php/[2]Work_Queues.md.html

總結:官方文檔看起來有點亂,總結了一下,基本都是互通的,如下爲簡單基本步驟,僅供參考,NO BB。 生產者:
1、建立鏈接 主要參數說明: $host: RabbitMQ服務器主機IP地址 $port: RabbitMQ服務器端口 $user: 鏈接RabbitMQ服務器的用戶名 $password: 鏈接RabbitMQ服務器的用戶密碼 $vhost: 鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost) $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost); 2、獲取信道 // $channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids $channel = $connection->channel($channel_id); 3、在信道里建立交換器 # $exhcange_name 交換器名字 # $type 交換器類型: ’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型 fanout 扇形交換器 會發送消息到它所知道的全部隊列,每一個消費者獲取的消息都是一致的 headers 頭部交換器 direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配 topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key rpc #$passive false #durable false #auto_detlete false $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //經常使用設置 $passive=>false $durable=>false $auto_delete->false 4、建立要發送的信息 ,能夠建立多個消息 #$data string類型 要發送的消息 #$properties array類型 設置的屬性,好比設置該消息持久化[‘delivery_mode’=>2] $msg = new AMQPMessage($data,$properties) 5、發送消息 #$msg object AMQPMessage對象 #$exchange string 交換機名字 #$routing_key string 路由鍵 若是交換機類型 fanout: 該值會被忽略,由於該類型的交換機會把全部它知道的隊列發消息,無差異區別 direct 只有精確匹配該路由鍵的隊列,纔會發送消息到該隊列 topic 只有正則匹配到的路由鍵的隊列,纔會發送到該隊列 $channel->basic_publish($msg,$exchange,$routing_key); 6、關閉信道和連接 $channel->close(); $connection->close(); 消費者: 1、建立鏈接 主要參數說明: $host: RabbitMQ服務器主機IP地址 $port: RabbitMQ服務器端口 $user: 鏈接RabbitMQ服務器的用戶名 $password: 鏈接RabbitMQ服務器的用戶密碼 $vhost: 鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost) $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost); 2、獲取信道 // $channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids $channel = $connection->channel($channel_id); 3、在信道里建立交換器,未顯式聲明交換機都是使用匿名交換機 # $exhcange_name 交換器名字 # $type 交換器類型: ’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型 fanout 扇形交換器 會發送消息到它所知道的全部隊列,每一個消費者獲取的消息都是一致的 headers 頭部交換器 direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配 topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key rpc #$passive false #durable false #auto_detlete false $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //經常使用設置 $passive=>false $durable=>false $auto_delete->false 4、聲明消費者隊列 (1) 非持久化隊列,RabbitMQ退出或者崩潰時,該隊列就不存在 list($queue_name, ,) = $channel->queue_declare("", false, false, false, false) (2) 持久化隊列(須要顯示聲明,第三個參數要設置爲true),保存到磁盤,但不必定徹底保證不丟失信息,由於保存老是要有時間的。 list($queue_name, ,) = $channel->queue_declare("ex_queue", false, false, true, false) 5、綁定交換機,當未顯示綁定交換機時,默認是綁定匿名交換機 #綁定:交換機與隊列的關係,以下面這句代碼意思是,$queue_name隊列對logs交換機數據感興趣,該隊列就消費該交換機傳過來的數據:這個隊列(queue)對這個交換機(exchange)的消息感興趣. $binding_key默認爲空,表示對該交換機全部消息感興趣,若是值不爲空,則該隊列只對該類型的消息感興趣(除了fanout交換機之外) $channel->queue_bind($queue_name, 'logs', $binding_key); 6、消費消息 #該代碼表示使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker),輪詢、負載均衡配置 #$channel->basic_qos(null, 1, null); #第四個參數 no_ack = false 時,表示進行ack應答,確保消息已經處理 #$callback 表示回調函數,傳入消息參數 $channel->basic_consume('ex_queue', '', false, false, false, false, $callback); $callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; #當no_ack=false時, 須要寫下行代碼,不然可能出現內存不足狀況#$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; #監聽消息,一有消息,立馬就處理 while(count($channel->callbacks)) { $channel->wait(); }
相關文章
相關標籤/搜索