rpush + blpop 或 lpush + brpopphp
rpush : 往列表右側推入數據
blpop : 客戶端阻塞直到隊列有值輸出
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000'); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row)); } $redis->close();
獲取20000萬個商品,並把json化後的數據推入goods:task隊列git
// 出隊 while (true) { // 阻塞設置超時時間爲3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } }
設置blpop阻塞時間爲3秒,當有數據出隊時保存到goods:success:task表示執行成功,當隊列沒有數據時,程序睡眠10秒從新檢查goods:task是否有數據出隊github
php simple.php php queueBlpop.php
blpop 有多個鍵時,blpop會從左至右遍歷鍵,一旦一個鍵能彈出元素,客戶端當即返回。例如:redis
blpop key1 key2 key3 key4
從key1到key4遍歷,若是哪一個key有值,則彈出這個值,若多個key同時有值時,優先彈出排在左邊的key。json
// 設置優先級隊列 $high = 'goods:high:task'; $mid = 'goods:mid:task'; $low = 'goods:low:task'; $stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000'); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小於100放在低級隊列 if ($row['cid'] < 100) { $redis->rPush($low, json_encode($row)); } // cid 100到600之間放在中級隊列 elseif ($row['cid'] > 100 && $row['cid'] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大於600放在高級隊列 else { $redis->rPush($high, json_encode($row)); } } $redis->close();
// 優先級隊列 $high = 'goods:high:task'; $mid = 'goods:mid:task'; $low = 'goods:low:task'; // 出隊 while(true){ // 優先級高的隊列放在左側 $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } }
優先級高的隊列放在blpop命令左側,依次排序,blpop命令會依次彈出high, mid, low隊列的值bash
php priority.php php priorityBlpop.php
能夠用一個有序集合來保存延遲任務,member保存任務內容,score保存(當前時間 + 延時時間)。用時間做爲score。程序只要用有序集合的第一條任務的score和當前時間作比較,若是當前時間比score小,說明有序集合的全部任務還沒到執行時間。併發
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000'); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row)); }
將20萬條任務導入有序集合goods:delay:task,全部任務延遲到以後的1秒到300秒內執行ide
while (true) { // 由於是有序集合,只要判斷第一條記錄的延時時間,例如第一條未到執行時間 // 相對說明集合的其餘任務未到執行時間 $rs = $redis->zRange('goods:delay:task', 0, 0, true); // 集合沒有任務,睡眠時間設置爲5秒 if (empty($rs)) { echo 'no tasks , sleep 5 seconds' . PHP_EOL; sleep(5); continue; } $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time(); // 到時間執行延時任務 if ($delay <= $now) { // 對當前任務加鎖,避免移動移動延時任務到任務隊列時被其餘客戶端修改 if (!($identifier = acquireLock($task['id']))) { continue; } // 移動延時任務到任務隊列 $redis->zRem('goods:delay:task', $taskJson); $redis->rPush('goods:task', $taskJson); echo $task['id'] . ' run ' . PHP_EOL; // 釋放鎖 releaseLock($task['id'], $identifier); } else { // 延時任務未到執行時間 $sleep = $delay - $now; // 最大值設置爲2秒,保證若是有新的任務(延時時間1秒)進入集合時可以及時的被處理 // $sleep = $sleep > 2 ? 2 :$sleep; echo 'wait ' . $sleep . ' seconds ' . PHP_EOL; sleep($sleep); } }
這個文件對有序集合內的延遲任務作處理,若是延遲任務到了執行時間,則把延遲任務移動到任務隊列中高併發
// 出隊 while (true) { // 阻塞設置超時時間爲3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } }
處理任務隊列中的任務fetch
php delay.php php delayHanlde.php php queueBlpop.php
完整代碼:https://github.com/wuzhc/demo...
相關項目: https://github.com/wuzhc/gmq