RabbitMQ+PHP php-amqplib使用教程與經常使用場景-死信隊列等

發現網上並無RabbitMQ PHP客戶端的較爲詳實的使用文檔(固然也多是我搜索引擎使用不熟練?)總之,簡單的總結了此文,附錄各參數和經常使用的工做隊列,死信隊列,以及不一樣類型交換器的示例,本人水平有限,不免有錯誤之處,歡迎大佬斧正~php

服務器環境 Ubuntu 18.04.5 LTS
PHP 7.2.24
RabbitMQ 3.6.10
php-amqplib 2.7nginx

0.安裝(順手補一下安裝過程吧...友好一點點)

apt-get install erlang-nox
apt-get install rabbitmq-server
rabbitmqctl add_user  admin  admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

//開啓web管理頁面
//cd到安裝目錄 我這裏是/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10
rabbitmq-plugins enable rabbitmq_management

cd 到項目目錄
composer require php-amqplib/php-amqplib

在須要使用的地方
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

1.各方法參數

//1.1 創建鏈接
$conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
參數:
$host:      RabbitMQ服務器主機IP地址
$port:      RabbitMQ服務器端口
$user:      鏈接RabbitMQ服務器的用戶名
$password:  鏈接RabbitMQ服務器的用戶密碼
$vhost:     鏈接RabbitMQ服務器的vhost(服務器能夠有多個vhost,虛擬主機,相似nginx的vhost)
//1.2 創建信道
$channel = $conn->channel($channel_id); 
參數:
$channel_id 信道id,不傳則獲取$channel[「」]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化並返回AMQPChannel對象,無則拋出異常No free channel ids
//1.3 聲明交換器
$channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete);
參數:
$exhcange_name 交換器名字
$type          交換器類型
$passive       是否檢測同名隊列
$durable       交換機是否開啓持久化
$auto_detlete  通道關閉後是否刪除隊列
(1)交換器類型
枚舉 [
    direct: (默認)直接交換器,工做方式相似於單播,Exchange會將消息發送徹底匹配ROUTING_KEY的Queue,
    fanout: 廣播是式交換器,無論消息的ROUTING_KEY設置爲何,Exchange都會將消息轉發給全部綁定的Queue,
    topic:  主題交換器,工做方式相似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的全部隊列,好比,ROUTING_KEY爲user.stock的Message會轉發給綁定匹配模式爲 * .stock,user.stock, * . * 和#.user.stock.#的隊列。(* 表是匹配一個任意詞組,#表示匹配0個或多個詞組),
    headers:根據消息體的header匹配
    
]
//1.4 聲明隊列
$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete);
參數:
$queue_name  隊列名稱
$passive     是否檢測同名隊列
$durable     是否開啓隊列持久化
$exclusive   隊列是否能夠被其餘隊列訪問
$auto_delete 通道關閉後是否刪除隊列
//1.5 建立要發送的信息 ,能夠建立多個消息
$msg = new AMQPMessage($data, $properties)
$data             要發送的消息
$properties Array 設置的屬性,好比設置該消息持久化['delivery_mode'=>2]
//單個發送
$channel->basic_publish($msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null);
參數:
$msg         消息內容
$exchange    交換器
$routing_key routing_key
$mandatory   匹配不到隊列時,是否當即丟棄消息
$immediate   隊列無消費者時,是否當即丟棄消息
$ticket      這個俺也不知道 坐等大佬

//多個發送
1.屢次調用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null)
內部實現:往$this->batch_messages[]塞
2.再調用一次$channel->publish_batch(),完成發送
1.6 路由綁定
$channel->queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
參數:
$queue       隊列名
$exchange    交換器名
$routing_key routing_key
$nowait      同上 俺也不知
$arguments
$ticket
1.7 消費消息
$channel->basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    )
參數:
$queue        隊列名
$consumer_tag 
$no_local 
$no_ack       是否不須要手動ack:true就是不須要ack|false須要手動ack
$exclusive
$nowait
$callback     消息回調函數
$ticket
$arguments
1.8 手動ack 示例
$callback = function($msg) {
            sleep($msg->body);
            echo " [x] Received sleep ", $msg->body, "\n";
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            echo " [x] Ack "."\n";
        };
1.9 限制分發 示例
限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢後,有了反饋,纔會進行第二次發送。
$channel->basic_qos(null,1,null);

