RabbitMQ消息隊列 基本訂閱/發佈Demo(PHP版)

<?phpphp

/*
* 發佈-訂閱
* create by superid
*/json

  $queueName = 'superid';
  $exchangeName = 'superid';
  $routeKey = 'superid';函數

  $conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
  );
  $connection = new AMQPConnection($conn_args); //建立鏈接
  $connection->connect() or die("Cannot connect to the broker!\n");
  try {
    $channel = new AMQPChannel($connection); // 創建一個 Channel信道fetch

    $exchange = new AMQPExchange($channel); //建立交換機對象
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange->setFlags(AMQP_DURABLE); //持久化
    $exchange->declareExchange();spa

    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE); //持久化隊列,並不表示消息也會持久化
    $queue->declareQueue();code

    $queue->bind($exchangeName, $routeKey); //綁定交換機與隊列,並指定路由鍵 對象

    for($i=0; $i<50000; ++$i){ blog

      $message = json_encode(array(
        'id' => $i,
        'name' => 'liming',
        'note' => '這是一個備註',
      ));隊列

      //Direct類型是使用最多的,使用肯定的routingkey。
      //這種模型下,接收消息時綁定'routeKey'則只接收routeKey的消息路由

      $result = $exchange->publish($message,$routeKey);
      var_dump("[{$result}-{$i}]");
    }

  } catch (AMQPConnectionException $e) {
    var_dump($e);
    exit();
  }
  $connection->disconnect();

?>

 

 

<?php
/*
* 發佈-訂閱
* create by superid
*/


$queueName = 'superid';
$exchangeName = 'superid';
$routeKey = 'superid';
$conn_args = array(
  'host' => '127.0.0.1',
  'port' => '5672',
  'login' => 'guest',
  'password' => 'guest',
  'vhost'=>'/'
);
$connection = new AMQPConnection($conn_args);
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection); // 創建一個 Channel信道

//若是Consumer數量不少或者但願每一個Consumer同時只處理一個任務
//能夠經過在Consumer中設置PrefetchCount來實現更加均勻的任務分發。,收到ack應答,纔會發生消息,若是有一條沒有收到ack,則不會發送消息了
$channel->setPrefetchCount(1);

$exchange = new AMQPExchange($channel); //建立交換機
$exchange->setName($exchangeName); //設置交換機的名字
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 Exchange 類型: direct fanout topic
$exchange->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$exchange->declareExchange()."\n";

$queue = new AMQPQueue($channel); //建立隊列
$queue->setName($queueName); //設置隊列的名稱
$queue->setFlags(AMQP_DURABLE); //設置持久化隊列
echo "Message Total:".$queue->declareQueue()."\n"; //隊列中消息的總量

$queue->bind($exchangeName, $routeKey);

//阻塞模式接收消息

echo "Message:\n";
while(True){
  $queue->consume('processMessage');
  //自動ACK應答
  //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();

/**
  * 消費回調函數
  * 處理消息
  * *須要注意的地方是:queue對象有兩個方法可用於取消息: consume 和 get 。
  *前者是阻塞的,無消息時會被掛起,適合循環中使用;後者則是非阻塞的,
  *取消息時有則取,無則返回false。
*/
function processMessage($envelope, $queue) {
  $msg = $envelope->getBody();
  echo $msg."\n"; //處理消息
  //手動發送ACK應答, 若是不進行確認,消息會積累愈來愈多,產生嚴重bug
  $queue->ack($envelope->getDeliveryTag());
}

?>

 

備註:

發送消息時,只要有「交換機」就夠了。至於交換機後面有沒有對應的處理隊列,發送方是不用管的。

對於交換機,有兩個重要的概念:

A,類型。有三種類型: Fanout類型最簡單,這種模型忽略routingkey;Direct類型是使用最多的,使用肯定的routingkey。這種模型下,接收消息時綁定'key_1'則只接收key_1的消息;最後一種是Topic,這種模式與Direct相似,可是支持通配符進行匹配,好比: 'key_*',就會接受key_1和key_2。Topic貌似美好,可是有可能致使不嚴謹,因此仍是推薦使用Direct。

B,持久化。指定了持久化的交換機,在從新啓動時才能重建,不然須要客戶端從新聲明生成才行。

須要特別明確的概念:交換機的持久化,並不等於消息的持久化。只有在持久化隊列中的消息,才能持久化;若是沒有隊列,消息是沒有地方存儲的;消息自己在投遞時也有一個持久化標誌的,PHP中默認投遞到持久化交換機就是持久的消息,不用特別指定

queue: 隊列

事實上,隊列僅是針對接收方(consumer)的,由接收方根據需求建立的。只有隊列建立了,交換機纔會將新接受到的消息送到隊列中,交換機是不會在隊列建立以前的消息放進來的。換句話說,在創建隊列以前,發出的全部消息都被丟棄了。

下面這個圖比RabbitMQ官方的圖更清楚——Queue是屬於ReceiveMessage的一部分。

相關文章
相關標籤/搜索