RabbitMQ是一個在AMQP基礎上實現的企業級消息系統。何謂消息系統,就是消息隊列系統,消息隊列是「」消費-生產者模型「」的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。php
在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步操做,而這種異步處理的方式大大的節省了服務器的請求時間,從而提升了系統的吞吐量。並且不影響服務器作其餘相應,不獨佔服務器資源。html
如:註冊用戶這種服務,它可能解耦成好幾種獨立的服務(帳號驗證,郵箱驗證碼,手機短信碼等)。它們做爲消費者,等待用戶輸入數據,在前臺數據提交以後會通過分解併發送到各個服務所在的url,分發的那個角色就至關於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。作這項工做的框架叫作消息隊列。node
又好比:電商系統中的訂單處理系統,傳統處理模式是:下訂單的時候,訂單系統可能會調用庫存系統的接口,這樣兩個系統之間存在一個嚴重依賴關係,若是庫存系統宕機,那麼整個流程都會受到影響。如今大多公司的處理方法是:引入消息隊列,下完訂單,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。git
對庫存系統來講,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。這樣實現了兩個系統間的解耦。github
即便在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。vim
給一張結構圖:瀏覽器
brew install rabbitmq
安裝完成的界面以下:安全
RabbitMQ安裝後的路徑爲:/usr/local/Cellar/rabbitmq/3.7.14服務器
進入該目錄(/usr/local/Cellar/rabbitmq/3.7.14)以後,輸入 ./sbin/rabbitmq-server 指令啓動:併發
此時在瀏覽器輸入http://localhost:15672 便可進入rabbitmq控制終端登陸頁面,默認用戶名和密碼爲 guest/guest.
備註:若是啓動rabbitmq,提示ERROR: node with name "rabbit" already running on "localhost"
ps aux|grep epmd ps aux|grep erl kill -9 4519
下面開始安裝php擴展
php中的rabbitmq 擴展是amqp ,而amqp依賴於rabbitmq-c ,首先須要安裝rabbitmq-c
brew install rabbitmq-c
安裝PECL
//php版本 > 7 $ wget http://pear.php.net/go-pear.phar $ php go-pear.phar //php版本 < 7 $ yum install php-pear //不然會報錯PHP Parse error: syntax error, unexpected //'new' (T_NEW) in /usr/share/pear/PEAR/Frontend.php on //line 91
使用 pecl 安裝amqp:
pear install amqp-1.9.3.tgz
直至出現 Set the path to librabbitmq install prefix [autodetect] ,默認直接回車便可。
查看php.ini裏面是否有extension=amqp.so,若是沒有則手動添加:
sudo vim /usr/local/etc/php/7.1/php.ini
重啓php-fpm
sudo killall php-fpm sudo /usr/local/Cellar/php@7.1/7.1.10_21/sbin/php-fpm -D
打開 phpinfo() 搜索 amqp 能夠看到已經有這個可擴展了。
測試代碼:
生產者:
//配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_meng'; //交換機名 //$q_name = 'q_meng'; //無需隊列名 $k_route = 'key_meng'; //路由key //建立鏈接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //建立交換機對象 $ex = new AMQPExchange($channel); $ex->setName($e_name); date_default_timezone_set("Asia/Shanghai"); //發送消息 //$channel->startTransaction(); //開始事務 for($i=0; $i<5; ++$i){ sleep(1);//休眠1秒 //消息內容 $message = "TEST MESSAGE!".date("h:i:sa"); echo "Send Message:".$ex->publish($message, $k_route)."\n"; } //$channel->commitTransaction(); //提交事務 $conn->disconnect();
消費者:
//配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_meng'; //交換機名 $q_name = 'q_meng'; //隊列名 $k_route = 'key_meng'; //路由key //建立鏈接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //建立交換機 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declareExchange()."\n"; //建立隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declareQueue()."\n"; //綁定交換機與隊列,並指定路由鍵 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } $conn->disconnect(); /** * 消費回調函數 * 處理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //處理消息 $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 }
測試代碼能夠分兩部分,注意打開rabbitmq進程,而後運行消費者代碼和生產者代碼,能夠看到當有消息進入隊列,消費者會將消息不斷打印出來,能夠更好理解的使用消息隊列的解耦、消峯等優勢。實際項目中能夠參考測試代碼進一步封裝處理。
rabbitmq官網:http://www.rabbitmq.com/tutorials/tutorial-one-php.html
不過咱們通常不本身寫,可使用composer 去集成一下第三方:php-amqplib