redis+PHP實現的一個優先級去重隊列

主要思路是用一個set作前端去重緩衝, 若干個list作後端的多優先級消息隊列, 用一個進程來進行分發, 即從set中分發消息到隊列.php

set緩衝的設計爲當天有效, 因此有個零點問題,有可能在零點前set中剛放進去的消息沒有分發即失效, 這一點能夠用另外一個進程彌補處理前一天的遺留消息和刪除前一天的緩衝前端

<?php

/** 
 * @author 
 * 
 */
class MsgQuery {
    // TODO - Insert your code here
    const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩衝key前綴
    const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
    const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩衝key前綴
    const SCORE_NUM = 5; // 優先級劃分數目
    const MIN_SCORE = 1; // 最小優先級
    static $MAX_SCORE;
    static $instance = null;
    private $redis;

    public static function getInstance($redis) {
        if (null == self::$instance) {
            self::$instance = new MsgQuery ( $redis );
        }
        return self::$instance;
    }

    /**
     * 添加消息到消息緩衝區
     * @param int $score 優先級(1-5)
     * @param string $msg 消息
     */
    public function add($score, $msg) {
        // 添加到消息緩衝
        $socre = intval ( $score );
        if ($socre < self::MIN_SCORE) {
            $score = self::MIN_SCORE;
        }
        if ($score > self::$MAX_SCORE) {
            $score = self::$MAX_SCORE;
        }
        $cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );
        $cacheData = array (
                'score' => $score,
                'msg' => $msg 
        );
        $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
    }

    /**
     * 將消息從緩衝區移動到相應的優先級隊列中
     */
    public function moveToQuery() {
        // 獲取當前緩衝區沒有入隊列的消息
        $dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');
        $cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');
        $msgs = $this->redis->sDiff($cacheKey, $dealKey);
        foreach ($msgs as $cachedData){
            // 放入已處理集合
            $this->redis->sAdd ( $dealKey, $cachedData );
            // 壓入相應的優先級隊列
            $cachedData = unserialize($cachedData);
            $score = $cachedData['score'];
            $msg = $cachedData['msg'];
            $queryKey = self::KEY_QUERY_PREFIX.$score;
            $this->redis->rPush($queryKey, $msg);
        }
        unset($cachedData);
    }
    
    /**
     * 從隊列阻塞式出棧一個最高優先級消息
     * @return string msg
     */
    public function bPop(){
        $queryKeys = array();
        for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){
            $queryKeys[] = self::KEY_QUERY_PREFIX.$score;
        }
        $msg = $this->redis->blPop($queryKeys, 0);
        return $msg[1];
    }

    private function __construct($redis) {
        $this->redis = $redis;
        $this->redis->connect ();
        self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
    }

    private function __destruct() {
        $this->redis->close ();
    }
}

?>
相關文章
相關標籤/搜索