Redis 實現隊列

場景說明:

  • 用於處理比較耗時的請求,例如批量發送郵件,若是直接在網頁觸發執行發送,程序會出現超時
  • 高併發場景,當某個時刻請求瞬間增長時,能夠把請求寫入到隊列,後臺在去處理這些請求
  • 搶購場景,先入先出的模式

命令:

rpush + blpop 或 lpush + brpopphp

rpush : 往列表右側推入數據
blpop : 客戶端阻塞直到隊列有值輸出

簡單隊列:

simple.php

$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $redis->rPush('goods:task', json_encode($row));
}

$redis->close();

獲取20000萬個商品,並把json化後的數據推入goods:task隊列git

queueBlpop.php

// 出隊
while (true) {
    // 阻塞設置超時時間爲3秒
    $task = $redis->blPop(array('goods:task'), 3);
    if ($task) {
        $redis->rPush('goods:success:task', $task[1]);
        $task = json_decode($task[1], true);
        echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
        echo PHP_EOL;
    } else {
        echo 'nothing' . PHP_EOL;
        sleep(5);
    }
}

設置blpop阻塞時間爲3秒,當有數據出隊時保存到goods:success:task表示執行成功,當隊列沒有數據時,程序睡眠10秒從新檢查goods:task是否有數據出隊github

cli 模式執行命令:
php simple.php
php queueBlpop.php

優先級隊列

思路:

blpop 有多個鍵時,blpop會從左至右遍歷鍵,一旦一個鍵能彈出元素,客戶端當即返回。例如:redis

blpop key1 key2 key3 key4

從key1到key4遍歷,若是哪一個key有值,則彈出這個值,若多個key同時有值時,優先彈出排在左邊的key。json

priority.php

// 設置優先級隊列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';

$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    // cid 小於100放在低級隊列
    if ($row['cid'] < 100) {
        $redis->rPush($low, json_encode($row));
    }
    // cid 100到600之間放在中級隊列
    elseif ($row['cid'] > 100 && $row['cid'] < 600) {
        $redis->rPush($mid, json_encode($row));
    }
    // cid 大於600放在高級隊列 
    else {
        $redis->rPush($high, json_encode($row));
    }
}
$redis->close();

priorityBlop.php

// 優先級隊列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';

// 出隊
while(true){
    // 優先級高的隊列放在左側
    $task = $redis->blPop(array($high, $mid, $low), 3);
    if ($task) {
        $task = json_decode($task[1], true);
        echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
        echo PHP_EOL;
    } else {
        echo 'nothing' . PHP_EOL;
        sleep(5);
    }
}

優先級高的隊列放在blpop命令左側,依次排序,blpop命令會依次彈出high, mid, low隊列的值bash

cli 模式執行命令:
php priority.php
php priorityBlpop.php

延遲隊列

思路:

能夠用一個有序集合來保存延遲任務,member保存任務內容,score保存(當前時間 + 延時時間)。用時間做爲score。程序只要用有序集合的第一條任務的score和當前時間作比較,若是當前時間比score小,說明有序集合的全部任務還沒到執行時間。併發

delay.php

$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));
}

將20萬條任務導入有序集合goods:delay:task,全部任務延遲到以後的1秒到300秒內執行ide

delayHandle.php

while (true) {
    // 由於是有序集合,只要判斷第一條記錄的延時時間,例如第一條未到執行時間
    // 相對說明集合的其餘任務未到執行時間
    $rs = $redis->zRange('goods:delay:task', 0, 0, true);
    // 集合沒有任務,睡眠時間設置爲5秒
    if (empty($rs)) {
        echo 'no tasks , sleep 5 seconds' . PHP_EOL;
        sleep(5);
        continue;
    }

    $taskJson = key($rs);
    $delay = $rs[$taskJson];
    $task = json_decode($taskJson, true);
    $now = time();

    // 到時間執行延時任務
    if ($delay <= $now) {
        // 對當前任務加鎖,避免移動移動延時任務到任務隊列時被其餘客戶端修改
        if (!($identifier = acquireLock($task['id']))) {
            continue;
        }

        // 移動延時任務到任務隊列
        $redis->zRem('goods:delay:task', $taskJson);
        $redis->rPush('goods:task', $taskJson);
        echo $task['id'] . ' run ' . PHP_EOL;

        // 釋放鎖
        releaseLock($task['id'], $identifier);
    } else {
        // 延時任務未到執行時間
        $sleep = $delay - $now;
        // 最大值設置爲2秒,保證若是有新的任務(延時時間1秒)進入集合時可以及時的被處理
//        $sleep = $sleep > 2 ? 2 :$sleep;
        echo 'wait ' . $sleep . ' seconds ' . PHP_EOL;
        sleep($sleep);
    }
}

這個文件對有序集合內的延遲任務作處理,若是延遲任務到了執行時間,則把延遲任務移動到任務隊列中高併發

queueBlpop.php

// 出隊
while (true) {
    // 阻塞設置超時時間爲3秒
    $task = $redis->blPop(array('goods:task'), 3);
    if ($task) {
        $redis->rPush('goods:success:task', $task[1]);
        $task = json_decode($task[1], true);
        echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
        echo PHP_EOL;
    } else {
        echo 'nothing' . PHP_EOL;
        sleep(5);
    }
}

處理任務隊列中的任務fetch

cli模式下執行命令:
php delay.php
php delayHanlde.php
php queueBlpop.php

完整代碼:https://github.com/wuzhc/demo...
相關項目: https://github.com/wuzhc/gmq

相關文章
相關標籤/搜索