相關博文:
CentOS6.9安裝RabbitMQ和源碼編譯安裝php的RabbitMQ擴展
RabbitMQ入門基礎
CentOS7源碼編譯安裝nginx+php7.2+mysql5.7並使用systemctl管理
RabbitMQ的安裝過程,工做流程,和一些基礎概念已經在前面的筆記中提到了,今天在本地實現了php鏈接RabbitMQ,以及消息的生產和消費的過程,首先看下沒有生產者和消費者的默認RabbitMQ管理界面截圖:
Connections:
尚未任何鏈接(Connections)
Channels:
尚未任何通道(Channels)
Exchanges:
交換機只有系統默認的
Queues:
尚未任何隊列
先上消費者代碼consumer.php
php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:16 */ //聲明鏈接參數 $config = array( 'host' => '192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test' ); //鏈接broker $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } //在鏈接內建立一個通道 $ch = new AMQPChannel($cnn); //建立一個交換機 $ex = new AMQPExchange($ch); //聲明路由鍵 $routingKey = 'key_1'; //聲明交換機名稱 $exchangeName = 'exchange_1'; //設置交換機名稱 $ex->setName($exchangeName); //設置交換機類型 //AMQP_EX_TYPE_DIRECT:直連交換機 //AMQP_EX_TYPE_FANOUT:扇形交換機 //AMQP_EX_TYPE_HEADERS:頭交換機 //AMQP_EX_TYPE_TOPIC:主題交換機 $ex->setType(AMQP_EX_TYPE_DIRECT); //設置交換機持久 $ex->setFlags(AMQP_DURABLE); //聲明交換機 $ex->declareExchange(); //建立一個消息隊列 $q = new AMQPQueue($ch); //設置隊列名稱 $q->setName('queue_1'); //設置隊列持久 $q->setFlags(AMQP_DURABLE); //聲明消息隊列 $q->declareQueue(); //交換機和隊列經過$routingKey進行綁定 $q->bind($ex->getName(), $routingKey); //接收消息並進行處理的回調方法 function receive($envelope, $queue) { //休眠兩秒, sleep(2); //echo消息內容 echo $envelope->getBody()."\n"; //顯式確認,隊列收到消費者顯式確認後,會刪除該消息 $queue->ack($envelope->getDeliveryTag()); } //設置消息隊列消費者回調方法,並進行阻塞 $q->consume("receive"); //$q->consume("receive", AMQP_AUTOACK);//隱式確認,不推薦
以上是消費者代碼,打開兩個命令行/終端
輸入php consumer.php
,消費者開始阻塞獲取消息,以下圖
此時再看RabbitMQ管理界面:
Connections
出現兩個鏈接,這兩個就是消費者,由於他們在阻塞着等待消息
Channels
消費者在各自的鏈接裏都打開了一個通道
Exchanges
其中一個消費者建立了一個持久的直連交換機
Queues
消息隊列已經建立,但消息數是0,由於此時尚未生產者
生產者代碼publisher.php
html
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:15 */ $config = array( 'host' => '192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test' ); $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } $ch = new AMQPChannel($cnn); $ex = new AMQPExchange($ch); //消息的路由鍵,必定要和消費者端一致 $routingKey = 'key_1'; //交換機名稱,必定要和消費者端一致, $exchangeName = 'exchange_1'; $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); //建立10個消息 for ($i=1;$i<=10;$i++){ //消息內容 $msg = array( 'data' => 'message_'.$i, 'hello' => 'world', ); //發送消息到交換機,並返回發送結果 //delivery_mode:2聲明消息持久,持久的隊列+持久的消息在RabbitMQ重啓後纔不會丟失 echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n"; //代碼執行完畢後進程會自動退出 }
以上是生產者代碼
在執行以前,先關掉前面的兩個消費者,打開一個命令行/終端,輸入php publisher.php
,因爲生產者不須要阻塞,執行完進程便退出,因此如今RabbitMQ管理界面中既沒有Connections也沒有Channels,可是Queues已經被Exchanges投遞過去了10條消息,以下圖:
由於咱們執行生產者以前已經關掉了所有消費者,因此此時消息在隊列中等待獲取;
由於在發送消息時設置了delivery_mode:2
來聲明消息持久化,此時若是重啓RabbitMQ,消息還會恢復;此時從新執行消費者,假設仍是兩個,打開兩個命令行/終端,輸入php consumer.php
,咱們能夠看到消息被消費,以下圖:
提醒:生產者在生產消息時,若是不存在指定隊列,而且沒有建立隊列,或者隊列存在但消息路由鍵和交換機與隊列綁定的鍵(路由規則)不一致(直連交換機必須一致),則消息會被交換機丟棄。
原文地址:PHP使用RabbitMQ實例mysql