剖析Laravel隊列系統--Worker

譯文GitHub https://github.com/yuansir/diving-laravel-zhphp

原文連接https://divinglaravel.com/queue-system/workerslaravel

如今,咱們知道了Laravel如何將做業推到不一樣的隊列中,讓咱們來深刻了解workers如何運做你的做業。 首先,我將workers定義爲一個在後臺運行的簡單PHP進程,目的是從存儲空間中提取做業並針對多個配置選項運行它們。git

php artisan queue:work

運行此命令將指示Laravel建立應用程序的一個實例並開始執行做業,這個實例將一直存活着,啓動Laravel應用程序的操做只在運行命令時發生一次,同一個實例將被用於執行你的做業,這意味着:github

  • 避免在每一個做業上啓動整個應用程序來節省服務器資源。
  • 在應用程序中所作的任何代碼更改後必須手動重啓worker。

你也能夠這樣運行:redis

php artisan queue:work --once

這將啓動應用程序的一個實例,處理單個做業,而後幹掉腳本。數據庫

php artisan queue:listen

queue:listen 命令至關於無限循環地運行 queue:work --once 命令,這將致使如下問題:緩存

  • 每一個循環都會啓動一個應用程序實例。
  • 分配的worker將選擇一個工做並執行。
  • worker進程將被幹掉。

使用 queue:listen 確保爲每一個做業建立一個新的應用程序實例,這意味着代碼更改之後沒必要手動重啓worker,同時也意味着將消耗更多的服務器資源。服務器

queue:work 命令

咱們來看看 Queue\Console\WorkCommand 類的 handle() 方法,這是當你運行 php artisan queue:work 時會執行的方法:app

public function handle()
{
    if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }

    $this->listenForEvents();

    $connection = $this->argument('connection')
                    ?: $this->laravel['config']['queue.default'];

    $queue = $this->getQueue($connection);

    $this->runWorker(
        $connection, $queue
    );
}

首先,咱們檢查應用程序是否處於維護模式,並使用 --once 選項,在這種狀況下,咱們但願腳本正常運行,所以咱們不執行任何做業,咱們只須要在徹底殺死腳本前讓worker在一段時間內休眠。async

Queue\Workersleep() 方法看起來像這樣:

public function sleep($seconds)
{
    sleep($seconds);
}

爲何咱們不能在 handle() 方法中返回null來終止腳本?

如前所述, queue:listen 命令在循環中運行 WorkCommand

while (true) {
     // This process simply calls 'php artisan queue:work --once'
    $this->runProcess($process, $options->memory);
}

若是應用程序處於維護模式,而且 WorkCommand 當即終止,這將致使循環結束,下一個在很短的時間內啓動,最好在這種狀況下致使一些延遲,而不是經過建立咱們不會真正使用的大量應用程序實例。

監聽事件

handle() 方法裏面咱們調用 listenForEvents() 方法:

protected function listenForEvents()
{
    $this->laravel['events']->listen(JobProcessing::class, function ($event) {
        $this->writeOutput($event->job, 'starting');
    });

    $this->laravel['events']->listen(JobProcessed::class, function ($event) {
        $this->writeOutput($event->job, 'success');
    });

    $this->laravel['events']->listen(JobFailed::class, function ($event) {
        $this->writeOutput($event->job, 'failed');

        $this->logFailedJob($event);
    });
}

在這個方法中咱們會監聽幾個事件,這樣咱們能夠在每次做業處理中,處理完或處理失敗時向用戶打印一些信息。

記錄失敗做業

一旦做業失敗 logFailedJob() 方法會被調用

$this->laravel['queue.failer']->log(
    $event->connectionName, $event->job->getQueue(),
    $event->job->getRawBody(), $event->exception
);

queue.failer 容器別名在 Queue\QueueServiceProvider::registerFailedJobServices() 中註冊:

protected function registerFailedJobServices()
{
    $this->app->singleton('queue.failer', function () {
        $config = $this->app['config']['queue.failed'];

        return isset($config['table'])
                    ? $this->databaseFailedJobProvider($config)
                    : new NullFailedJobProvider;
    });
}

