PHP中RabbitMQ之phpAmqplib實現(五)

本章講訴如何使用php-amqplib實現RabbitMQ。php

環境:CoentOS,PHP 7html

簡單介紹一下php-amqpliblinux

php-amqplib是Advanced Message Queuing Protocol (AMQP)的一個PHP開源實現。高級消息隊列協議(AMQP)是一個異步消息傳遞所使用的應用層協議規範。做爲線路層協議,而不是API(例如JMS),AMQP 客戶端可以無視消息的來源任意發送和接受信息json

一、RabbitMQ的安裝centos

須要下載的兩個包數組

erlang-21.0.7-1.el7.centos.x86_64.rpm服務器

rabbitmq-server-3.7.7-1.el7.noarch.rpm併發

這兩個包我已經放在了百度雲盤的分享上composer

連接:https://pan.baidu.com/s/1rMv_yFpLnH-D1S5wrOZrbA#list/path=%2FRabbitMQ異步

提取碼:ipyu

而後參照 weixin_41368339的博客linux rabbitmq3.7.7安裝與使用一文中的步驟安裝步,基本上沒有什麼問題

二、composer的安裝(已安裝的請忽略此步)

爲何要裝這個?咱們能夠經過composer來下載安裝php-amqplib

如何安裝composer,能夠百度一下composer的全局安裝或者直接去composer中文網

三、php-amqplib的下載及安裝

新建一個composer.json的文件,內容以下所示

{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }

而後執行

composer install

會生成一個composer.lock文件及vendor文件夾,vendor文件夾裏有php-amqplib庫,且有一個autoload.php文件可使用自動加載

四、Demo示例

本Demo示例只建立了一個直連交換機,共有四個文件Consumer.php (消費者),Publisher.php (生產者) ,Parenter.php (本身封裝的RabbitMQ的方法) ,以及test.php (測試數據),目錄如圖所示

Parenter.php 代碼以下圖所示

<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; abstract class Parenter { //MQ的默認鏈接配置
    public $config = array( 'host' => '127.0.0.1', //ip
        'port' => '5672',      //端口號
        'user' => 'guest',     //用戶
        'password' => 'guest', //密碼
        'vhost' => '/'         //虛擬host
 ); public $connection;     //連接
    public $channel;        //信道
        
    public $exchangeName = '';     //交換機名
    public $queueName = '';        //隊列名
    public $routeKey = '';         //路由鍵
    public $exchangeType = 'direct';    //交換機類型
 
    public $autoAck = true; //是否自動ack應答
 
    public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = 'direct', $config=array()) { $this->exchangeName = empty($exchangeName) ? '' : $exchangeName; $this->queueName = empty($queueName) ? '' : $queueName; $this->routeKey = empty($routeKey) ? '' : $routeKey; $this->exchangeType = empty($exchangeType) ? '' : 'direct'; if(!empty($config)) { $this->setConfig($config); } $this->createConnect(); } //建立鏈接與信道
    private function createConnect() { $host = $this->config['host']; $port = $this->config['port']; $user = $this->config['user']; $password = $this->config['password']; $vhost = $this->config['vhost']; if(empty($host) || empty($port) || empty($user) || empty($password)) { throw new Exception('RabbitMQ的鏈接配置不正確'); } //建立連接
        $this->connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost); //建立信道
        $this->channel = $this->connection->channel(); $this->createExchange(); } //建立交換機
    private function createExchange() { //建立交換機$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //passive: 消極處理, 判斷是否存在隊列,存在則返回,不存在直接拋出 PhpAmqpLib\Exception\AMQPProtocolChannelException 異常 //durable:true、false true:服務器重啓會保留下來Exchange。警告:僅設置此選項,不表明消息持久化。即不保證重啓後消息還在 //autoDelete:true、false.true:當已經沒有消費者時,服務器是否能夠刪除該Exchange
        $this->channel->exchange_declare($this->exchangeName, $this->exchangeType, false, true, false); //passive: 消極處理, 判斷是否存在隊列,存在則返回,不存在直接拋出 PhpAmqpLib\Exception\AMQPProtocolChannelException 異常 //durable:true、false true:在服務器重啓時,可以存活 //exclusive :是否爲當前鏈接的專用隊列,在鏈接斷開後,會自動刪除該隊列 //autodelete:當沒有任何消費者使用時,自動刪除該隊列 //arguments: 自定義規則
        $this->channel->queue_declare($this->queueName, false, true, false, false); } //發送消息
    public function sendMessage($data) { //建立消息$msg = new AMQPMessage($data,$properties) //#$data string類型 要發送的消息 //#roperties array類型 設置的屬性,好比設置該消息持久化[‘delivery_mode’=>2]
        $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $this->channel->basic_publish($msg,$this->exchangeName, $this->routeKey); } //處理消息
    public function dealMq($flag) { $this->autoAck = $flag; $this->channel->queue_bind($this->queueName,$this->exchangeName, $this->routeKey); //prefetchSize:0 //prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息尚未ack,則該consumer將block掉,直到有消息ack //global:true\false 是否將上面設置應用於channel,簡單點說,就是上面限制是channel級別的仍是consumer級別 //$this->channel->basic_qos(0, 1, false); //1:queue 要取得消息的隊列名 //2:consumer_tag 消費者標籤 //3:no_local false這個功能屬於AMQP的標準,可是rabbitMQ並無作實現.參考 //4:no_ack false收到消息後,是否不須要回復確認即被認爲被消費 //5:exclusive false排他消費者,即這個隊列只能由一個消費者消費.適用於任務不容許進行併發處理的狀況下.好比系統對接 //6:nowait false不返回執行結果,可是若是排他開啓的話,則必須須要等待結果的,若是兩個一塊兒開就會報錯 //7:callback null回調函數 //8:ticket null //9:arguments null
        $this->channel->basic_consume($this->queueName, '', false, $this->autoAck, false, false, function($msg){$this->get($msg);}); //監聽消息
        while(count($this->channel->callbacks)){ $this->channel->wait(); } } public function get($msg) { $param = $msg->body; $this->doProcess($param); if(!$this->autoAck) { //手動ack應答
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } } abstract public function doProcess($param); public function closeConnetct() { $this->channel->close(); $this->connection->close(); } //從新設置MQ的連接配置
    public function setConfig($config) { if (!is_array($config)) { throw new Exception('config不是一個數組'); } foreach($config as $key => $value) { $this->config[$key] = $value; } } }

