<?php /** * @author 魔芋紅茶 * */ class MsgQuery { // TODO - Insert your code here private static $KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩衝key前綴 private static $KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key private static $KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩衝key前綴 const SCORE_NUM = 5; // 優先級劃分數目 const MIN_SCORE = 1; // 最小優先級 private static $MAX_SCORE; private static $instance = null; private $redis; private $curQuery; const QUERY_DEFAULT = 'ms.q.dft'; const QUERY_ITEM_TASK = 'ms.q.itm'; const QUERY_ITEM_SHELVES_ADD = 'ms.q.shelves_add'; // 上架 const QUERY_ITEM_SHELVES_MOVE = 'ms.q.shelves_move'; // 下架 const QUERY_ITEM_IMPORT = 'ms.q.import'; // 導入 const QUERY_ITEM_PRICE = 'ms.q.price'; // 修改售價 const QUERY_ITEM_INV_QTY = 'ms.q.inv_qty'; // 修改庫存 const QUERY_ITEM_POINT = 'ms.q.sell_point'; // 修改賣點 const QUERY_ITEM_DEL = 'ms.q.itm.delete'; // 商品刪除 const QUERY_ITEM_RESTORE = 'ms.q.itm.restore'; // 商品恢復 const QUERY_ITEM_EDIT = 'ms.q.itm.edit'; // 商品修改 const QUERY_ITEM_ADD = 'ms.q.itm.add'; // 商品添加 private static $querys = array ( self::QUERY_DEFAULT, self::QUERY_ITEM_TASK, self::QUERY_ITEM_SHELVES_ADD, self::QUERY_ITEM_SHELVES_MOVE, self::QUERY_ITEM_IMPORT, self::QUERY_ITEM_PRICE, self::QUERY_ITEM_INV_QTY, self::QUERY_ITEM_POINT, self::QUERY_ITEM_DEL, self::QUERY_ITEM_RESTORE, self::QUERY_ITEM_ADD, self::QUERY_ITEM_EDIT ); /** * 清理已通過期的query數據 */ public function clean() { $ystDay = date ( 'Ymd', strtotime ( '-1 day' ) ); $cacheKey = self::$KEY_CACHE_PREFIX . $ystDay; $cacheDealKey = self::$KEY_CACHE_DEAL_PREFIX . $ystDay; // 清理前嘗試移動緩衝區裏的殘留消息到隊列 $this->moveToQuery ( $ystDay ); // 刪除過時緩衝區 $this->redis->del ( $cacheKey ); $this->redis->del ( $cacheDealKey ); } public static function getQuerys() { return self::$querys; } /** * 將當前query切換到指定query * * @param string $query */ public function selectQuery($query) { if (! in_array ( $query, self::$querys )) { $query = self::QUERY_DEFAULT; } $this->curQuery = $query; self::$KEY_CACHE_PREFIX = $query . '.cache'; self::$KEY_QUERY_PREFIX = $query . '.lv'; self::$KEY_CACHE_DEAL_PREFIX = $query . '.deal'; } /** * 獲取一個query實例 * @param redis $redis * @param string $query * @return MsgQuery */ public static function getInstance($redis, $query) { if (null == self::$instance) { self::$instance = new MsgQuery ( $redis, $query ); } if (self::$instance->curQuery != $query) { self::$instance->selectQuery ( $query ); } return self::$instance; } /** * 添加消息到消息緩衝區 * * @param int $score * 優先級(1-5) * @param string $msg * 消息 */ public function add($score, $msg) { // 添加到消息緩衝 $socre = intval ( $score ); if ($socre < self::MIN_SCORE) { $score = self::MIN_SCORE; } if ($score > self::$MAX_SCORE) { $score = self::$MAX_SCORE; } $cacheKey = self::$KEY_CACHE_PREFIX . date ( 'Ymd' ); $cacheData = array ( 'score' => $score, 'msg' => $msg ); // 被添加到集合中的新元素的數量,不包括被忽略的元素,故重複添加返回false return $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) ); } /** * 將消息從緩衝區移動到相應的優先級隊列中 */ public function moveToQuery($day = null) { if ($day === null) { $day = date ( 'Ymd' ); } // 獲取當前緩衝區沒有入隊列的消息 $dealKey = self::$KEY_CACHE_DEAL_PREFIX . $day; $cacheKey = self::$KEY_CACHE_PREFIX . $day; $msgs = $this->redis->sDiff ( $cacheKey, $dealKey ); foreach ( $msgs as $cachedData ) { // 放入已處理集合 $this->redis->sAdd ( $dealKey, $cachedData ); // 壓入相應的優先級隊列 $cachedData = unserialize ( $cachedData ); $score = $cachedData ['score']; $msg = $cachedData ['msg']; $queryKey = self::$KEY_QUERY_PREFIX . $score; $this->redis->rPush ( $queryKey, serialize($msg) ); } unset ( $cachedData ); } /** * 從隊列阻塞式出棧一個最高優先級消息 * * @return string msg */ public function bPop() { $queryKeys = array (); for($score = self::$MAX_SCORE; $score >= self::MIN_SCORE; $score --) { $queryKeys [] = self::$KEY_QUERY_PREFIX . $score; } $msg = @$this->redis->blPop ( $queryKeys, 0 ); return $msg [1]; } private function __construct($redis, $query) { $this->redis = $redis; // $this->redis->connect (); self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1; $this->selectQuery ( $query ); } public function __destruct() { if ($this->redis) $this->redis->close (); } } ?>
主體沒有任何改變, 依然是2個set作緩衝, 多個list作實際隊列的實現, 只是擴展爲支持多個隊列的實現.php
其它隊列使用中遇到過的問題:mysql
1 feed程序由於長時間阻塞而斷開了和redis的鏈接redis
解決方法: 加入代碼sql
ini_set('default_socket_timeout', -1);
以上代碼可讓php進程和redis經過socket長鏈接socket
2 feed程序由於長時間阻塞而斷開了和mysql的鏈接ui
解決方法: 查看mysql驅動使用的是哪一種, 咱們項目中用的是mysqli鏈接, 具體驅動是mysqlnd, 若是是mysqld, 最簡單的方式是先設置php變量mysqli.reconnect=1再加入ping命令this
但有個問題是mysqlnd驅動下ping命令並不能自動重連, 即便改了php變量也沒用, 只能顯示進行重連, 以下spa
if(!$this->connection->ping()){ //mysql 鏈接丟失且mysqlnd不會自動重連, 手動重連 $this->connectDB(); }
3 從新發布/變動了feed程序可能須要重啓進程加載新代碼線程
最後附上一個隊列分發和清理任務腳本, 腳本中多進程分發的代碼存在問題, 會產生不少殭屍進程, 目前僅使用單進程版本, 處理時間爲天天凌晨調用clean腳本清理前一天的隊列, 分發程序爲每5分鐘執行一次, 具體執行間隔視狀況調整rest
<?php class ControllerMsgQuery extends Controller { /** * 處理隊列緩衝區的數據, 移動到隊列 */ public function index() { //多進程版本存在問題, 先使用單進程版本 $this->dealWithSingleProcess(); } private function dealWithMuitlProcess(){ set_time_limit ( 0 ); global $global; $this->load->ventor ( 'msg_query/MsgQuery' ); $querys = MsgQuery::getQuerys (); $redis = new Redis (); $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] ); foreach ( $querys as $query ) { // 每一個query用獨立子線程完成move to list的工做 $pid = pcntl_fork (); if ($pid > 0) { // 父進程 } else if ($pid == 0) { // 子進程 $MsgQuery = MsgQuery::getInstance ( $redis, $query ); $MsgQuery->moveToQuery (); exit(); } else { exit(); } } // 父進程等待子進程以釋放資源 // if ($pid > 0){ pcntl_wait ( $status ); // } } private function dealWithSingleProcess(){ set_time_limit ( 0 ); global $global; $this->load->ventor ( 'msg_query/MsgQuery' ); $querys = MsgQuery::getQuerys (); $redis = new Redis (); $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] ); foreach ( $querys as $query ) { $MsgQuery = MsgQuery::getInstance ( $redis, $query ); $MsgQuery->moveToQuery (); } } /** * 清理已通過期的query數據 */ public function clean() { set_time_limit ( 0 ); global $global; $this->load->ventor ( 'msg_query/MsgQuery' ); $querys = MsgQuery::getQuerys (); $redis = new Redis (); $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] ); foreach ( $querys as $query ) { $MsgQuery = MsgQuery::getInstance ( $redis, $query ); $MsgQuery->clean (); } } }