/**
 * Create a new database failed job provider.
 *
 * @param  array  $config
 * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
 */
protected function databaseFailedJobProvider($config)
{
    return new DatabaseFailedJobProvider(
        $this->app['db'], $config['database'], $config['table']
    );
}

若是配置了 queue.failed ,則將使用數據庫隊列失敗,並將有關失敗做業的信息簡單地存儲在數據庫表中的:

$this->getTable()->insertGetId(compact(
    'connection', 'queue', 'payload', 'exception', 'failed_at'
));

運行worker

要運行worker,咱們須要收集兩條信息:

  • worker的鏈接信息從做業中提取
  • worker找到做業的隊列

若是沒有使用 queue.default 配置定義的默認鏈接。您能夠爲 queue:work 命令提供 --connection=default 選項。

隊列也是同樣,您能夠提供一個 --queue=emails 選項,或選擇鏈接配置中的 queue 選項。一旦這一切完成, WorkCommand::handle() 方法會運行 runWorker()

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

在worker類屬性在命令構造後設置:

public function __construct(Worker $worker)
{
    parent::__construct();

    $this->worker = $worker;
}

容器解析 Queue\Worker 實例,在runWorker()中咱們設置了worker將使用的緩存驅動,咱們也根據--once 命令來決定咱們調用什麼方法。

若是使用 --once 選項,咱們只需調用 runNextJob 來運行下一個可用的做業,而後腳本就會終止。 不然,咱們將調用 daemon 方法來始終保持進程處理做業。

在開始工做時,咱們使用 gatherWorkerOptions() 方法收集用戶給出的命令選項,咱們稍後會提供這些選項,這個工具是 runNextJobdaemon 方法。

protected function gatherWorkerOptions()
{
    return new WorkerOptions(
        $this->option('delay'), $this->option('memory'),
        $this->option('timeout'), $this->option('sleep'),
        $this->option('tries'), $this->option('force')
    );
}

守護進程

讓我看看 Worker::daemon() 方法,這個方法的第一行調用了 Worker::daemon() 方法

protected function listenForSignals()
{
    if ($this->supportsAsyncSignals()) {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
}

這種方法使用PHP7.1的信號處理, supportsAsyncSignals() 方法檢查咱們是否在PHP7.1上,並加載 pcntl 擴展名。

以後pcntl_async_signals() 被調用來啓用信號處理,而後咱們爲多個信號註冊處理程序:

  • 當腳本被指示關閉時,會引起SIGTERM
  • SIGUSR2是用戶定義的信號,Laravel用來表示腳本應該暫停。
  • 當暫停的腳本繼續進行時,會引起SIGCONT

這些信號從Process Monitor(如 Supervisor )發送並與咱們的腳本進行通訊。

Worker::daemon() 方法中的第二行讀取最後一個隊列從新啓動的時間戳,當咱們調用queue:restart 時該值存儲在緩存中,稍後咱們將檢查是否和上次從新啓動的時間戳不符合,來指示worker在以後屢次重啓。

最後,該方法啓動一個循環,在這個循環中,咱們將完成其他獲取做業的worker,運行它們,並對worker進程執行多個操做。

while (true) {
    if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
        $this->pauseWorker($options, $lastRestart);

        continue;
    }

    $job = $this->getNextJob(
        $this->manager->connection($connectionName), $queue
    );

    $this->registerTimeoutHandler($job, $options);

    if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        $this->sleep($options->sleep);
    }

    $this->stopIfNecessary($options, $lastRestart);
}

肯定worker是否應該處理做業

調用 daemonShouldRun() 檢查如下狀況:

  • 應用程序不處於維護模式
  • Worker沒有暫停
  • 沒有事件監聽器阻止循環繼續

若是應用程序在維護模式下,worker使用--force選項仍然能夠處理做業:

php artisan queue:work --force

肯定worker是否應該繼續的條件之一是:

$this->events->until(new Events\Looping($connectionName, $queue)) === false)

這行觸發 Queue\Event\Looping 事件,並檢查是否有任何監聽器在 handle() 方法中返回false,這種狀況下你能夠強制您的workers暫時中止處理做業。

