RabbitMQ與PHP(一)

RabbitMQ與PHP(一)

項目中使用RabbitMQ做爲隊列處理用戶消息通知,消息由前端PHP代碼產生,處理消息使用Python,這就致使代碼一致性問題,調整消息定義時須要PHP和Python都進行修改。這兩天抽時間研究了下,如何將消息的產生與處理(消費)所有用PHP來作。查資料時發現,關於PHP處理消息隊列的資料不多,有必要把一些初學者容易混淆的地方再講一下。html

擬分紅兩部分: 一,RabbitMQ的原理與操做示例;二,具體服務安裝及如何用PHP做爲守護模式處理消息。前端

RabbitMQ是流行的開源消息隊列系統,用erlang語言開發,完整的實現了AMPQ(高級消息隊列協議)。網站在: http://www.rabbitmq.com/ 上面有教程和實例代碼(Python和Java的)。python

AMPQ協議爲了可以知足各類消息隊列需求,在概念上比較複雜。首先,rabbitMQ啓動默認是沒有任何配置的,須要客戶端鏈接上去,設置交換機等才能工做。不把這些基礎概念弄清楚,後面程序設計就容易產生問題。後端

1,vhosts : 虛擬主機

一個RabbitMQ的實體上能夠有多個vhosts,用戶與權限設置就是依附於vhosts。對通常PHP應用,不須要用戶權限設定,直接使用默認就存在的"/"就能夠了,用戶可使用默認就存在的"guest"。一個簡單的配置示例:數組

$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/'
);

2,connection 與 channel : 鏈接與信道

connection是指物理的鏈接,一個client與一個server之間有一個鏈接;一個鏈接上能夠創建多個channel,能夠理解爲邏輯上的鏈接。通常應用的狀況下,有一個channel就夠用了,不須要建立更多的channel。示例代碼:多線程

//建立鏈接和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn);  
##3,exchange 與  routingkey : 交換機 與 路由鍵##

爲了將不一樣類型的消息進行區分,設置了交換機與路由兩個概念。好比,將A類型的消息發送到名爲‘C1’的交換機,將類型爲B的發送到'C2'的交換機。當客戶端鏈接C1處理隊列消息時,取到的就只是A類型消息。進一步的,若是A類型消息也很是多,須要進一步細化區分,好比某個客戶端只處理A類型消息中針對K用戶的消息,routingkey就是來作這個用途的。函數

$e_name = 'e_linvo'; //交換機名 
$k_route = array(0=> 'key_1', 1=> 'key_2'); //路由key 
//建立交換機    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";   
for($i=0; $i<5; ++$i){ 
    echo "Send Message:".$ex->publish($message . date('H:i:s'), $k_route[i%2])."\n";  
}

由以上代碼能夠看到,發送消息時,只要有「交換機」就夠了。至於交換機後面有沒有對應的處理隊列,發送方是不用管的。routingkey能夠是空的字符串。在示例中,我使用了兩個key交替發送消息,是爲了下面更便於理解routingkey的做用。網站

對於交換機,有兩個重要的概念:spa

A,類型。有三種類型: Fanout類型最簡單,這種模型忽略routingkey;Direct類型是使用最多的,使用肯定的routingkey。這種模型下,接收消息時綁定'key_1'則只接收key_1的消息;最後一種是Topic,這種模式與Direct相似,可是支持通配符進行匹配,好比: 'key_*',就會接受key_1和key_2。Topic貌似美好,可是有可能致使不嚴謹,因此仍是推薦使用Direct。線程

B,持久化。指定了持久化的交換機,在從新啓動時才能重建,不然須要客戶端從新聲明生成才行。

須要特別明確的概念:交換機的持久化,並不等於消息的持久化。只有在持久化隊列中的消息,才能持久化;若是沒有隊列,消息是沒有地方存儲的;消息自己在投遞時也有一個持久化標誌的,PHP中默認投遞到持久化交換機就是持久的消息,不用特別指定。

4,queue: 隊列

講了這麼多,纔講到隊列呀。事實上,隊列僅是針對接收方(consumer)的,由接收方根據需求建立的。只有隊列建立了,交換機纔會將新接受到的消息送到隊列中,交換機是不會在隊列建立以前的消息放進來的。換句話說,在創建隊列以前,發出的全部消息都被丟棄了。下面這個圖比RabbitMQ官方的圖更清楚——Queue是屬於ReceiveMessage的一部分。

接下來看一下建立隊列及接收消息的示例:

$e_name = 'e_linvo'; //交換機名 
$q_name = 'q_linvo'; //隊列名 
$k_route = ''; //路由key 

//建立鏈接和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn);   

//建立交換機    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";   

//建立隊列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  

//綁定交換機與隊列,並指定路由鍵 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; 

//阻塞模式接收消息 
echo "Message:\n";   
$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  

$conn->disconnect();   

/**
 * 消費回調函數
 * 處理消息
 */
function processMessage($envelope, $queue) { 
    var_dump($envelope->getRoutingKey);
    $msg = $envelope->getBody(); 
    echo $msg."\n"; //處理消息 
}

從上述示例中能夠看到,交換機既能夠由消息發送端建立,也能夠由消息消費者建立。

建立一個隊列(line:20)後,須要將隊列綁定到交換機上(line:25)隊列才能工做,routingkey也是在這裏指定的。有的資料上寫成bindingkey,其實一回事兒,弄兩個名詞反倒容易混淆。

消息的處理,是有兩種方式:

A,一次性。用 $q->get([...]),無論取到取不到消息都會當即返回,通常狀況下使用輪詢處理消息隊列就要用這種方式;

B,阻塞。用 $q->consum( callback, [...] ) 程序會進入持續偵聽狀態,每收到一個消息就會調用callback指定的函數一次,直到某個callback函數返回FALSE才結束。

關於callback,這裏多說幾句: PHP的call_back是支持使用數組的,好比: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,'myfunc') ) 這樣就能夠調用本身寫的處理類。MyClass中myfunc的參數定義,與上例中processMessage同樣就行。

在上述示例中,使用的$routingkey = '', 意味着接收所有的消息。咱們能夠將其改成 $routingkey = 'key_1',能夠看到結果中僅有設置routingkey爲key_1的內容了。

注意: routingkey = 'key_1' 與 routingkey = 'key_2' 是兩個不一樣的隊列。假設: client1 與 client2 都鏈接到 key_1 的隊列上,一個消息被client1處理以後,就不會被client2處理。而 routingkey = '' 是另類,client_all綁定到 '' 上,將消息全都處理後,client1和client2上也就沒有消息了。

在程序設計上,須要規劃好exchange的名稱,以及如何使用key區分開不一樣類型的標記,在消息產生的地方插入發送消息代碼。後端處理,能夠針對每個key啓動一個或多個client,以提升消息處理的實時性。如何使用PHP進行多線程的消息處理,將在下一節中講述。

更多消息模型,能夠參考: http://www.rabbitmq.com/tutorials/tutorial-two-python.html

相關文章
相關標籤/搜索