2.經常使用場景

2.0 無交換器 直接隊列

這裏也順手補一下最基礎的使用,鏈接參數做爲demo就直接寫死了哈~web

public function send()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare('hello', false, false, false, false);
        $msg = new AMQPMessage('Hello World!');
        $channel->basic_publish($msg, '', 'hello');
        echo " [x] Sent 'Hello World!'\n";
        $channel->close();
        $connection->close();
    }
    public function consume()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare('hello', false, false, false, false);

        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $callback = function($msg) {
            echo " [x] Received ", $msg->body, "\n";
        };
        $channel->basic_consume('hello', '', false, true, false, false, $callback);
        while(count($channel->callbacks)) {
            $channel->wait();
        }
    }
2.1 工做隊列 按消費能力分發
生產者和消費者均增長
$channel->basic_qos(null,1,null);
便可。
2.2 fanout廣播示例 註冊行爲
例如註冊後須要發送歡迎短信和郵件,將註冊行爲廣播至短信和郵件

生產者
//定義交換器
$channel->exchange_declare('register','fanout',false,false,false);
$msg = new AMQPMessage('register event');
$channel->basic_publish($msg, 'register');

註冊短信消費者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.sms', false, false, false, false);
$channel->queue_bind('register.sms', 'register');

註冊郵件消費者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.mail', false, false, false, false);
$channel->queue_bind('register.mail', 'register');
2.3 topic類型 模糊匹配示例 日誌分級
例如我想一個消費者接受全部日誌,一個消費者只接收Error級別日誌

生產者
//定義交換器
$channel->exchange_declare('log','topic',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {
    $level = 'error';
}elseif($num%3 == 1){
    $level = 'warning';
}else{
    $level = 'common';
}
$msg = new AMQPMessage('log event '.$level);
$channel->basic_publish($msg, 'log', 'log.'.$level);

全量日誌消費者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.all', false, false, false, false);
$channel->queue_bind('log.all', 'log', 'log.*');

Error日誌消費者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.error', false, false, false, false);
$channel->queue_bind('log.error', 'log', 'log.error');
2.4 headers類型 匹配示例 日誌分級
例如我想一個消費者接受全部日誌,一個消費者只接收Error級別日誌

生產者
//定義交換器
$channel->exchange_declare('log2','headers',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {
    $level = 'error';
}elseif($num%3 == 1){
    $level = 'warning';
}else{
    $level = 'common';
}
$msg = new AMQPMessage('log2 event '.$level);

$bindArguments = [
    'level' => $level,
    'type'  => 'log'
 ]; 
$headers = new AMQPTable($bindArguments);
$msg->set('application_headers', $bindArguments);    

$channel->basic_publish($msg, 'log2');

全量日誌消費者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.all', false, false, false, false);

$bindArguments = [
    'type'  => 'log',
    //'x-match' => 'any' //默認any
]; 
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.all', 'log2', '', false, $headers);

Error日誌消費者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.error', false, false, false, false);

$bindArguments = [
    'type'    => 'log',
    'level'   => 'error',
    'x-match' => 'all' //默認any
]; 
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.error', 'log2', '', false, $headers);
2.5死信隊列
//2.5.1 定義一個沒有消費者,5s後消息過時的隊列
//生產者
$arguments = new AMQPTable([
    'x-dead-letter-exchange'    => 'dead',
    'x-message-ttl'             => 5000, //消息存活時間毫秒
    'x-dead-letter-routing-key' => 'dead'
]);

//定義隊列 不要交換器
$channel->queue_declare('no_consume', false, false, false, false, false, $arguments);
$now = time();
$msg = new AMQPMessage($now);        
$channel->basic_publish($msg, '', 'no_consume');
echo " [x] Sent no_consume :".date('Y-m-d H:i:s',$now)."\n";
$channel->close();
$connection->close();

//消費者
$channel->exchange_declare('dead','topic',false,false,false);
$channel->queue_declare('dead.all', false, false, false, false);
$channel->queue_bind('dead.all', 'dead', 'dead');

$channel->basic_qos(null,1,null);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
    var_dump('msg:'.date('Y-m-d H:i:s',$msg->body));
    var_dump('now:'.date('Y-m-d H:i:s'));
    echo " [x] Received log error ", $msg->body, "\n";
};
$channel->basic_consume('dead.all', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}

image

相關文章
相關標籤/搜索