Laravel5.2隊列驅動expire參數設置帶來的重複執行問題 數據庫驅動

'connections' => [
    ....
        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'expire' => 60,
        ],
        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => 'default',
            'expire' => 180,
        ],
    ....
    ],

Laravel5.2隊列驅動config/queue.php配置文件,「database」和「redis」有個expire參數,手冊上解釋是「隊列任務過時時間(秒)」,默認爲60秒。php

(注:5.2和以後的配置文件發生了變化,改成'retry_after' 參數,具體見手冊)html

網上搜了一下這個配置,沒有太多說明,可是實際使用的過程當中,發現對於執行時間超過expire設置時間的隊列進程,還有使用隊列進行分佈式程序部署,這個參數和這種設計模式是個大坑。。。mysql

發現這個問題是想使用分佈式程序部署處理隊列,兩臺服務器部署Laravel框架artisan腳本,鏈接一個MYSQL數據庫,使用一張jobs隊列表。redis

部署的後,分別啓動兩臺服務器的腳本,發現後執行的腳本,在隊列驅動中取數據,如MYSQL的jobs表,遇到先執行的腳本隊列數據時不會跳過,而是把這條數據視爲Failed,儲存一條新數據到failed_jobs表(Laravel隊列失敗時會將隊列數據儲存到failed_jobs表),形成數據重複。sql

以前在一臺服務器啓動3個進程執行腳本,並不會發生這種錯誤,後執行的腳本不會取得前一個進程的隊列數據,更不會判斷成Failed,多服務處理時是什麼緣由形成隊列驅動中的數據錯誤呢?shell

根據隊列執行的流程,程序執行時,隊列到隊列驅動中取任務,得到任務的過程隊列驅動應該作事物處理,這樣第二個進程取任務會跳過正在執行的隊列數據。數據庫

查了一些資料,瞭解了Laravel隊列的原理,最後還得看Queue的源碼。設計模式

Laravel的Queue的源碼都在IlluminateQueue目錄下。服務器

先分析以MYSQL爲驅動的jobs表:併發

CREATE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
  `payload` longtext COLLATE utf8_unicode_ci NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

手冊中主要介紹了隊列任務的保存,payload字段儲存的是序列化後的任務,Laravel隊列能夠將數據模型序列化,執行時候隊列系統會自動從數據庫中獲取整個模型實例,具體說明見手冊。

可是其餘幾個狀態和時間字段纔是保證隊列事物處理的關鍵字段。

「attempts」執行次數,「reserved」執行的狀態,「reserved_at」執行開始時間,‘available_at’預訂執行時間,‘created_at’是隊列建立時間。

監聽事件的腳本有Listener.php和Worker.php兩個腳本,看源碼說明Listener能夠處理指定的隊列,connection參數,可是實際上最後都是經過work來處理隊列的。Laravel5.4已經取消了queue:listen參數,都用queue:work來執行。不過我這裏說的是Laravel5.2的問題,不知道是否是下面的緣由使Laravel優化去掉了listen。

繼續分析隊列處理的Worker類的源碼,取隊列數據時用pop方法,這個方法會根據傳遞的驅動類型如database或redis,調用該驅動的pop方法。

$connection = $this->manager->connection($connectionName);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (! is_null($job)) {
    return $this->process(
        $this->manager->getName($connectionName), $job, $maxTries, $delay
    );
}

下面是DatabaseQueue.php的pop方法。

/**
 * Pop the next job off of the queue.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);
    $this->database->beginTransaction();
    if ($job = $this->getNextAvailableJob($queue)) {
        $job = $this->markJobAsReserved($job);
        $this->database->commit();
        return new DatabaseJob(
            $this->container, $this, $job, $queue
        );
    }
    $this->database->commit();
}

取數據的過程事物處理已經打開。

取隊列數據的核心仍是$this->getNextAvailableJob($queue)。

打開sql日誌,看看隊列數據是如何查詢出來的。

/**
 * Get the next available job for the queue.
 *
 * @param  string|null  $queue
 * @return \StdClass|null
 */
protected function getNextAvailableJob($queue)
{
    $this->database->enableQueryLog();
    $job = $this->database->table($this->table)
                ->lockForUpdate()
                ->where('queue', $this->getQueue($queue))
                ->where(function ($query) {
                    $this->isAvailable($query);
                    $this->isReservedButExpired($query);
                })
                ->orderBy('id', 'asc')
                ->first();
    var_dump($this->database->getQueryLog());
    return $job ? (object) $job : null;
}