若是worker應該暫停,則調用 pauseWorker() 方法:

protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);
}

sleep 方法並傳遞給控制檯命令的 --sleep 選項,這個方法調用

public function sleep($seconds)
{
    sleep($seconds);
}

腳本休眠了一段時間後,咱們檢查worker是否應該在這種狀況下退出並殺死腳本,稍後咱們看一下stopIfNecessary 方法,以防腳本不能被殺死,咱們只需調用 continue; 開始一個新的循環:

if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

    continue;
}

Retrieving 要運行的做業

$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

getNextJob() 方法接受一個隊列鏈接的實例,咱們從隊列中獲取做業

protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    }
}

咱們簡單地循環給定的隊列,使用選擇的隊列鏈接從存儲空間(數據庫,redis,sqs,...)獲取做業並返回該做業。

要從存儲中retrieve做業,咱們查詢知足如下條件的最舊做業:

  • 推送到 queue ,咱們試圖從中找到做業
  • 沒有被其餘worker reserved
  • 能夠在給定的時間內運行,有些做業在未來被推遲運行
  • 咱們也取到了好久以來被凍結的做業並重試

一旦咱們找到符合這一標準的做業,咱們將這個做業標記爲reserved,以便其餘workers獲取到,咱們還會增長做業監控次數。

監控做業超時

下一個做業被retrieved以後,咱們調用 registerTimeoutHandler() 方法:

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });the

        $timeout = $this->timeoutForJob($job, $options);

        pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
    }
}

再次,若是 pcntl 擴展被加載,咱們將註冊一個信號處理程序幹掉worker進程若是該做業超時的話,在配置了超時以後咱們使用 pcntl_alarm() 來發送一個 SIGALRM 信號。

若是做業所花費的時間超過了超時值,處理程序將會終止該腳本,若是不是該做業將經過,而且下一個循環將設置一個新的報警覆蓋第一個報警,由於進程中可能存在單個報警。

做業只在PHP7.1以上起效,在window上也無效 ¯_(ツ)_/¯

處理做業

runJob() 方法調用 process():

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    }
}

raiseBeforeJobEvent() 觸發 Queue\Events\JobProcessing 事件, raiseAfterJobEvent() 觸發 Queue\Events\JobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 檢查進程是否達到最大嘗試次數,並將該做業標記爲失敗:

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($maxTries === 0 || $job->attempts() <= $maxTries) {
        return;
    }

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        'A queued job has been attempted too many times. The job may have previously timed out.'
    ));

    throw $e;
}

不然咱們在做業對象上調用 fire() 方法來運行做業。

從哪裏獲取做業對象

getNextJob() 方法返回一個 Contracts\Queue\Job 的實例,這取決於咱們使用相應的Job實例的隊列驅動程序,例如若是數據庫隊列驅動則選擇 Queue\Jobs\DatabaseJob

循環結束

在循環結束時,咱們調用 stopIfNecessary() 來檢查在下一個循環開始以前是否應該中止進程:

protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
    if ($this->shouldQuit) {
        $this->kill();
    }

    if ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    }
}

shouldQuit 屬性在兩種狀況下設置,首先listenForSignals() 內部的做爲 SIGTERM 信號處理程序,其次在 stopWorkerIfLostConnection()

protected function stopWorkerIfLostConnection($e)
{
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;
    }
}

在retrieving和處理做業時,會在幾個try ... catch語句中調用此方法,以確保worker應該處於被幹掉的狀態,以便咱們的Process Control可能會啓動一個新的數據庫鏈接。

causedByLostConnection() 方法能夠在 Database\DetectsLostConnections trait中找到。
memoryExceeded() 檢查內存使用狀況是否超過當前設置的內存限制,您可使用 --memory 選項設置限制。

轉載請註明: 轉載自Ryan是菜鳥 | LNMP技術棧筆記

若是以爲本篇文章對您十分有益,何不 打賞一下

謝謝打賞

本文連接地址: 剖析Laravel隊列系統--Worker

相關文章
相關標籤/搜索