PHP使用RabbitMQ實例

相關博文:
CentOS6.9安裝RabbitMQ和源碼編譯安裝php的RabbitMQ擴展
RabbitMQ入門基礎
CentOS7源碼編譯安裝nginx+php7.2+mysql5.7並使用systemctl管理
RabbitMQ的安裝過程,工做流程,和一些基礎概念已經在前面的筆記中提到了,今天在本地實現了php鏈接RabbitMQ,以及消息的生產和消費的過程,首先看下沒有生產者和消費者的默認RabbitMQ管理界面截圖:
Connections:

尚未任何鏈接(Connections)
Channels:

尚未任何通道(Channels)
Exchanges:

交換機只有系統默認的
Queues:

尚未任何隊列
先上消費者代碼consumer.phpphp

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:16
 */
//聲明鏈接參數
$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
//鏈接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
//在鏈接內建立一個通道
$ch = new AMQPChannel($cnn);
//建立一個交換機
$ex = new AMQPExchange($ch);
//聲明路由鍵
$routingKey = 'key_1';
//聲明交換機名稱
$exchangeName = 'exchange_1';
//設置交換機名稱
$ex->setName($exchangeName);
//設置交換機類型
//AMQP_EX_TYPE_DIRECT:直連交換機
//AMQP_EX_TYPE_FANOUT:扇形交換機
//AMQP_EX_TYPE_HEADERS:頭交換機
//AMQP_EX_TYPE_TOPIC:主題交換機
$ex->setType(AMQP_EX_TYPE_DIRECT);
//設置交換機持久
$ex->setFlags(AMQP_DURABLE);
//聲明交換機
$ex->declareExchange();
//建立一個消息隊列
$q = new AMQPQueue($ch);
//設置隊列名稱
$q->setName('queue_1');
//設置隊列持久
$q->setFlags(AMQP_DURABLE);
//聲明消息隊列
$q->declareQueue();
//交換機和隊列經過$routingKey進行綁定
$q->bind($ex->getName(), $routingKey);
//接收消息並進行處理的回調方法
function receive($envelope, $queue) {
    //休眠兩秒,
    sleep(2);
    //echo消息內容
    echo $envelope->getBody()."\n";
    //顯式確認,隊列收到消費者顯式確認後,會刪除該消息
    $queue->ack($envelope->getDeliveryTag());
}
//設置消息隊列消費者回調方法,並進行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隱式確認,不推薦

 

以上是消費者代碼,打開兩個命令行/終端
輸入php consumer.php,消費者開始阻塞獲取消息,以下圖

此時再看RabbitMQ管理界面:
Connections

出現兩個鏈接,這兩個就是消費者,由於他們在阻塞着等待消息
Channels

消費者在各自的鏈接裏都打開了一個通道
Exchanges

其中一個消費者建立了一個持久的直連交換機
Queues

消息隊列已經建立,但消息數是0,由於此時尚未生產者
生產者代碼publisher.phphtml

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:15
 */

$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由鍵,必定要和消費者端一致
$routingKey = 'key_1';
//交換機名稱,必定要和消費者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//建立10個消息
for ($i=1;$i<=10;$i++){
    //消息內容
    $msg = array(
        'data'  => 'message_'.$i,
        'hello' => 'world',
    );
    //發送消息到交換機,並返回發送結果
    //delivery_mode:2聲明消息持久,持久的隊列+持久的消息在RabbitMQ重啓後纔不會丟失
    echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
    //代碼執行完畢後進程會自動退出
}

 

 

以上是生產者代碼
在執行以前,先關掉前面的兩個消費者,打開一個命令行/終端,輸入php publisher.php,因爲生產者不須要阻塞,執行完進程便退出,因此如今RabbitMQ管理界面中既沒有Connections也沒有Channels,可是Queues已經被Exchanges投遞過去了10條消息,以下圖:

由於咱們執行生產者以前已經關掉了所有消費者,因此此時消息在隊列中等待獲取;
由於在發送消息時設置了delivery_mode:2來聲明消息持久化,此時若是重啓RabbitMQ,消息還會恢復;此時從新執行消費者,假設仍是兩個,打開兩個命令行/終端,輸入php consumer.php,咱們能夠看到消息被消費,以下圖:

提醒:生產者在生產消息時,若是不存在指定隊列,而且沒有建立隊列,或者隊列存在但消息路由鍵和交換機與隊列綁定的鍵(路由規則)不一致(直連交換機必須一致),則消息會被交換機丟棄。
原文地址:PHP使用RabbitMQ實例mysql

相關文章
相關標籤/搜索