爲何 Laravel 會重複執行同一個隊列任務?

(原文連接:https://blog.tanteng.me/2017/...php

在 Laravel 中使用 Redis 處理隊列任務,框架提供的功能很是強大,可是最近遇到一個問題,就是發現一個任務被屢次執行,這是爲何呢?laravel

先說緣由:由於在 Laravel 中若是一個隊列(任務)執行時間大於 60 秒,就會被認爲執行失敗並從新加入隊列中,這樣就會致使重複執行同一個任務。redis

這個任務的邏輯就是給用戶推送內容,須要根據隊列內容取出用戶並遍歷,經過請求後端 HTTP 接口發送。好比有 10000 個用戶,在用戶數量多或接口處理速度沒那麼快的狀況下,執行時間確定會大於 60 秒,因而這個任務就被從新加入隊列。狀況更糟糕一點,前面的任務若是都沒有在 60 秒執行完,就都會從新加入隊列,這樣同一個任務就不止重複執行一次了,而是屢次。json

下面從 Laravel 源代碼找一下罪魁禍首。後端

源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php框架

/**
 * The expiration time of a job.
 *
 * @var int|null
 */
protected $expire = 60;

這個 $expire 成員變量是一個固定的值,Laravel 認爲一個隊列再怎麼 60 秒也該執行完了吧。取隊列方法:this

public function pop($queue = null)
{
    $original = $queue ?: $this->default;
 
    $queue = $this->getQueue($queue);
 
    $this->migrateExpiredJobs($queue.':delayed', $queue);
 
    if (! is_null($this->expire)) {
        $this->migrateExpiredJobs($queue.':reserved', $queue);
    }
 
    list($job, $reserved) = $this->getConnection()->eval(
        LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
    );
 
    if ($reserved) {
        return new RedisJob($this->container, $this, $job, $reserved, $original);
    }
}

取隊列有幾步操做,由於隊列執行失敗,或執行超時等都會放入另外的集合保存起來,以便重試,過程以下:lua

1.把因執行失敗的隊列從 delayed 集合從新 rpush 到當前執行的隊列中。code

2.把因執行超時的隊列從 reserved 集合從新 rpush 到當前執行的隊列中。blog

3.而後纔是從隊列中取任務開始執行,同時把隊列放入 reserved 的有序集合。

這裏使用了 eval 命令執行這個過程,用到了幾個 lua 腳本。

從要執行的隊列中取任務:

local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}

能夠看到 Laravel 在取 Redis 要執行的隊列的時候,同時會放一份到一個有序集合中,並使用過時時間戳做爲分值。

只有當這個任務完成後,再把有序集合中這個任務移除。從這個有序集合移除隊列的代碼就省略,咱們看一下 Laravel 如何處理執行時間大於 60 秒的隊列。

也就是這段 lua 腳本執行的操做:

local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end
return true

這裏 zrangebyscore 找出分值從無限小到當前時間戳的元素,也就是 60 秒以前加入到集合的任務,而後經過 zremrangebyrank 從集合移除這些元素並 rpush 到隊列中。

看到這裏應該就恍然大悟了。

若是一個隊列 60 秒沒執行完,那麼進程在取隊列的時候從 reserved 集合中把這些任務又從新 rpush 到隊列中。

相關文章
相關標籤/搜索