Consumer.php (消費者)

<?php include_once('Parenter.php'); class Consumer extends Parenter { public function __construct() { parent::__construct('exchange', 'queue', 'routeKey'); } public function doProcess($msg) { echo $msg."\n"; } } $consumer = new Consumer(); //$consumer->dealMq(false);
$consumer->dealMq(true);

Publisher.php (生產者)

<?php include_once('Parenter.php'); class Publisher extends Parenter { public function __construct() { parent::__construct('exchange', '', 'routeKey'); } public function doProcess($msg) { } }

test.php(測試數據)

五、添加交換機與隊列

打開http://ip(你的RabbitMQ安裝的主機):15672/,會進入到RabbitMQ的可視化管理後臺登陸頁面,登陸你的帳號密碼(若是你是按照第一步提到的博客裏的教程來裝的,那你的帳號密碼就是guest),而後新加交換機和隊列,

如下是新加交換機的操做,注意vhost與以及交換機的名稱要與代碼裏的消費者與生產者傳入的參數值保持一致,若是你不想使用"/"這個默認的vhost,也能夠新建一個vhost(什麼?你問我如何新建,那麼請百度一下),可是要記住在代碼裏建立消費者與生產者時把你新加的這個vhost傳進去,覆蓋RabbitMqParernt.php裏的vhost

如下是新加隊列,這裏的vhost要與上一步的vhost保持一致,保證交換機與隊列在同一個vhost下,否則交換機會找不到隊列的,隊列名與消費者代碼裏傳入進去的隊列名保持一致

六、運行代碼

先打開一個窗口啓動消費者

運行測試腳本

若是打印出來字符串就成功了

注意:消費者與生產者傳入的交換機名稱,路由鍵必須相同

            交換機類型請務必選擇直連,各類交換機的路由鍵形式不大相同,有興趣的同窗能夠去試試其它類型的交換機實現哦

            當修改了vhost或者交換機名稱,隊列名稱等時,須要修改對應代碼

            至於註釋裏的ack應答,我會在以後的博客裏詳細介紹,包括RabbitMQ的持久化,這裏使用默認的ack應答便可

            代碼裏不少註釋都是我後來學習php-amqplib庫中類的方法時加的,表示的是參數的意義,你們也能夠去研究一下,這裏提供個網址: Rabbitmq各方法的做用詳解

            關於管理後臺及RabbitMQ的命令,我這裏就很少介紹了,有興趣的同窗去網上搜索一下就能搜到好多

【轉】:https://blog.csdn.net/yeyun666/article/details/86743784

相關文章
相關標籤/搜索