Laravel 基於redis隊列的解析

Last-Modified: 2019年5月10日15:04:22php

參考連接

本文環境

  • Laravel 5.5
  • 隊列 Redis

爲何使用隊列

使用隊列的目的通常是:html

  1. 異步執行
  2. 出錯重試

解釋一下:mysql

異步執行: 部分代碼執行很耗時, 爲了提升響應速度及避免佔用過多鏈接資源, 能夠將這部分代碼放到隊列中異步執行.laravel

Eg. 網站新用戶註冊後, 須要發送歡迎的郵件, 涉及到網絡IO沒法控制耗時的這一類就很適合放到隊列中來執行.

出錯重試: 爲了保證一些任務的正常執行, 能夠將任務放到隊列中執行, 若執行出錯則能夠延遲一段時間後重試, 直到任務處理成功或出錯超過N次後取消執行.redis

Eg. 用戶須要綁定手機號, 此時發送短信的接口是依賴第三方, 一個是不肯定耗時, 一個是不肯定調用的成功, 爲了保證調用成功, 必然須要在出錯後重試

Laravel 中的隊列

如下分析默認使用的隊列及其配置以下sql

  • 默認隊列引擎: redisjson

    經過在 redis-cli 中使用 monitor 命令查看具體執行的命令語句
  • 默認隊列名: default

分發任務

此處以分發 異步通知(class XxxNotification implement ShouldQueue)爲例.網絡

在Laravel中發起異步通知時, Laravel 會往redis中的任務隊列添加一條新任務併發

redis 執行語句app

redis> RPUSH queues:default

{
    "displayName": "App\\Listeners\\RebateEventListener",
    "job": "Illuminate\\Queue\\CallQueuedHandler@call",
    "maxTries": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
        "commandName": "Illuminate\\Events\\CallQueuedListener",
        "command": "O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"timeout\";N;s:6:\"\u0000*\u0000job\";N;}"
    },
    "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
    "attempts": 0
}

上面的redis語句是將任務信息(json格式) rpush 到 redis 隊列 queues:default 的尾部.

任務隊列 Worker

Laravel 處理任務隊列的進程開啓方式: php artisan queue:work, 爲了更好的觀察, 這裏使用 --once 選項來指定隊列中的單一任務進行處理, 具體的更多參數請自行參考文檔

php artisan queue:work --once --delay=1 --tries=3

上述執行語句參數含義:

  1. --once 僅執行一次任務, 默認是常駐進程一直執行
  2. --tries=3 任務出錯最多重試3次, 默認是無限制重試
  3. --delay=1 任務出錯後, 每次延遲1秒後再次執行, 默認是延遲0秒

當 Worker 啓動時, 它依次執行以下步驟:

