PHP RabbitMQ

安裝php

rabbitmq 在 mac 下能夠直接用 brew 安裝
默認安裝在 /usr/local/Cellar/下
命令被軟鏈接加入到了/usr/local/sbin 下,所以能夠把此目錄放到環境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啓服務
端口5672 默認
端口15672 web 端登陸管理端口127.0.0.1:15672
rabbitmq 默認提供的用戶 guest 密碼 guesthtml

管理

中止服務
rabbitmqctl stop
開啓應用 [服務依舊運行]
rabbitmqctl start_app
中止應用 [服務依舊運行]
rabbitmqctl stop_appweb

用戶管理

添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登陸
sudo rabbitmqctl clear_password <username>
列出全部用戶
sudo rabbitmqctl list_users
設置用戶角色
rabbitmqctl set_user_tags username tagbash

vhost虛擬主機管理

virtual host只是起到一個命名空間的做用,因此能夠多個user共同使用一個virtual host,'/'這個是系統默認的vhost,就是說當咱們建立一個到rabbitmq的connection時候,它的命名空間是'/',須要注意的是不一樣的命名空間之間的資源是不能訪問的,好比 exchang,queue ,bingding等
建立虛擬主機
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機
sudo rabbitmqctl delete_vhost vhostpath
列出全部虛擬主機
sudo rabbitmqctl list_vhosts
列出某個 vhost 的全部用戶和權限
list_permissions [-p vhostpath]
列出某個用戶的全部權限。
list_user_permissions {username}
清除用戶對某個 vhost 的權限。
clear_permissions [-p vhostpath] {username}
設置用戶對某個 virtual host 的權限,若是不指定 vhost,則默認爲「/」 vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang 「." "." ".*"服務器

添加一個管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*」
分配給用戶指定虛擬主機的權限,雖然是administrator角色,但不對全部虛擬主機都有權限,同樣須要對每一個虛擬主機都受權app

顯示信息
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]
列出某個 vhost 的全部 queue。
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
列出某個 vhost 的全部 exchange。
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
列出某個 vhost 的全部 binding。
rabbitmqctl list_connections [<connectioninfoitem> ...]
列出 RabbitMQ broker 的全部 connection。
rabbitmqctl list_channels [<channelinfoitem> ...]
列出 RabbitMQ broker 的全部 channel
rabbitmqcrl list_consumers [-p <vhostpath>]
列出某個 vhost 的全部 consumer。composer

基本概念點

1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。函數

2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Hostui

3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。spa

4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。

5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。

6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。

7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。

8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。

9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。

客戶端管理

php端 rabbitmq 客戶端可使用 composer 下的庫

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

使用時,用到了這兩個東西

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

開始發送消息

public function handle()
{
    //鏈接到 test_host 虛擬主機,每一個虛擬主機有本身的隊列,交換機...
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host');
    //建立一個 channel
    $channel = $connection->channel();
    //聲明 hello 隊列
    $channel->queue_declare('hello', false, false, false, false);
    //建立一個消息
    $msg = new AMQPMessage(time());
    //把消息推送到默認的交換機中,而且告訴交換機要把消息交給 hello 隊列
    $channel->basic_publish($msg, '', 'hello');
    echo " [x] Sent ".time()."\n";
}

重要概念,消息是保存在交換機中的,當消息存放時指定的隊列存在,交換機會把消息推送到該隊列

消息隊列發送消息給消費者,一個消息發給一個消費者

