發現網上並無RabbitMQ PHP客戶端的較爲詳實的使用文檔(固然也多是我搜索引擎使用不熟練?)總之,簡單的總結了此文,附錄各參數和經常使用的工做隊列,死信隊列,以及不一樣類型交換器的示例,本人水平有限,不免有錯誤之處,歡迎大佬斧正~php
服務器環境 Ubuntu 18.04.5 LTS
PHP 7.2.24
RabbitMQ 3.6.10
php-amqplib 2.7
nginx
apt-get install erlang-nox apt-get install rabbitmq-server rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin '.*' '.*' '.*' //開啓web管理頁面 //cd到安裝目錄 我這裏是/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10 cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10 rabbitmq-plugins enable rabbitmq_management cd 到項目目錄 composer require php-amqplib/php-amqplib 在須要使用的地方 use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable;
//1.1 創建鏈接 $conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost); 參數: $host: RabbitMQ服務器主機IP地址 $port: RabbitMQ服務器端口 $user: 鏈接RabbitMQ服務器的用戶名 $password: 鏈接RabbitMQ服務器的用戶密碼 $vhost: 鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost)
//1.2 創建信道 $channel = $conn->channel($channel_id); 參數: $channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids
//1.3 聲明交換器 $channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete); 參數: $exhcange_name 交換器名字 $type 交換器類型 $passive 是否檢測同名隊列 $durable 交換機是否開啓持久化 $auto_detlete 通道關閉後是否刪除隊列 (1)交換器類型 枚舉 [ direct: (默認)直接交換器,工做方式相似於單播,Exchange會將消息發送徹底匹配ROUTING_KEY的Queue, fanout: 廣播是式交換器,無論消息的ROUTING_KEY設置爲何,Exchange都會將消息轉發給全部綁定的Queue, topic: 主題交換器,工做方式相似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的全部隊列,好比,ROUTING_KEY爲user.stock的Message會轉發給綁定匹配模式爲 * .stock,user.stock, * . * 和#.user.stock.#的隊列。(* 表是匹配一個任意詞組,#表示匹配0個或多個詞組), headers:根據消息體的header匹配 ]
//1.4 聲明隊列 $channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete); 參數: $queue_name 隊列名稱 $passive 是否檢測同名隊列 $durable 是否開啓隊列持久化 $exclusive 隊列是否能夠被其餘隊列訪問 $auto_delete 通道關閉後是否刪除隊列
//1.5 建立要發送的信息 ,能夠建立多個消息 $msg = new AMQPMessage($data, $properties) $data 要發送的消息 $properties Array 設置的屬性,好比設置該消息持久化['delivery_mode'=>2] //單個發送 $channel->basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null); 參數: $msg 消息內容 $exchange 交換器 $routing_key routing_key $mandatory 匹配不到隊列時,是否當即丟棄消息 $immediate 隊列無消費者時,是否當即丟棄消息 $ticket 這個俺也不知道 坐等大佬 //多個發送 1.屢次調用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null) 內部實現:往$this->batch_messages[]塞 2.再調用一次$channel->publish_batch(),完成發送
1.6 路由綁定 $channel->queue_bind( $queue, $exchange, $routing_key = '', $nowait = false, $arguments = array(), $ticket = null ) 參數: $queue 隊列名 $exchange 交換器名 $routing_key routing_key $nowait 同上 俺也不知 $arguments $ticket
1.7 消費消息 $channel->basic_consume( $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array() ) 參數: $queue 隊列名 $consumer_tag $no_local $no_ack 是否不須要手動ack:true就是不須要ack|false須要手動ack $exclusive $nowait $callback 消息回調函數 $ticket $arguments
1.8 手動ack 示例 $callback = function($msg) { sleep($msg->body); echo " [x] Received sleep ", $msg->body, "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); echo " [x] Ack "."\n"; };
1.9 限制分發 示例 限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢後,有了反饋,纔會進行第二次發送。 $channel->basic_qos(null,1,null);
這裏也順手補一下最基礎的使用,鏈接參數做爲demo就直接寫死了哈~web
public function send() { $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); } public function consume() { $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } }
生產者和消費者均增長 $channel->basic_qos(null,1,null); 便可。
例如註冊後須要發送歡迎短信和郵件,將註冊行爲廣播至短信和郵件 生產者 //定義交換器 $channel->exchange_declare('register','fanout',false,false,false); $msg = new AMQPMessage('register event'); $channel->basic_publish($msg, 'register'); 註冊短信消費者 $channel->exchange_declare('register','fanout',false,false,false); $channel->queue_declare('register.sms', false, false, false, false); $channel->queue_bind('register.sms', 'register'); 註冊郵件消費者 $channel->exchange_declare('register','fanout',false,false,false); $channel->queue_declare('register.mail', false, false, false, false); $channel->queue_bind('register.mail', 'register');
例如我想一個消費者接受全部日誌,一個消費者只接收Error級別日誌 生產者 //定義交換器 $channel->exchange_declare('log','topic',false,false,false); $num = rand(0,10); if ($num%3 == 0) { $level = 'error'; }elseif($num%3 == 1){ $level = 'warning'; }else{ $level = 'common'; } $msg = new AMQPMessage('log event '.$level); $channel->basic_publish($msg, 'log', 'log.'.$level); 全量日誌消費者 $channel->exchange_declare('log','topic',false,false,false); $channel->queue_declare('log.all', false, false, false, false); $channel->queue_bind('log.all', 'log', 'log.*'); Error日誌消費者 $channel->exchange_declare('log','topic',false,false,false); $channel->queue_declare('log.error', false, false, false, false); $channel->queue_bind('log.error', 'log', 'log.error');
例如我想一個消費者接受全部日誌,一個消費者只接收Error級別日誌 生產者 //定義交換器 $channel->exchange_declare('log2','headers',false,false,false); $num = rand(0,10); if ($num%3 == 0) { $level = 'error'; }elseif($num%3 == 1){ $level = 'warning'; }else{ $level = 'common'; } $msg = new AMQPMessage('log2 event '.$level); $bindArguments = [ 'level' => $level, 'type' => 'log' ]; $headers = new AMQPTable($bindArguments); $msg->set('application_headers', $bindArguments); $channel->basic_publish($msg, 'log2'); 全量日誌消費者 $channel->exchange_declare('log2','headers',false,false,false); $channel->queue_declare('log2.all', false, false, false, false); $bindArguments = [ 'type' => 'log', //'x-match' => 'any' //默認any ]; $headers = new AMQPTable($bindArguments); $channel->queue_bind('log2.all', 'log2', '', false, $headers); Error日誌消費者 $channel->exchange_declare('log2','headers',false,false,false); $channel->queue_declare('log2.error', false, false, false, false); $bindArguments = [ 'type' => 'log', 'level' => 'error', 'x-match' => 'all' //默認any ]; $headers = new AMQPTable($bindArguments); $channel->queue_bind('log2.error', 'log2', '', false, $headers);
//2.5.1 定義一個沒有消費者,5s後消息過時的隊列 //生產者 $arguments = new AMQPTable([ 'x-dead-letter-exchange' => 'dead', 'x-message-ttl' => 5000, //消息存活時間毫秒 'x-dead-letter-routing-key' => 'dead' ]); //定義隊列 不要交換器 $channel->queue_declare('no_consume', false, false, false, false, false, $arguments); $now = time(); $msg = new AMQPMessage($now); $channel->basic_publish($msg, '', 'no_consume'); echo " [x] Sent no_consume :".date('Y-m-d H:i:s',$now)."\n"; $channel->close(); $connection->close(); //消費者 $channel->exchange_declare('dead','topic',false,false,false); $channel->queue_declare('dead.all', false, false, false, false); $channel->queue_bind('dead.all', 'dead', 'dead'); $channel->basic_qos(null,1,null); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { var_dump('msg:'.date('Y-m-d H:i:s',$msg->body)); var_dump('now:'.date('Y-m-d H:i:s')); echo " [x] Received log error ", $msg->body, "\n"; }; $channel->basic_consume('dead.all', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }