PHP實戰RabbitMQ之基礎使用篇

簡介

RabbitMQ是一個高可用的信息中間件,學習和使用RabbitMQ很是有必要。php

  • 異步消息傳遞
  • 支持各類開發語言java、python、php等
  • 可插拔的身份驗證、受權
  • RabbitMQ-Manager可用於管理和監視。

安裝

這裏直接使用docker,很方便的進行安裝java

拉取鏡像
docker pull rabbitmq:3.8.3-management-alpinepython

運行
docker run --name run-rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmqgit

15672端口是RabbitMQ Web管理頁面,直接訪問:http://localhost:15672/,初始用戶密碼:guest
image.pnggithub

使用

RabbitMQ做爲生產者和消費者來使用時,基本上有2中場景web

  • 一個/多個生產者,多個共享消費者
  • 一個/多個生產者,多個獨立消費者

共享的消費者能夠同時消費一個隊列的數據,增長吞吐量
獨立的消費者不共享隊列,每一個消費者都有本身的隊列,能夠定義規則從exchange中pull數據到本身的queue中docker

下面將經過代碼來實現各類場景異步

基礎概念

queue

數據隊列,數據能夠推送到queue,也能夠從queue中消費學習

exchange 交換機

將數據推送到交換機中,隊列能夠綁定交換機,交換機的類型不一樣所支持的綁定規則也不一樣ui

  • fanout 沒有規則,全部exchange中的數據
  • direct 精確匹配,只綁定routingkey指定值的數據
  • topic 更加靈活的規則,路由鍵routingkey必須是一個由.分隔開的詞語,* (星號) 用來表示一個單詞,# (井號) 用來表示任意數量(零個或多個)單詞

封裝RabbitMQ一些經常使用操做

<?php


namespace RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQ
{

    private $host = '127.0.0.1';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';
    protected $connection;
    protected $channel;


    /**
     * RabbitMQ constructor.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
        $this->channel    = $this->connection->channel();
    }

    /**
     * @param $exchangeName
     * @param $type
     * @param $pasive
     * @param $durable
     * @param $autoDelete
     */
    public function createExchange($exchangeName, $type, $pasive = false, $durable = false, $autoDelete = false)
    {
        $this->channel->exchange_declare($exchangeName, $type, $pasive, $durable, $autoDelete);
    }

    /**
     * @param $queueName
     * @param $pasive
     * @param $durable
     * @param $exlusive
     * @param $autoDelete
     */
    public function createQueue($queueName, $pasive = false, $durable = false, $exlusive = false, $autoDelete = false, $nowait = false, $arguments = [])
    {
        $this->channel->queue_declare($queueName, $pasive, $durable, $exlusive, $autoDelete, $nowait, $arguments);
    }

    /**
     * 生成信息
     * @param $message
     */
    public function sendMessage($message, $routeKey, $exchange = '', $properties = [])
    {
        $data = new AMQPMessage(
            $message, $properties
        );
        $this->channel->basic_publish($data, $exchange, $routeKey);
    }

    /**
     * 消費消息
     * @param $queueName
     * @param $callback
     * @throws \ErrorException
     */
    public function consumeMessage($queueName, $callback, $tag = '', $noLocal = false, $noAck = false, $exclusive = false, $noWait = false)
    {
        $this->channel->basic_consume($queueName, $tag, $noLocal, $noAck, $exclusive, $noWait, $callback);
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * @throws \Exception
     */
    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

多個共享消費者

多個消費者能夠增長消費速度,提供系統吞吐量
image.png

小二,直接上代碼吧
生產者代碼

<?php

require_once '../../vendor/autoload.php';

use RabbitMQ\RabbitMQ;
use PhpAmqpLib\Message\AMQPMessage;

$rabbit = new RabbitMQ();

$queueName = 'test-single-queue';
$rabbit->createQueue($queueName,false,true,false,false);
for ($i = 0; $i < 10000; $i++) {
    $rabbit->sendMessage($i . "this is a test message.", $queueName,'',[
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失
    ]);
}

unset($rabbit);//關閉鏈接

運行生產者php Producer,在manager web頁面能夠可看到這個queue信息
image.png

消費者代碼

<?php

require_once '../../vendor/autoload.php';

use RabbitMQ\RabbitMQ;

$rabbit = new RabbitMQ();

$queueName = 'test-single-queue';
$callback = function ($message){
    var_dump("Received Message : " . $message->body);//print message
    sleep(2);//處理耗時任務
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack
};
$rabbit->consumeMessage($queueName,$callback);

unset($rabbit);//關閉鏈接

運行消費者二次php Consumer.php
image.png
image.png
能夠看到二個消費者不會重複消費message
也可經過manager web看到此queue的message正在被消費
image.png

多個獨立消費者

RabbitMQ生產者將message推送到exchange,經過將多個queue與exchange進行綁定,來實現多個獨立消費者

定義一個topic類型的交換機,消費規則是:test.ex.加一個單詞

<?php

require_once '../../vendor/autoload.php';

use RabbitMQ\RabbitMQ;

$rabbit = new RabbitMQ();

$exchangeName = 'test-ex-topic';
$queueName    = 'test-consumer-ex-topic';
$routingKey   = 'test.ex.*';//消費規則定義

//建立隊列
$rabbit->createQueue($queueName, false, true);
//綁定到交換機
$rabbit->bindQueue($queueName, $exchangeName, $routingKey);
//消費
$callback = function ($message) {
    var_dump("Received Message : " . $message->body);//print message
    sleep(2);//處理耗時任務
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack
};
$rabbit->consumeMessage($queueName, $callback);

unset($rabbit);//關閉鏈接

啓動消費者php Consumer.php

定義生產者,會向2個不一樣的routingkey中推送message

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;


$rabbit = new RabbitMQ();

$routingKey1  = 'test.ex.queue1';
$routingKey2  = 'test.ex.queue2';
$exchangeName = 'test-ex-topic';
$rabbit->createExchange($exchangeName, AMQPExchangeType::TOPIC, false, true, false);

//向交換機和routingkey = test-ex-queue1中推送10000條數據
for ($i = 0; $i < 10000; $i++) {
    $rabbit->sendMessage($i . "this is a queue1 message.", $routingKey1, $exchangeName, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失
    ]);
}
//向交換機和routingkey = test-ex-queue2中推送10000條數據
for ($i = 0; $i < 10000; $i++) {
    $rabbit->sendMessage($i . "this is a queue2 message.", $routingKey2, $exchangeName, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失
    ]);
}

unset($rabbit);//關閉鏈接

運行生產者php Producer.php,能夠看到消費者有2萬條message能夠消費,包含了2個routingkey中的數據
image.png

代碼

代碼見:https://github.com/jiaoyang3/...

相關文章
相關標籤/搜索