延時隊列的做用再也不累述
本文使用rabbitmq的queue能夠設置ttl時間,將到期的message設爲死信,message會被push到delay_queue,消費delay_queue便可實現延時隊列功能
先假設這樣一個場景:
小明在外賣平臺下個一個訂單,若是超過10分鐘未支付,則系統自動取消訂單,並推送給用戶「訂單已取消」信息。php
開發思路:
下訂單時就將訂單orderId push到訂單隊列order_queue,並設置次條message的有效期爲10分鐘,當10分鐘後此條message到期,會將此條message轉化爲死信push到exchange,將exchange和queue進行綁定,開一個/多個消費者消費queue,並判斷queue中message訂單是否已支付,若未支付則推送通知,取消訂單。git
流程圖,未考慮消息消費失敗的狀況github
對RabbitMQ進行簡單的封裝json
<?php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitMQ { private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } /** * 生成信息 * @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * 消費消息 * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName,$callback) { $this->channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); } }
建立延時隊列ui
<?php namespace RabbitMQ; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * 使用RabbitMQ實現延時隊列功能 * Class DelayQueue * @package RabbitMQ */ class DelayQueue extends RabbitMQ { /** * 建立延時隊列 * @param $ttl * @param $delayExName * @param $delayQueueName * @param $queueName */ public function createQueue($ttl, $delayExName, $delayQueueName, $queueName) { $args = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName, 'x-message-ttl' => $ttl, //消息存活時間 'x-dead-letter-routing-key' => $queueName ]); $this->channel->queue_declare($queueName, false, true, false, false, false, $args); //綁定死信queue $this->channel->exchange_declare($delayExName, AMQPExchangeType::DIRECT, false, true, false); $this->channel->queue_declare($delayQueueName, false, true, false, false); $this->channel->queue_bind($delayQueueName, $delayExName, $queueName, false); } }
生產者,代碼很簡單,看看運行以後的效果,訂單的message愈來愈多this
<?php require_once '../vendor/autoload.php'; // 生產者 $delay = new \RabbitMQ\DelayQueue(); $ttl = 1000 * 100;//訂單100s後超時 $delayExName = 'delay-order-exchange';//超時exchange $delayQueueName = 'delay-order-queue';//超時queue $queueName = 'ttl-order-queue';//訂單queue $delay->createQueue($ttl, $delayExName, $delayQueueName, $queueName); //100個訂單信息,每一個訂單超時時間都是10s for ($i = 0; $i < 100; $i++) { $data = [ 'order_id' => $i + 1, 'remark' => 'this is a order test' ]; $delay->sendMessage(json_encode($data), $queueName); sleep(1); }
消費者,看看消費以後的,過一會會觀察到,已經有到期message被push到了delay_order_queue
消費者也消費到了messagespa
<?php require_once '../vendor/autoload.php'; // 消費者 $delay = new \RabbitMQ\DelayQueue(); $delayQueueName = 'delay-order-queue'; $callback = function ($msg) { echo $msg->body . PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //處理訂單超時邏輯,給用戶推送提醒等等。。。 sleep(10); }; /** * 消費已經超時的訂單信息,進行處理 */ $delay->consumeMessage($delayQueueName, $callback);
代碼見:https://github.com/jiaoyang3/...code