PHP 操作 RabbitMQ 教程

1: 連接 RabbitMQ，新建 exchange 和 queue

amqp_manager.php

<?php
/**
 * amqp_manager.php
 *
 * One-off topology setup for the tutorial:
 *   - connects to the broker on localhost:5672 as guest/guest,
 *   - declares the direct exchange "lizhifeng",
 *   - declares the queues "queue1" and "queue2",
 *   - binds key1 -> queue1, key2 -> queue2 and key3 -> queue1.
 *
 * Exits with a non-zero status when the broker cannot be reached.
 */

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);

// Do not rely on the return value of connect(): the php-amqp reference
// documents failure as an AMQPConnectionException, so catch that and then
// ask the connection object directly whether it is connected.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // non-zero so that shells / supervisors see the failure
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($conn);

// Declare a direct exchange named "lizhifeng".
// NOTE(review): newer php-amqp releases expose this as declareExchange() /
// declareQueue(); confirm which name the installed extension provides.
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$exchange->declare();

// Declare each queue and bind it to the exchange with its first routing key.
$initial_bindings = array(
    'queue1' => 'key1',
    'queue2' => 'key2',
);
foreach ($initial_bindings as $queue_name => $binding_key) {
    $queue = new AMQPQueue($channel);
    $queue->setName($queue_name);
    $queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    $queue->declare();
    $queue->bind('lizhifeng', $binding_key);
}

// Additionally bind queue1 to key3. This does not replace the key1 binding:
// key1 and key3 are both in effect afterwards. The queue was already
// declared above, so its flags are not set again and it is not re-declared.
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->bind('lizhifeng', 'key3');


/*
// Delete the exchange
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->delete();

// Delete the queues
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->delete();
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->delete();
*/
?>
spa

2: 連接 RabbitMQ，往 exchange 中寫消息

amqp_server.php

<?php
/**
 * amqp_server.php
 *
 * Publisher for the tutorial: connects to the broker on localhost:5672 and
 * publishes 100000 JSON-encoded messages to the exchange "lizhifeng",
 * rotating the routing key key2 -> key3 -> key1 -> key2 ... on every message.
 *
 * Exits with a non-zero status when the broker cannot be reached instead of
 * carrying on with an unusable connection.
 */

$routingkey = 'key1';

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);

// Do not rely on the return value of connect(): the php-amqp reference
// documents failure as an AMQPConnectionException, so catch that and then
// ask the connection object directly whether it is connected.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // nothing below can work without a connection
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');

// Successor of each routing key in the rotation. With the bindings used in
// this tutorial, key1 and key3 route to queue1 and key2 routes to queue2.
$next_key = array(
    'key1' => 'key2',
    'key2' => 'key3',
    'key3' => 'key1',
);

for ($i = 0; $i < 100000; $i++) {
    // Advance the rotation first, so the very first message is sent with key2.
    $routingkey = isset($next_key[$routingkey]) ? $next_key[$routingkey] : 'key1';

    $tmp = array();
    $tmp[] = "第".$i."個消息的key爲".$routingkey;
    $message = json_encode($tmp);

    // Only an explicit false counts as failure.
    // NOTE(review): some php-amqp releases appear to signal failure purely by
    // throwing and return no value from publish(); confirm for the installed version.
    if ($exchange->publish($message, $routingkey) !== false) {
        print $routingkey."\tok\n";
    } else {
        print "error\n";
    }
}
?>
rabbitmq

3: 連接 RabbitMQ，消費消息隊列

amqp_client.php

<?php
/**
 * amqp_client.php
 *
 * Consumer for the tutorial: connects to the broker on 127.0.0.1:5672
 * (vhost "/"), drains "queue1" and then "queue2" with auto-acknowledged
 * get() calls, dumps every decoded message body, and disconnects.
 *
 * Exits with a non-zero status when the broker cannot be reached.
 */

// Connect to RabbitMQ.
$conn_args = array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/');
$conn = new AMQPConnection($conn_args);

// Do not rely on the return value of connect(): the php-amqp reference
// documents failure as an AMQPConnectionException, so catch that and then
// ask the connection object directly whether it is connected.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // non-zero so that shells / supervisors see the failure
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($conn);

// Setting a name does not create a new queue here; it only attaches to the
// queue of that name. The author's understanding: the queues live on the
// server and messages have already been routed into them, so all that is
// left to do is fetch the messages.
foreach (array('queue1', 'queue2') as $queue_name) {
    $q = new AMQPQueue($channel);
    $q->setName($queue_name);

    // Keep fetching until get() yields a falsy value, which ends the loop
    // for this queue.
    while ($messages = $q->get(AMQP_AUTOACK)) {
        var_dump(json_decode($messages->getBody(), true));
    }
}

$conn->disconnect();
?>

get

相關文章
相關標籤/搜索