隊列擴展, 支持多個隊列

<?php

/** 
 * @author 魔芋紅茶
 * 
 */
class MsgQuery {
    // TODO - Insert your code here
    private static $KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩衝key前綴
    private static $KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
    private static $KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩衝key前綴
    const SCORE_NUM = 5; // 優先級劃分數目
    const MIN_SCORE = 1; // 最小優先級
    private static $MAX_SCORE;
    private static $instance = null;
    private $redis;
    private $curQuery;
    const QUERY_DEFAULT           = 'ms.q.dft';
    const QUERY_ITEM_TASK            = 'ms.q.itm';
    const QUERY_ITEM_SHELVES_ADD  = 'ms.q.shelves_add';  // 上架
    const QUERY_ITEM_SHELVES_MOVE = 'ms.q.shelves_move'; // 下架
    const QUERY_ITEM_IMPORT          = 'ms.q.import';          // 導入
    const QUERY_ITEM_PRICE           = 'ms.q.price';          // 修改售價
    const QUERY_ITEM_INV_QTY      = 'ms.q.inv_qty';      // 修改庫存
    const QUERY_ITEM_POINT            = 'ms.q.sell_point';      // 修改賣點
    const QUERY_ITEM_DEL           = 'ms.q.itm.delete';      // 商品刪除
    const QUERY_ITEM_RESTORE      = 'ms.q.itm.restore';  // 商品恢復
    const QUERY_ITEM_EDIT           = 'ms.q.itm.edit';      // 商品修改
    const QUERY_ITEM_ADD           = 'ms.q.itm.add';      // 商品添加
    
    private static $querys = array (
        self::QUERY_DEFAULT,
        self::QUERY_ITEM_TASK,
        self::QUERY_ITEM_SHELVES_ADD,
        self::QUERY_ITEM_SHELVES_MOVE,
        self::QUERY_ITEM_IMPORT,
        self::QUERY_ITEM_PRICE,
        self::QUERY_ITEM_INV_QTY,
        self::QUERY_ITEM_POINT,
        self::QUERY_ITEM_DEL,
        self::QUERY_ITEM_RESTORE,
        self::QUERY_ITEM_ADD,
        self::QUERY_ITEM_EDIT
    );

    /**
     * 清理已通過期的query數據
     */
    public function clean() {
        $ystDay = date ( 'Ymd', strtotime ( '-1 day' ) );
        $cacheKey = self::$KEY_CACHE_PREFIX . $ystDay;
        $cacheDealKey = self::$KEY_CACHE_DEAL_PREFIX . $ystDay;
        // 清理前嘗試移動緩衝區裏的殘留消息到隊列
        $this->moveToQuery ( $ystDay );
        // 刪除過時緩衝區
        $this->redis->del ( $cacheKey );
        $this->redis->del ( $cacheDealKey );
    }

    public static function getQuerys() {
        return self::$querys;
    }

    /**
     * 將當前query切換到指定query
     *
     * @param string $query            
     */
    public function selectQuery($query) {
        if (! in_array ( $query, self::$querys )) {
            $query = self::QUERY_DEFAULT;
        }
        $this->curQuery = $query;
        self::$KEY_CACHE_PREFIX = $query . '.cache';
        self::$KEY_QUERY_PREFIX = $query . '.lv';
        self::$KEY_CACHE_DEAL_PREFIX = $query . '.deal';
    }

    /**
     * 獲取一個query實例
     * @param redis $redis
     * @param string $query
     * @return MsgQuery
     */
    public static function getInstance($redis, $query) {
        if (null == self::$instance) {
            self::$instance = new MsgQuery ( $redis, $query );
        }
        if (self::$instance->curQuery != $query) {
            self::$instance->selectQuery ( $query );
        }
        return self::$instance;
    }

    /**
     * 添加消息到消息緩衝區
     *
     * @param int $score
     *            優先級(1-5)
     * @param string $msg
     *            消息
     */
    public function add($score, $msg) {
        // 添加到消息緩衝
        $socre = intval ( $score );
        if ($socre < self::MIN_SCORE) {
            $score = self::MIN_SCORE;
        }
        if ($score > self::$MAX_SCORE) {
            $score = self::$MAX_SCORE;
        }
        $cacheKey = self::$KEY_CACHE_PREFIX . date ( 'Ymd' );
        $cacheData = array (
                'score' => $score,
                'msg' => $msg 
        );
        
        // 被添加到集合中的新元素的數量,不包括被忽略的元素,故重複添加返回false
        return $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
    }

