Last-Modified: 2019年5月10日15:04:22php
使用隊列的目的通常是:html
解釋一下:mysql
異步執行: 部分代碼執行很耗時, 爲了提升響應速度及避免佔用過多鏈接資源, 能夠將這部分代碼放到隊列中異步執行.laravel
Eg. 網站新用戶註冊後, 須要發送歡迎的郵件, 涉及到網絡IO沒法控制耗時的這一類就很適合放到隊列中來執行.
出錯重試: 爲了保證一些任務的正常執行, 能夠將任務放到隊列中執行, 若執行出錯則能夠延遲一段時間後重試, 直到任務處理成功或出錯超過N次後取消執行.redis
Eg. 用戶須要綁定手機號, 此時發送短信的接口是依賴第三方, 一個是不肯定耗時, 一個是不肯定調用的成功, 爲了保證調用成功, 必然須要在出錯後重試
如下分析默認使用的隊列及其配置以下sql
默認隊列引擎: redis
json
經過在redis-cli
中使用monitor
命令查看具體執行的命令語句
default
此處以分發 異步通知(class XxxNotification implement ShouldQueue
)爲例.網絡
在Laravel中發起異步通知時, Laravel 會往redis中的任務隊列添加一條新任務併發
redis 執行語句app
redis> RPUSH queues:default { "displayName": "App\\Listeners\\RebateEventListener", "job": "Illuminate\\Queue\\CallQueuedHandler@call", "maxTries": null, "timeout": null, "timeoutAt": null, "data": { "commandName": "Illuminate\\Events\\CallQueuedListener", "command": "O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"timeout\";N;s:6:\"\u0000*\u0000job\";N;}" }, "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB", "attempts": 0 }
上面的redis語句是將任務信息(json格式) rpush
到 redis 隊列 queues:default
的尾部.
Laravel 處理任務隊列的進程開啓方式: php artisan queue:work
, 爲了更好的觀察, 這裏使用 --once
選項來指定隊列中的單一任務進行處理, 具體的更多參數請自行參考文檔
php artisan queue:work --once --delay=1 --tries=3
上述執行語句參數含義:
--once
僅執行一次任務, 默認是常駐進程一直執行--tries=3
任務出錯最多重試3次, 默認是無限制重試--delay=1
任務出錯後, 每次延遲1秒後再次執行, 默認是延遲0秒當 Worker 啓動時, 它依次執行以下步驟:
此處仍以默認隊列
default
爲例講解, 且
只講解redis的相關操做
從 queues:default:delayed
有序集合中獲取能夠處理的 "延遲任務", 並 rpush
到 queue:default
隊列的尾部
具體的執行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳
Lua 腳本內容以下:
-- Get all of the jobs with an expired \"score\"... local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
從 queue:default:reserved
有序集合中獲取已過時的 "reserved 任務", 並 rpush
到 queue:default
隊列的尾部
具體的執行語句:
redis> eval "Lua腳本" 2 queues:default:reserved queues:default 當前時間戳
使用的Lua腳本同步驟 1
從 queue:default
隊列中獲取(lpop
)一個任務, 增長其 attempts
次數, 並將該任務保存到 queu:default:reserved
有序集合中, 該任務的 score
值爲 當前時間 + 90(任務執行超時時間)
具體的執行語句:
redis> eval 「Lua腳本」 2 queues:default queues:default:reserved 任務超時時間戳
Lua腳本
-- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}
這裏的 90 是根據配置而定:config('queue.connections.redis.retry_after')
若預計任務耗時太久, 則應增長該數值, 防止任務還在執行時就被重置
queues:default:reserved
隊列中移除掉具體執行語句: ZREM queues:default:reserved "具體任務"
若是執行任務失敗, 此時分爲2種狀況:
任務失敗次數未達到指定的重試次數閥值
將該任務從 queues:default:reserved
中移除, 並將該任務添加到 queue:default:delayed
有序集合中, score
爲該任務下一次執行的時間戳
執行語句:
redis> EVAL "Lua腳本" 2 queues:default:delayed queues:default:reserved "失敗的任務" 任務延遲執行的時間戳
Lua腳本
-- Remove the job from the current queue... redis.call('zrem', KEYS[2], ARGV[1]) -- Add the job onto the \"delayed\" queue... redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) return true
若是任務失敗次數超過指定的重試閥值
將該任務從 queue:default:reserved
中移除
執行語句:
redis> ZREM queue:default:reserved
注意, 上述使用 Lua 腳本的目的在於操做的原子性, Redis 是單進程單線程模式, 以Lua腳本形式執行命令時能夠確保執行腳本的原子性, 而不會有併發問題.
上面 Laravel 使用redis做爲隊列存儲引擎時, 在操做redis時使用到了 exec
執行Lua腳本, 以確保原子性.
這裏給不熟悉redis的同窗簡單講一下.
以上面 Worker 啓動時的步驟1爲例:
從
queues:default:delayed
有序集合中獲取能夠處理的 "延遲任務", 並rpush
到queue:default
隊列的尾部具體的執行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳Lua 腳本內容以下:
-- Get all of the jobs with an expired \"score\"... local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
上述步驟首先從 queues:default:delayed
有序集合中獲取能夠處理的 "延遲任務" 並 rpush
到 queue:default
隊列的尾部.
那麼若是不使用Lua腳本的話, 通常作法會是以下:
$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time()) if (!empty($jobs)) { $redis->zRem("queues:default:delayed", ...$jobs); $redis->rPush("queues:default", ...$jobs); }
若是是單個Worker的話, 上述腳本不會有問題, 可是若是有多個Worker呢? 在php層面上執行上述操做是會有併發問題的.
Worker_1 和 Worker_2 從 queues:default:delayed
隊列中獲取多個任務後, 執行 rPush
語句會致使任務被執行2次, 若是有多個 Worker 甚至會執行更屢次.
只要是有可能引發併發問題的狀況, 那麼就必定會發生.
鎖的兩大基本操做:
Lock 操做
// 生成惟一的鎖id $identifier = uniqid(php_uname("n") . "_", true); // 僅在該key不存在時設置, 過時時間5秒 $result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);
Unlock 操做
$script = <<<LUA if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end LUA; // 此處的 $identifier 必須和 lock 時的鎖id一致 $result = $redis->evaluate($script, ["lock_key", $identifier], 1);
至於 Unlock 操做爲何要這麼麻煩, 能夠看一下如下兩種有問題的方案, 再想想.
有問題的方案一
$redis->del("lock_key");
有問題的方案二
if ($redis->get("lock_key") == $identifier) { $redis->del("lock_key"); }