用 PHP 收發 RabbitMQ 消息

AMQP php

擴展的安裝參照《給 函數

PHP 測試

安裝 對象

amqp 隊列

擴展》 事務

 

 

消費者:接收消息 路由

 

邏輯: get

建立鏈接 回調函數

--> it

建立

channel-->

建立交換機

-->

建立隊列

-->

綁定交換機

/

隊列

/

路由鍵

-->

接收

消息

 

<?php    

/************************************* 

 * PHP amqp(RabbitMQ) Demo - consumer 

 * Author: Linvo 

 * Date: 2012/7/30 

 *************************************/  

//

配置信息

  

$conn_args = array(  

    

'host' => '192.168.1.93',   

    

'port' => '5672',   

    

'login' => 'guest',   

    

'password' => 'guest',  

    

'vhost'=>'/'  

);    

$e_name = 'e_linvo'; //

交換機名

  

$q_name = 'q_linvo'; //

隊列名

  

$k_route = 'key_1'; //

路由

key  

  

//

建立鏈接和

channel  

$conn = new AMQPConnection($conn_args);    

if (!$conn->connect()) {    

    

die("Cannot connect to the broker!\n");    

}    

$channel = new AMQPChannel($conn);    

  

//

建立交換機

     

$ex = new AMQPExchange($channel);    

$ex->setName($e_name);  

$ex->setType(AMQP_EX_TYPE_DIRECT); //direct

類型

   

$ex->setFlags(AMQP_DURABLE); //

持久化

  

echo "Exchange Status:".$ex->declare()."\n";    

    

//

建立隊列

     

$q = new AMQPQueue($channel);  

$q->setName($q_name);    

$q->setFlags(AMQP_DURABLE); //

持久化

   

echo "Message Total:".$q->declare()."\n";    

  

//

綁定交換機與隊列,並指定路由鍵

  

echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";  

  

//

阻塞模式接收消息

  

echo "Message:\n";    

while(True){  

    

$q->consume('processMessage');    

    

//$q->consume('processMessage', AMQP_AUTOACK); //

自動

ACK

應答

   

}  

$conn->disconnect();    

  

/** 

 * 

消費回調函數

 

 * 

處理消息

 

 */  

function processMessage($envelope, $queue) {  

    

$msg = $envelope->getBody();  

    

echo $msg."\n"; //

處理消息

  

    

$queue->ack($envelope->getDeliveryTag()); //

手動發送

ACK

應答

  

}  

 

生產者:發送消息

 

邏輯:建立鏈接

-->

建立

channel-->

建立交換機對象

-->

發送消息

 

<?php    

/************************************* 

 * PHP amqp(RabbitMQ) Demo - publisher 

 * Author: Linvo 

 * Date: 2012/7/30 

 *************************************/  

//

配置信息

  

$conn_args = array(  

    

'host' => '192.168.1.93',   

    

'port' => '5672',   

    

'login' => 'guest',   

    

'password' => 'guest',  

    

'vhost'=>'/'  

);    

$e_name = 'e_linvo'; //

交換機名

  

//$q_name = 'q_linvo'; //

無需隊列名

  

$k_route = 'key_1'; //

路由

key  

  

//

建立鏈接和

channel  

$conn = new AMQPConnection($conn_args);    

if (!$conn->connect()) {    

    

die("Cannot connect to the broker!\n");    

}    

$channel = new AMQPChannel($conn);    

  

//

消息內容

  

$message = "TEST MESSAGE! 

測試消息!

";    

  

//

建立交換機對象

     

$ex = new AMQPExchange($channel);    

$ex->setName($e_name);    

  

//

發送消息

  

//$channel->startTransaction(); //

開始事務

   

for($i=0; $i<5; ++$i){  

    

echo "Send Message:".$ex->publish($message, $k_route)."\n";   

}  

//$channel->commitTransaction(); //

提交事務

  

  

$conn->disconnect();    

 

須要注意的地方是:

 

queue

對象有兩個方法可用於取消息:

consume

get

 

前者是阻塞的,無消息時會被掛起,適合循環中使用;

 

後者則是非阻塞的,取消息時有則取,無則返回

false

 

 

測試截圖:

 

運行消費者,收消息:

 

 

 

運行生產者,發消息:

 

 

消費者接收到消息:

 

 

 

 

相關文章
相關標籤/搜索