    /**
     * 將消息從緩衝區移動到相應的優先級隊列中
     */
    public function moveToQuery($day = null) {
        if ($day === null) {
            $day = date ( 'Ymd' );
        }
        // 獲取當前緩衝區沒有入隊列的消息
        $dealKey = self::$KEY_CACHE_DEAL_PREFIX . $day;
        $cacheKey = self::$KEY_CACHE_PREFIX . $day;
        $msgs = $this->redis->sDiff ( $cacheKey, $dealKey );
        foreach ( $msgs as $cachedData ) {
            // 放入已處理集合
            $this->redis->sAdd ( $dealKey, $cachedData );
            // 壓入相應的優先級隊列
            $cachedData = unserialize ( $cachedData );
            $score = $cachedData ['score'];
            $msg = $cachedData ['msg'];
            $queryKey = self::$KEY_QUERY_PREFIX . $score;
            $this->redis->rPush ( $queryKey, serialize($msg) );
        }
        unset ( $cachedData );
    }

    /**
     * 從隊列阻塞式出棧一個最高優先級消息
     *
     * @return string msg
     */
    public function bPop() {
        $queryKeys = array ();
        for($score = self::$MAX_SCORE; $score >= self::MIN_SCORE; $score --) {
            $queryKeys [] = self::$KEY_QUERY_PREFIX . $score;
        }
        $msg = @$this->redis->blPop ( $queryKeys, 0 );
        return $msg [1];
    }

    private function __construct($redis, $query) {
        $this->redis = $redis;
//         $this->redis->connect ();
        self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
        $this->selectQuery ( $query );
    }

    public function __destruct() {
        if ($this->redis)     $this->redis->close ();
    }
}

?>

主體沒有任何改變, 依然是2個set作緩衝, 多個list作實際隊列的實現, 只是擴展爲支持多個隊列的實現.php

其它隊列使用中遇到過的問題:mysql

1 feed程序由於長時間阻塞而斷開了和redis的鏈接redis

解決方法: 加入代碼sql

ini_set('default_socket_timeout', -1);

以上代碼可讓php進程和redis經過socket長鏈接socket

2 feed程序由於長時間阻塞而斷開了和mysql的鏈接ui

解決方法: 查看mysql驅動使用的是哪一種, 咱們項目中用的是mysqli鏈接, 具體驅動是mysqlnd, 若是是mysqld, 最簡單的方式是先設置php變量mysqli.reconnect=1再加入ping命令this

但有個問題是mysqlnd驅動下ping命令並不能自動重連, 即便改了php變量也沒用, 只能顯示進行重連, 以下spa

        if(!$this->connection->ping()){
            //mysql 鏈接丟失且mysqlnd不會自動重連, 手動重連
            $this->connectDB();
        }

3 從新發布/變動了feed程序可能須要重啓進程加載新代碼線程

 

最後附上一個隊列分發和清理任務腳本, 腳本中多進程分發的代碼存在問題, 會產生不少殭屍進程, 目前僅使用單進程版本, 處理時間爲天天凌晨調用clean腳本清理前一天的隊列, 分發程序爲每5分鐘執行一次, 具體執行間隔視狀況調整rest

<?php
class ControllerMsgQuery extends Controller {

    /**
     * 處理隊列緩衝區的數據, 移動到隊列
     */
    public function index() {
        //多進程版本存在問題, 先使用單進程版本
        $this->dealWithSingleProcess();
    }
    
    private function dealWithMuitlProcess(){
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( 'msg_query/MsgQuery' );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
        foreach ( $querys as $query ) {
            // 每一個query用獨立子線程完成move to list的工做
            $pid = pcntl_fork ();
            if ($pid > 0) {
                // 父進程
            } else if ($pid == 0) {
                // 子進程
                $MsgQuery = MsgQuery::getInstance ( $redis, $query );
                $MsgQuery->moveToQuery ();
                exit();
            } else {
                exit();
            }
        }
        // 父進程等待子進程以釋放資源
        //         if ($pid > 0){
        pcntl_wait ( $status );
        //         }
    }
    
    private function dealWithSingleProcess(){
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( 'msg_query/MsgQuery' );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
        foreach ( $querys as $query ) {
            $MsgQuery = MsgQuery::getInstance ( $redis, $query );
            $MsgQuery->moveToQuery ();
        }
    }

    /**
     * 清理已通過期的query數據
     */
    public function clean() {
        set_time_limit ( 0 );
        global $global;
        $this->load->ventor ( 'msg_query/MsgQuery' );
        $querys = MsgQuery::getQuerys ();
        $redis = new Redis ();
        $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
        foreach ( $querys as $query ) {
            $MsgQuery = MsgQuery::getInstance ( $redis, $query );
            $MsgQuery->clean ();
        }
    }
}
相關文章
相關標籤/搜索