Mac搭建PHP+rabbitMQ環境

RabbitMQ是一個在AMQP基礎上實現的企業級消息系統。何謂消息系統,就是消息隊列系統,消息隊列是「」消費-生產者模型「」的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。php

在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步操做,而這種異步處理的方式大大的節省了服務器的請求時間,從而提升了系統的吞吐量。並且不影響服務器作其餘相應,不獨佔服務器資源。html

如:註冊用戶這種服務,它可能解耦成好幾種獨立的服務(帳號驗證,郵箱驗證碼,手機短信碼等)。它們做爲消費者,等待用戶輸入數據,在前臺數據提交以後會通過分解併發送到各個服務所在的url,分發的那個角色就至關於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。作這項工做的框架叫作消息隊列。node

又好比:電商系統中的訂單處理系統,傳統處理模式是:下訂單的時候,訂單系統可能會調用庫存系統的接口,這樣兩個系統之間存在一個嚴重依賴關係,若是庫存系統宕機,那麼整個流程都會受到影響。如今大多公司的處理方法是:引入消息隊列,下完訂單,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。git

對庫存系統來講,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。這樣實現了兩個系統間的解耦。github

即便在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。vim

 給一張結構圖:瀏覽器

幾個概念說明:
   Broker:簡單來講就是消息隊列服務器實體。
   Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
   Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
   Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
   Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
   vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
   producer:消息生產者,就是投遞消息的程序。
   consumer:消息消費者,就是接受消息的程序。
   channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
 
消息隊列的使用過程大概以下:
  (1)客戶端鏈接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,並設置相關屬性。
  (3)客戶端聲明一個queue,並設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  (5)客戶端投遞消息到exchange。
 
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作Topic交換機,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不須要key的,叫作Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,爲了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
  (1)exchange持久化,在聲明時指定durable => 1
  (2)queue持久化,在聲明時指定durable => 1
  (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
 
若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。
 
 
在Mac下可使用brew方法安裝:
brew install rabbitmq

安裝完成的界面以下:安全

 

RabbitMQ安裝後的路徑爲:/usr/local/Cellar/rabbitmq/3.7.14服務器

進入該目錄(/usr/local/Cellar/rabbitmq/3.7.14)以後,輸入 ./sbin/rabbitmq-server 指令啓動:併發

此時在瀏覽器輸入http://localhost:15672 便可進入rabbitmq控制終端登陸頁面,默認用戶名和密碼爲 guest/guest.

 

備註:若是啓動rabbitmq,提示ERROR: node with name "rabbit" already running on "localhost"

ps aux|grep epmd
ps aux|grep erl
kill -9 4519

 下面開始安裝php擴展

php中的rabbitmq 擴展是amqp ,而amqp依賴於rabbitmq-c ,首先須要安裝rabbitmq-c

brew install rabbitmq-c

安裝PECL

//php版本 > 7
$ wget http://pear.php.net/go-pear.phar
$ php go-pear.phar

//php版本 < 7
$ yum install php-pear
//不然會報錯PHP Parse error:  syntax error, unexpected //'new' (T_NEW) in /usr/share/pear/PEAR/Frontend.php on //line 91

使用 pecl 安裝amqp:

pear install amqp-1.9.3.tgz

 直至出現 Set the path to librabbitmq install prefix [autodetect] ,默認直接回車便可。

查看php.ini裏面是否有extension=amqp.so,若是沒有則手動添加:

sudo vim /usr/local/etc/php/7.1/php.ini

重啓php-fpm

sudo killall php-fpm
sudo /usr/local/Cellar/php@7.1/7.1.10_21/sbin/php-fpm -D

打開 phpinfo()  搜索 amqp 能夠看到已經有這個可擴展了。

 


 

測試代碼:

生產者:

//配置信息
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
 
$e_name = 'e_meng'; //交換機名
//$q_name = 'q_meng'; //無需隊列名
$k_route = 'key_meng'; //路由key
 
//建立鏈接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
 
 
//建立交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
date_default_timezone_set("Asia/Shanghai");
//發送消息
//$channel->startTransaction(); //開始事務
for($i=0; $i<5; ++$i){
    sleep(1);//休眠1秒
    //消息內容
    $message = "TEST MESSAGE!".date("h:i:sa");
    echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事務
 
$conn->disconnect();

消費者:

//配置信息 
$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   
$e_name = 'e_meng'; //交換機名 
$q_name = 'q_meng'; //隊列名 
$k_route = 'key_meng'; //路由key 
 
//建立鏈接和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn);   
 
//建立交換機    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declareExchange()."\n";   
   
//建立隊列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declareQueue()."\n";   
 
//綁定交換機與隊列,並指定路由鍵 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; 
 
//阻塞模式接收消息 
echo "Message:\n";   
while(True){ 
    $q->consume('processMessage');   
    //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  
} 
$conn->disconnect();   
 
/**
 * 消費回調函數
 * 處理消息
 */ 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo $msg."\n"; //處理消息 
    $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 
}

測試代碼能夠分兩部分,注意打開rabbitmq進程,而後運行消費者代碼和生產者代碼,能夠看到當有消息進入隊列,消費者會將消息不斷打印出來,能夠更好理解的使用消息隊列的解耦、消峯等優勢。實際項目中能夠參考測試代碼進一步封裝處理。

rabbitmq官網:http://www.rabbitmq.com/tutorials/tutorial-one-php.html

不過咱們通常不本身寫,可使用composer 去集成一下第三方:php-amqplib

相關文章
相關標籤/搜索