public function handle()
{
    //鏈接
    $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host");
    //建立一個 channel
    $channel = $connection->channel();
    //能夠運行這個命令不少次,可是隻有一個隊列會被建立, 在程序中重複將隊列重複聲明一下是種值得推薦的作法,保證隊列存在
    $channel->queue_declare('hello', false, false, false, false);

    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function($msg) {
        echo " [x] Received ", $msg->body, "\n";
        sleep($msg->body);
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    //默認狀況下,隊列會把消息公平的分配給各個消費者
    //若是某個消費者腳本處理完成分配給他的消息任務後,會一直空閒
    //另一個消費者腳本處理的消息都很是耗時,這就容易致使消費者腳本得不到合理利用,
    //加入此句話,是告訴隊列,取消把消息公平分配到各個腳本,而是那個腳本空閒,就交給它一個消息任務
    //這樣,合理利用到每個空閒的消費者腳本
    $channel->basic_qos(null, 1, null);

    /**
     * basic_consume 方法    從隊列中讀取數據
     * @param string $queue 指定隊列
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack    消費者處理完消息後,是否不須要告訴隊列已經處理完成,true 不須要 false 須要,
     * true
        默認狀況下,隊列會把消息公平分配到各個消費者中,而後一次性把消息交給消費者,若是消費者處理了一半掛了,那麼消息就丟失了
     * false
        默認狀況下,隊列會把消息公平的分配給各個消費者,而後一個一個的把消息分配到消費者腳本中,腳本處理完成後,告訴隊列,隊列會刪除這個消息,而且接着給下一個消息,
        當腳本掛掉,不會丟失消息,隊列會把未完成的消息分配給其餘消費者
        在 callback 函數中須要加入這句話,處理完後通知隊列能夠刪除消息了
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        未加入這句話,隊列不會刪除已處理完的消息,當腳本掛掉時,會把分配給當前隊列的全部消息再次從新分配給其餘隊列,會致使消息會重複處理
     */
    $channel->basic_consume('hello', '', false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

發佈/訂閱

一個消息發送給多個消費者

扇形交換機 fanout
發佈訂閱模式,科院實現一個消息發送到多個隊列中
在發佈消息腳本中,建立一個扇形交換機,把消息推送到交換機,不須要推進到指定的隊列中,隊列在消費者腳本中建立
消費腳本定義個臨時隊列,並綁定這個臨時隊列到交換機中,扇形交換機會把接收到的消息推進到每個綁定的隊列中

生產者腳本

public function handle()
{
    //鏈接到 test_host 虛擬主機,每一個虛擬主機有本身的隊列,交換機...
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'kang', 'a943434603', 'test_host');
    //建立一個 channel
    $channel = $connection->channel();
    //聲明 log 隊列
    //$channel->queue_declare('log', false, false, false, false);
    //建立一個fanout類型交換機
    $channel->exchange_declare("logs","fanout",false,false,false);

    //建立一個消息
    $msg = new AMQPMessage( time() );
    $channel->basic_publish ( $msg , 'logs' );

    echo " [x] Sent ".time()."\n";

    $channel->close();
    $connection->close();
}

消費者腳本

public function handle()
{
    //鏈接
    $connection = new AMQPStreamConnection('localhost', 5672, 'kang', 'a943434603', "test_host");
    //建立一個 channel
    $channel = $connection->channel();
    //能夠運行這個命令不少次,可是隻有一個隊列會被建立, 在程序中重複將隊列重複聲明一下是種值得推薦的作法,保證隊列存在
    //$channel->queue_declare('hello', false, false, false, false);
    //建立一個fanout類型交換機
    $channel->exchange_declare('logs', 'fanout', false, false, false);

    //系統建立一個臨時隊列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //綁定臨時隊列到交換機上
    $channel->queue_bind($queue_name, 'logs');

    $callback = function($msg){
        echo ' [x] ', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

直連交換機 direct
交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列

clipboard.png

生產者,建立基本上和扇形交換機同樣,不一樣的是

$channel->exchange_declare("direct_logs","direct",false,false,false);
//建立一個消息
$msg = new AMQPMessage( time() );
//把消息推進到direct_logs交換機,並給消息加上路由 key,讓消費者隊列來根據 key 接收消息
$channel->basic_publish ( $msg , 'direct_logs', "warning" );

消費者

$channel->exchange_declare("direct_logs","direct",false,false,false);
//系統建立一個臨時隊列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//綁定臨時隊列到交換機上,並指定消息的路由 key
$channel->queue_bind($queue_name, 'direct_logs', 'error');
$channel->queue_bind($queue_name, 'direct_logs', 'warning');

主題交換機 topic

直連交換機中路由 key 匹配模式
(星號*) 用來表示一個單詞.
(井號#) 用來表示任意數量(零個或多個)單詞。

clipboard.png

Q1會接收到 a.orange.b 等key 值中間爲 orange 的消息
Q2會接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息

當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。

生產者,指定路由 key

$channel->exchange_declare("topic_logs","topic",false,false,false);
//建立一個消息
$msg = new AMQPMessage( time() );
//把消息推進到direct_logs交換機,並給消息加上路由 key,讓消費者隊列來根據 key 接收消息
$channel->basic_publish ( $msg , 'topic_logs', "baidu.warning" );

消費者1

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'topic_logs', "baidu.#");

消費者2

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'topic_logs', 'ali.#');

參考

https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

相關文章
相關標籤/搜索