php+redis實現延遲隊列

基於redis有序集實現延遲任務執行,好比某個時間給某個用戶發短信,訂單過時處理,等等
我是在tp5框架上寫的,實現起來很簡單,對於一些不是很複雜的應用足夠了,目前在公司項目中使用,後臺進程並無實現多進程,
很少說,貼代碼,不回排版,見諒php

一、命令行腳本 執行方法:php think delay-queue queuename(這是有序集的key)redis

namespace app\command;

use app\common\lib\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;

class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = 'queue';

    protected function configure()
    {
        $this->setName('delay-queue')->setDescription('延遲隊列任務進程');
        $this->addArgument(self::COMMAND_ARGV_1);
    }

    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //參數1 延遲隊列表名,對應與redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

庫類目錄結構
clipboard.pngapp

config.php 裏是redis鏈接參數配置框架

RedisHandler.php只實現有序集的操做,重連機制尚未實現異步

namespace app\common\lib\delayqueue;

class RedisHandler
{
    public $provider;
    private static $_instance = null;

    private function __construct() {
        $this->provider = new \Redis();
        //host port
        $config = require_once 'config.php';
        $this->provider->connect($config['redis_host'], $config['redis_port']);
    }

    final private function __clone() {}

    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的數據
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }

    /**
     * 獲取有序集數據
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }

    /**
     * 刪除有序集數據
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }

}

延遲隊列類ide

namespace app\common\lib\delayqueue;

class DelayQueue
{

    private $prefix = 'delay_queue:';

    private $queue;

    private static $_instance = null;

    private function __construct($queue) {
        $this->queue = $queue;
    }

    final private function __clone() {}

    public static function getInstance($queue = '') {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     * 添加任務信息到隊列
     *
     * demo DelayQueue::getInstance('test')->addTask(
     *    'app\common\lib\delayqueue\job\Test',
     *    strtotime('2018-05-02 20:55:20'),
     *    ['abc'=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 執行時間
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {

        $key = $this->prefix.$this->queue;

        $params = [
            'class' => $jobClass,
            'args'  => $args,
            'runtime' => $runTime,
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     * 執行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一個元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);

        if (!$result) {
            return false;
        }

        $jobInfo = unserialize($result[0]);

        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);

        $jobClass = $jobInfo['class'];

        if(!@class_exists($jobClass)) {
            print_r($jobClass.' undefined'. PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }

        // 到時間執行
        if (time() >= $jobInfo['runtime']) {
            $job = new $jobClass;
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 將任務移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }

        return false;
    }

}

異步任務基類:ui

namespace app\common\lib\delayqueue;

class DelayJob
{

    protected $payload;

    public function preform ()
    {
        // todo
        return true;
    }


    public function setPayload($args = null)
    {
        $this->payload = $args;
    }

}

全部異步執行的任務都卸載job目錄下,且要繼承DelayJob,你能夠實現任何你想延遲執行的任務this

如:spa

namespace app\common\lib\delayqueue\job;

use app\common\lib\delayqueue\DelayJob;

class Test extends DelayJob
{

    public function preform()
    {
        // payload 裏應該有處理任務所需的參數,經過DelayQueue的addTask傳入
        print_r('test job'.PHP_EOL);
        return true;
    }

}

使用方法:命令行

假設用戶建立了一個訂單,訂單在10分鐘後失效,那麼在訂單建立後加入:

DelayQueue::getInstance('close_order')->addTask(
     'app\common\lib\delayqueue\job\CloseOrder', // 本身實現的job
     strtotime('2018-05-02 20:55:20'), // 訂單失效時間
     ['order_id'=>123456] // 傳遞給job的參數
 );

close_order 是有序集的key

命令行啓動進程

php think delay-queue close_order

相關文章
相關標籤/搜索