此處仍以默認隊列 default 爲例講解, 且 只講解redis的相關操做
  1. queues:default:delayed 有序集合中獲取能夠處理的 "延遲任務", 並 rpushqueue:default隊列的尾部

    具體的執行語句:

    redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳

    Lua 腳本內容以下:

    -- Get all of the jobs with an expired \"score\"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
    
    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto thedestination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
        redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
    
        for i = 1, #val, 100 do
            redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        end
    end
    
    return val
  2. queue:default:reserved有序集合中獲取已過時的 "reserved 任務", 並 rpushqueue:default隊列的尾部

    具體的執行語句:

    redis> eval "Lua腳本" 2 queues:default:reserved queues:default 當前時間戳

    使用的Lua腳本同步驟 1

  3. queue:default 隊列中獲取(lpop)一個任務, 增長其 attempts 次數, 並將該任務保存到 queu:default:reserved 有序集合中, 該任務的 score 值爲 當前時間 + 90(任務執行超時時間)

    具體的執行語句:

    redis> eval 「Lua腳本」 2 queues:default queues:default:reserved 任務超時時間戳

    Lua腳本

    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false
    
    if(job ~= false) then
        -- Increment the attempt count and place job on the reserved queue...
        reserved = cjson.decode(job)
        reserved['attempts'] = reserved['attempts'] + 1
        reserved = cjson.encode(reserved)
        redis.call('zadd', KEYS[2], ARGV[1], reserved)
    end
    
    return {job, reserved}
    這裏的 90 是根據配置而定: config('queue.connections.redis.retry_after')

    若預計任務耗時太久, 則應增長該數值, 防止任務還在執行時就被重置

  4. 在成功執行上面獲取的任務後, 就將該任務從 queues:default:reserved 隊列中移除掉

    具體執行語句: ZREM queues:default:reserved "具體任務"

  5. 若是執行任務失敗, 此時分爲2種狀況:

    1. 任務失敗次數未達到指定的重試次數閥值

      將該任務從 queues:default:reserved 中移除, 並將該任務添加到 queue:default:delayed 有序集合中, score 爲該任務下一次執行的時間戳

      執行語句:

      redis> EVAL "Lua腳本" 2 queues:default:delayed queues:default:reserved "失敗的任務" 任務延遲執行的時間戳

      Lua腳本

      -- Remove the job from the current queue...
      redis.call('zrem', KEYS[2], ARGV[1])
      
      -- Add the job onto the \"delayed\" queue...
      redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
      
      return true
    2. 若是任務失敗次數超過指定的重試閥值

      將該任務從 queue:default:reserved 中移除

      執行語句:

      redis> ZREM queue:default:reserved

注意, 上述使用 Lua 腳本的目的在於操做的原子性, Redis 是單進程單線程模式, 以Lua腳本形式執行命令時能夠確保執行腳本的原子性, 而不會有併發問題.

關於Redis的原子操做

上面 Laravel 使用redis做爲隊列存儲引擎時, 在操做redis時使用到了 exec 執行Lua腳本, 以確保原子性.

這裏給不熟悉redis的同窗簡單講一下.

以上面 Worker 啓動時的步驟1爲例:

queues:default:delayed 有序集合中獲取能夠處理的 "延遲任務", 並 rpushqueue:default隊列的尾部

具體的執行語句:

redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳

Lua 腳本內容以下:

-- Get all of the jobs with an expired \"score\"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto thedestination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val

上述步驟首先從 queues:default:delayed 有序集合中獲取能夠處理的 "延遲任務" 並 rpushqueue:default隊列的尾部.

那麼若是不使用Lua腳本的話, 通常作法會是以下:

$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time())
if (!empty($jobs)) {
    $redis->zRem("queues:default:delayed", ...$jobs);
    $redis->rPush("queues:default", ...$jobs);   
}

若是是單個Worker的話, 上述腳本不會有問題, 可是若是有多個Worker呢? 在php層面上執行上述操做是會有併發問題的.

Worker_1 和 Worker_2 從 queues:default:delayed 隊列中獲取多個任務後, 執行 rPush 語句會致使任務被執行2次, 若是有多個 Worker 甚至會執行更屢次.

只要是有可能引發併發問題的狀況, 那麼就必定會發生.

以 分佈式鎖 爲例

鎖的兩大基本操做:

  • Lock
  • Unlock

Lock 操做

// 生成惟一的鎖id
$identifier = uniqid(php_uname("n") . "_", true);
// 僅在該key不存在時設置, 過時時間5秒
$result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);

Unlock 操做

$script = <<<LUA
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
LUA;

// 此處的 $identifier 必須和 lock 時的鎖id一致
$result = $redis->evaluate($script, ["lock_key", $identifier], 1);

至於 Unlock 操做爲何要這麼麻煩, 能夠看一下如下兩種有問題的方案, 再想想.

有問題的方案一

$redis->del("lock_key");

有問題的方案二

if ($redis->get("lock_key") == $identifier) {
    $redis->del("lock_key");
}
相關文章
相關標籤/搜索