1: 連接 RabbitMQ,新建 exchange 和 queue
amqp_manager.php
<?php
/**
 * amqp_manager.php
 *
 * One-off setup script: connects to the broker, declares the direct exchange
 * "lizhifeng", declares the queues "queue1" and "queue2", and binds them:
 *
 *   key1 -> queue1
 *   key2 -> queue2
 *   key3 -> queue1
 *
 * Requires the php-amqp extension (AMQPConnection / AMQPChannel / ...).
 */
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
// NOTE(review): depending on the php-amqp version, connect() may throw
// AMQPConnectionException instead of returning false -- confirm for the
// version in use.
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
    // Non-zero status so a calling shell can tell that setup failed.
    exit(1);
}

$channel = new AMQPChannel($conn);

// Declare a direct exchange named "lizhifeng".
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
// declareExchange() is the non-deprecated name of the declare() alias.
$exchange->declareExchange();

// Declare queue "queue1" and bind it to routing key "key1".
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declareQueue();
$queue->bind('lizhifeng', 'key1');

// Declare queue "queue2" and bind it to routing key "key2".
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declareQueue();
$queue->bind('lizhifeng', 'key2');

// Additionally bind "queue1" to "key3". This does not replace the "key1"
// binding: both key1 and key3 stay in effect for queue1.
// The queue already exists at this point, so its flags are not set again and
// it is not declared a second time -- only the extra binding is added.
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->bind('lizhifeng', 'key3');

/*
// Tear-down (disabled): delete the exchange ...
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->delete();
// ... and delete both queues.
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->delete();
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->delete();
*/
?>
2: 連接 RabbitMQ,往 exchange 中寫消息
amqp_server.php
<?php
/**
 * amqp_server.php
 *
 * Producer: connects to the broker and publishes 100000 JSON-encoded messages
 * to the exchange "lizhifeng", rotating the routing key on every message in
 * the order key2, key3, key1, key2, ...
 *
 * With the bindings created by amqp_manager.php, key2 is routed to queue2 and
 * key1 / key3 are routed to queue1.
 */
$routingkey = 'key1';
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
// NOTE(review): depending on the php-amqp version, connect() may throw
// AMQPConnectionException instead of returning false -- confirm for the
// version in use.
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
    // Stop here: without a connection the channel below cannot be opened.
    exit(1);
}

$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');

// Successor of each routing key; any key not listed here falls back to 'key1'.
$nextKey = array('key1' => 'key2', 'key2' => 'key3');

for ($i = 0; $i < 100000; $i++) {
    // Advance the key before publishing, so the very first message uses key2.
    $routingkey = isset($nextKey[$routingkey]) ? $nextKey[$routingkey] : 'key1';

    // Payload is a JSON array holding a single descriptive string.
    $tmp = array();
    $tmp[] = "第".$i."個消息的key爲".$routingkey;
    $message = json_encode($tmp);

    if ($exchange->publish($message, $routingkey)) {
        print $routingkey."\tok\n";
    } else {
        print "error\n";
    }
}
?>
3: 連接 RabbitMQ,消費消息隊列
amqp_client.php
<?php
/**
 * amqp_client.php
 *
 * Consumer: connects to the broker, then drains "queue1" followed by "queue2",
 * dumping the JSON-decoded body of every message fetched.
 */

// Connect to RabbitMQ.
$brokerConfig = array(
    'host'     => '127.0.0.1',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/',
);
$connection = new AMQPConnection($brokerConfig);
if (!$connection->connect()) {
    echo "Cannot connect to the broker \n ";
    exit();
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($connection);

// Setting a name on an AMQPQueue does not create a new queue here; it only
// attaches to the queue of that name. The queues live on the server and the
// messages have already been routed into them, so all that is left to do is
// fetch them.
foreach (array('queue1', 'queue2') as $queueName) {
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);

    // Keep fetching with AMQP_AUTOACK until get() returns a falsy value.
    while ($envelope = $queue->get(AMQP_AUTOACK)) {
        $payload = json_decode($envelope->getBody(), true);
        var_dump($payload);
    }
}

$connection->disconnect();
?>
get