array(1) {
  [0] =>
  array(3) {
    'query' =>
    string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update"
    'bindings' =>
    array(5) {
      [0] =>
      string(7) "default"
      [1] =>
      int(0)
      [2] =>
      int(1493634233)
      [3] =>
      int(1)
      [4] =>
      int(1493634173)
    }
    'time' =>
    double(1.55)
  }

從sql語句中能夠看出,取隊列數據有兩個條件

reserved爲0時,available_at時間小於當前時間,這個條件是待執行的隊列;reserved爲1時,reserved_at執行開始時間小於計算出的時間($this->isReservedButExpired),即當前時間減去超時秒Carbon::now()->subSeconds($this->expire)->getTimestamp(),這個條件是判斷隊列任務是否過時。

整個select過程是 「for update」的,有排他鎖。

取得符合條件的隊列後

/**
 * Mark the given job ID as reserved.
 *
 * @param \stdClass $job
 * @return \stdClass
 */
protected function markJobAsReserved($job)
{
    $job->reserved = 1;
    $job->attempts = $job->attempts + 1;
    $job->reserved_at = $this->getTime();
    $this->database->table($this->table)->where('id', $job->id)->update([
        'reserved' => $job->reserved,
        'reserved_at' => $job->reserved_at,
        'attempts' => $job->attempts,
    ]);
    return $job;
}

程序會更新該條數據,而且更新完後即commit。

同一服務器,第二個進程取數據時候遇到悲觀鎖,須要等第一個進程取數據更新reserved和時間後執行。也就是說Laravel隊列使用database時,併發的進程並非同時取多條數據,而是取同一條數據等待其中一個進程update數據狀態和執行時間,隊列取得數據成功後第一個操做就是更新,因此第二個進程不會取到第一進程的一樣數據,除非是隊列過時。

在DatabaseQueue.php的pop方法中,取得隊列數據後,「$this->database->commit(); 」前 sleep(10),會很明顯的看到第二隊列沒有獲取其餘隊列數據,說明「for update」只是update級排他鎖,不會排斥select。

Laravel使用database隊列有時候會有阻塞現象,不知道是否是這個緣由形成的。

若是執行時間過長,超過‘expire’參數設置時間,第二隊列會取得第一個隊列數據,判斷超時,這時候就會根據設置的最大執行次數tries來判斷是插入新隊列數據繼續嘗試執行,仍是插入到錯誤隊列「failed_jobs」表判斷隊列執行失敗。

以上就是Laravel使用mysql執行隊列的邏輯,以前提到的兩臺服務器部署Laravel框架執行artisan腳本,一個jobs表隊列Failed的問題就是服務器時間不一致的緣由,後一臺服務器執行時候將前一隊列數據判斷爲超時而插入到「failed_jobs」一條新數據,已經達到最大失敗次數的狀況,不然還會插入新的數據繼續嘗試。


因此queue:listen的執行時間參數 --timeout=60,必定要設置小於隊列任務過時時間expire參數!

還有,Laravel5.2的queue:work並無--timeout=60這個參數。。。。。


最後是隊列執行完的處理邏輯。

若是隊列執行成功會刪除jobs的數據,這沒什麼問題。若是失敗,包括超時、異常等,會根據設置的最大失敗次數判斷是否插入一條新數據,或者插入一條Failed數據到「failed_jobs」表。

出現錯誤時,handleJobException的異常處理調用DatabaseQueue.php的release方法,$job->release($delay),最終是pushToDatabase實現。

插入新數據時候,attempts是失敗次數,reserved爲0,available_at爲當前時間戳加上延時時間參數,這樣整個隊列處理就造成了完整的數據邏輯操做。


Laravel5.4對隊列功能進行了很大的修改,手冊中的提示

任務過時和超時

任務執行時間

在配置文件 config/queue.php 中,每一個鏈接都定義了 retry_after 項。該配置項的目的是定義任務在執行之後多少秒後釋放回隊列。若是retry_after 設定的值爲 90, 任務在運行 90 秒後還未完成,那麼將被釋放回隊列而不是刪除掉。毫無疑問,你須要把 retry_after 的值設定爲任務執行時間的最大可能值。

Laravel5.4去掉了queue的listen命令,work也增長了超時參數。Laravel5.5出來的時候應該升級上去。


附錄:Laravel5.2測試的腳本,以前網上搜出來的都比較早,仍是把job寫成命令的方式,其實5.2之後job使用很是簡單。

jobs下定義job任務,handle能夠增長一些測試方案,好比我這種拋出異常,直接Failed的

class MyJob extends Job implements ShouldQueue
{
    use InteractsWithQueue, SerializesModels;
    private $key;
    private $value;
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($key, $value)
    {
        $this->key = $key;
        $this->value = $value;
    }
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        for($i=0;$i<20;$i++){
            echo "$i\n";
            sleep(1);
        }
        echo "sss\t".$this->key."\t".date("Y-m-d H:i:s")."\n";
        throw new \Exception("測試\n");
//         Redis::hset('queue.test', $this->key, $this->value);
    }
    public function failed()
    {
        dump('failed');
    }
}

控制器訪問設置任務隊列,key和value以前用了測試redis插入的,能夠按本身的測試方案設置job參數。

for($i = 0; $i < 5; $i ++) {
    echo "$i";
    $job = (new MyJob($i, $i))->delay(20);
    $this->dispatch($job);
}

個人例子設置了5個隊列,開啓多個shell併發執行artisan測試吧。

原本想將redis隊列代碼讀完,一塊兒發出來的,最近事情太多,redis代碼也沒怎麼看。

redis驅動能夠參考 http://www.cnblogs.com/z12987... 這篇文章對Laravel隊列redis驅動邏輯介紹的很詳細了,redis驅動使用的list和zset結構儲存隊列,執行過程會移除轉存隊列,沒有數據庫的「for update」 操做,因此應該不是存在隊列阻塞的狀況。

BUT隊列任務過時時間設置和數據庫驅動是同樣的,因此一樣

queue:listen的執行時間參數 --timeout=60,必定要設置小於隊列任務過時時間expire參數!

終於寫完了。。。

相關文章
相關標籤/搜索