RabbitMQ是一個高可用的信息中間件,學習和使用RabbitMQ很是有必要。php
這裏直接使用docker,很方便的進行安裝java
拉取鏡像docker pull rabbitmq:3.8.3-management-alpine
python
運行docker run --name run-rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq
git
15672端口是RabbitMQ Web管理頁面,直接訪問:http://localhost:15672/,初始用戶密碼:guest
github
RabbitMQ做爲生產者和消費者來使用時,基本上有2中場景web
共享的消費者能夠同時消費一個隊列的數據,增長吞吐量
獨立的消費者不共享隊列,每一個消費者都有本身的隊列,能夠定義規則從exchange中pull數據到本身的queue中docker
下面將經過代碼來實現各類場景異步
數據隊列,數據能夠推送到queue,也能夠從queue中消費學習
將數據推送到交換機中,隊列能夠綁定交換機,交換機的類型不一樣所支持的綁定規則也不一樣ui
.
分隔開的詞語,*
(星號) 用來表示一個單詞,#
(井號) 用來表示任意數量(零個或多個)單詞<?php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitMQ { private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } /** * @param $exchangeName * @param $type * @param $pasive * @param $durable * @param $autoDelete */ public function createExchange($exchangeName, $type, $pasive = false, $durable = false, $autoDelete = false) { $this->channel->exchange_declare($exchangeName, $type, $pasive, $durable, $autoDelete); } /** * @param $queueName * @param $pasive * @param $durable * @param $exlusive * @param $autoDelete */ public function createQueue($queueName, $pasive = false, $durable = false, $exlusive = false, $autoDelete = false, $nowait = false, $arguments = []) { $this->channel->queue_declare($queueName, $pasive, $durable, $exlusive, $autoDelete, $nowait, $arguments); } /** * 生成信息 * @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * 消費消息 * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName, $callback, $tag = '', $noLocal = false, $noAck = false, $exclusive = false, $noWait = false) { $this->channel->basic_consume($queueName, $tag, $noLocal, $noAck, $exclusive, $noWait, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); } }
多個消費者能夠增長消費速度,提供系統吞吐量
小二,直接上代碼吧
生產者代碼
<?php require_once '../../vendor/autoload.php'; use RabbitMQ\RabbitMQ; use PhpAmqpLib\Message\AMQPMessage; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $rabbit->createQueue($queueName,false,true,false,false); for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a test message.", $queueName,'',[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失 ]); } unset($rabbit);//關閉鏈接
運行生產者php Producer
,在manager web頁面能夠可看到這個queue信息
消費者代碼
<?php require_once '../../vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $callback = function ($message){ var_dump("Received Message : " . $message->body);//print message sleep(2);//處理耗時任務 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack }; $rabbit->consumeMessage($queueName,$callback); unset($rabbit);//關閉鏈接
運行消費者二次php Consumer.php
能夠看到二個消費者不會重複消費message
也可經過manager web看到此queue的message正在被消費
RabbitMQ生產者將message推送到exchange,經過將多個queue與exchange進行綁定,來實現多個獨立消費者
定義一個topic類型的交換機,消費規則是:test.ex.加一個單詞
<?php require_once '../../vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $exchangeName = 'test-ex-topic'; $queueName = 'test-consumer-ex-topic'; $routingKey = 'test.ex.*';//消費規則定義 //建立隊列 $rabbit->createQueue($queueName, false, true); //綁定到交換機 $rabbit->bindQueue($queueName, $exchangeName, $routingKey); //消費 $callback = function ($message) { var_dump("Received Message : " . $message->body);//print message sleep(2);//處理耗時任務 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack }; $rabbit->consumeMessage($queueName, $callback); unset($rabbit);//關閉鏈接
啓動消費者php Consumer.php
定義生產者,會向2個不一樣的routingkey中推送message
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $routingKey1 = 'test.ex.queue1'; $routingKey2 = 'test.ex.queue2'; $exchangeName = 'test-ex-topic'; $rabbit->createExchange($exchangeName, AMQPExchangeType::TOPIC, false, true, false); //向交換機和routingkey = test-ex-queue1中推送10000條數據 for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue1 message.", $routingKey1, $exchangeName, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失 ]); } //向交換機和routingkey = test-ex-queue2中推送10000條數據 for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue2 message.", $routingKey2, $exchangeName, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啓rabbitmq,消息不會丟失 ]); } unset($rabbit);//關閉鏈接
運行生產者php Producer.php
,能夠看到消費者有2萬條message能夠消費,包含了2個routingkey中的數據