譯文GitHub https://github.com/yuansir/diving-laravel-zhphp
原文連接https://divinglaravel.com/queue-system/workerslaravel
如今,咱們知道了Laravel如何將做業推到不一樣的隊列中,讓咱們來深刻了解workers如何運做你的做業。 首先,我將workers定義爲一個在後臺運行的簡單PHP進程,目的是從存儲空間中提取做業並針對多個配置選項運行它們。git
php artisan queue:work
運行此命令將指示Laravel建立應用程序的一個實例並開始執行做業,這個實例將一直存活着,啓動Laravel應用程序的操做只在運行命令時發生一次,同一個實例將被用於執行你的做業,這意味着:github
你也能夠這樣運行:redis
php artisan queue:work --once
這將啓動應用程序的一個實例,處理單個做業,而後幹掉腳本。數據庫
php artisan queue:listen
queue:listen
命令至關於無限循環地運行 queue:work --once
命令,這將致使如下問題:緩存
使用 queue:listen
確保爲每一個做業建立一個新的應用程序實例,這意味着代碼更改之後沒必要手動重啓worker,同時也意味着將消耗更多的服務器資源。服務器
咱們來看看 Queue\Console\WorkCommand
類的 handle()
方法,這是當你運行 php artisan queue:work
時會執行的方法:app
public function handle() { if ($this->downForMaintenance() && $this->option('once')) { return $this->worker->sleep($this->option('sleep')); } $this->listenForEvents(); $connection = $this->argument('connection') ?: $this->laravel['config']['queue.default']; $queue = $this->getQueue($connection); $this->runWorker( $connection, $queue ); }
首先,咱們檢查應用程序是否處於維護模式,並使用 --once
選項,在這種狀況下,咱們但願腳本正常運行,所以咱們不執行任何做業,咱們只須要在徹底殺死腳本前讓worker在一段時間內休眠。async
Queue\Worker
的 sleep()
方法看起來像這樣:
public function sleep($seconds) { sleep($seconds); }
如前所述, queue:listen
命令在循環中運行 WorkCommand
:
while (true) { // This process simply calls 'php artisan queue:work --once' $this->runProcess($process, $options->memory); }
若是應用程序處於維護模式,而且 WorkCommand
當即終止,這將致使循環結束,下一個在很短的時間內啓動,最好在這種狀況下致使一些延遲,而不是經過建立咱們不會真正使用的大量應用程序實例。
在 handle()
方法裏面咱們調用 listenForEvents()
方法:
protected function listenForEvents() { $this->laravel['events']->listen(JobProcessing::class, function ($event) { $this->writeOutput($event->job, 'starting'); }); $this->laravel['events']->listen(JobProcessed::class, function ($event) { $this->writeOutput($event->job, 'success'); }); $this->laravel['events']->listen(JobFailed::class, function ($event) { $this->writeOutput($event->job, 'failed'); $this->logFailedJob($event); }); }
在這個方法中咱們會監聽幾個事件,這樣咱們能夠在每次做業處理中,處理完或處理失敗時向用戶打印一些信息。
一旦做業失敗 logFailedJob()
方法會被調用
$this->laravel['queue.failer']->log( $event->connectionName, $event->job->getQueue(), $event->job->getRawBody(), $event->exception );
queue.failer
容器別名在 Queue\QueueServiceProvider::registerFailedJobServices()
中註冊:
protected function registerFailedJobServices() { $this->app->singleton('queue.failer', function () { $config = $this->app['config']['queue.failed']; return isset($config['table']) ? $this->databaseFailedJobProvider($config) : new NullFailedJobProvider; }); } /** * Create a new database failed job provider. * * @param array $config * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider */ protected function databaseFailedJobProvider($config) { return new DatabaseFailedJobProvider( $this->app['db'], $config['database'], $config['table'] ); }
若是配置了 queue.failed
,則將使用數據庫隊列失敗,並將有關失敗做業的信息簡單地存儲在數據庫表中的:
$this->getTable()->insertGetId(compact( 'connection', 'queue', 'payload', 'exception', 'failed_at' ));
要運行worker,咱們須要收集兩條信息:
若是沒有使用 queue.default
配置定義的默認鏈接。您能夠爲 queue:work
命令提供 --connection=default
選項。
隊列也是同樣,您能夠提供一個 --queue=emails
選項,或選擇鏈接配置中的 queue
選項。一旦這一切完成, WorkCommand::handle()
方法會運行 runWorker()
:
protected function runWorker($connection, $queue) { $this->worker->setCache($this->laravel['cache']->driver()); return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( $connection, $queue, $this->gatherWorkerOptions() ); }
在worker類屬性在命令構造後設置:
public function __construct(Worker $worker) { parent::__construct(); $this->worker = $worker; }
容器解析 Queue\Worker
實例,在runWorker()
中咱們設置了worker將使用的緩存驅動,咱們也根據--once
命令來決定咱們調用什麼方法。
若是使用 --once
選項,咱們只需調用 runNextJob
來運行下一個可用的做業,而後腳本就會終止。 不然,咱們將調用 daemon
方法來始終保持進程處理做業。
在開始工做時,咱們使用 gatherWorkerOptions()
方法收集用戶給出的命令選項,咱們稍後會提供這些選項,這個工具是 runNextJob
或 daemon
方法。
protected function gatherWorkerOptions() { return new WorkerOptions( $this->option('delay'), $this->option('memory'), $this->option('timeout'), $this->option('sleep'), $this->option('tries'), $this->option('force') ); }
讓我看看 Worker::daemon()
方法,這個方法的第一行調用了 Worker::daemon()
方法
protected function listenForSignals() { if ($this->supportsAsyncSignals()) { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; }); pcntl_signal(SIGUSR2, function () { $this->paused = true; }); pcntl_signal(SIGCONT, function () { $this->paused = false; }); } }
這種方法使用PHP7.1的信號處理, supportsAsyncSignals()
方法檢查咱們是否在PHP7.1上,並加載 pcntl
擴展名。
以後pcntl_async_signals()
被調用來啓用信號處理,而後咱們爲多個信號註冊處理程序:
SIGTERM
。SIGUSR2
是用戶定義的信號,Laravel用來表示腳本應該暫停。SIGCONT
。這些信號從Process Monitor(如 Supervisor )發送並與咱們的腳本進行通訊。
Worker::daemon()
方法中的第二行讀取最後一個隊列從新啓動的時間戳,當咱們調用queue:restart
時該值存儲在緩存中,稍後咱們將檢查是否和上次從新啓動的時間戳不符合,來指示worker在以後屢次重啓。
最後,該方法啓動一個循環,在這個循環中,咱們將完成其他獲取做業的worker,運行它們,並對worker進程執行多個操做。
while (true) { if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; } $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); $this->registerTimeoutHandler($job, $options); if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } $this->stopIfNecessary($options, $lastRestart); }
調用 daemonShouldRun()
檢查如下狀況:
若是應用程序在維護模式下,worker使用--force
選項仍然能夠處理做業:
php artisan queue:work --force
肯定worker是否應該繼續的條件之一是:
$this->events->until(new Events\Looping($connectionName, $queue)) === false)
這行觸發 Queue\Event\Looping
事件,並檢查是否有任何監聽器在 handle()
方法中返回false,這種狀況下你能夠強制您的workers暫時中止處理做業。
若是worker應該暫停,則調用 pauseWorker()
方法:
protected function pauseWorker(WorkerOptions $options, $lastRestart) { $this->sleep($options->sleep > 0 ? $options->sleep : 1); $this->stopIfNecessary($options, $lastRestart); }
sleep
方法並傳遞給控制檯命令的 --sleep
選項,這個方法調用
public function sleep($seconds) { sleep($seconds); }
腳本休眠了一段時間後,咱們檢查worker是否應該在這種狀況下退出並殺死腳本,稍後咱們看一下stopIfNecessary
方法,以防腳本不能被殺死,咱們只需調用 continue;
開始一個新的循環:
if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; }
$job = $this->getNextJob( $this->manager->connection($connectionName), $queue );
getNextJob()
方法接受一個隊列鏈接的實例,咱們從隊列中獲取做業
protected function getNextJob($connection, $queue) { try { foreach (explode(',', $queue) as $queue) { if (! is_null($job = $connection->pop($queue))) { return $job; } } } catch (Exception $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); } }
咱們簡單地循環給定的隊列,使用選擇的隊列鏈接從存儲空間(數據庫,redis,sqs,...)獲取做業並返回該做業。
要從存儲中retrieve做業,咱們查詢知足如下條件的最舊做業:
queue
,咱們試圖從中找到做業一旦咱們找到符合這一標準的做業,咱們將這個做業標記爲reserved,以便其餘workers獲取到,咱們還會增長做業監控次數。
下一個做業被retrieved以後,咱們調用 registerTimeoutHandler()
方法:
protected function registerTimeoutHandler($job, WorkerOptions $options) { if ($this->supportsAsyncSignals()) { pcntl_signal(SIGALRM, function () { $this->kill(1); });the $timeout = $this->timeoutForJob($job, $options); pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0); } }
再次,若是 pcntl
擴展被加載,咱們將註冊一個信號處理程序幹掉worker進程若是該做業超時的話,在配置了超時以後咱們使用 pcntl_alarm()
來發送一個 SIGALRM
信號。
若是做業所花費的時間超過了超時值,處理程序將會終止該腳本,若是不是該做業將經過,而且下一個循環將設置一個新的報警覆蓋第一個報警,由於進程中可能存在單個報警。
做業只在PHP7.1以上起效,在window上也無效 ¯_(ツ)_/¯
runJob()
方法調用 process()
:
public function process($connectionName, $job, WorkerOptions $options) { try { $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( $connectionName, $job, (int) $options->maxTries ); $job->fire(); $this->raiseAfterJobEvent($connectionName, $job); } catch (Exception $e) { $this->handleJobException($connectionName, $job, $options, $e); } }
raiseBeforeJobEvent()
觸發 Queue\Events\JobProcessing
事件, raiseAfterJobEvent()
觸發 Queue\Events\JobProcessed
事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts()
檢查進程是否達到最大嘗試次數,並將該做業標記爲失敗:
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) { $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; if ($maxTries === 0 || $job->attempts() <= $maxTries) { return; } $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( 'A queued job has been attempted too many times. The job may have previously timed out.' )); throw $e; }
不然咱們在做業對象上調用 fire()
方法來運行做業。
getNextJob()
方法返回一個 Contracts\Queue\Job
的實例,這取決於咱們使用相應的Job實例的隊列驅動程序,例如若是數據庫隊列驅動則選擇 Queue\Jobs\DatabaseJob
。
在循環結束時,咱們調用 stopIfNecessary()
來檢查在下一個循環開始以前是否應該中止進程:
protected function stopIfNecessary(WorkerOptions $options, $lastRestart) { if ($this->shouldQuit) { $this->kill(); } if ($this->memoryExceeded($options->memory)) { $this->stop(12); } elseif ($this->queueShouldRestart($lastRestart)) { $this->stop(); } }
shouldQuit
屬性在兩種狀況下設置,首先listenForSignals()
內部的做爲 SIGTERM
信號處理程序,其次在 stopWorkerIfLostConnection()
中
protected function stopWorkerIfLostConnection($e) { if ($this->causedByLostConnection($e)) { $this->shouldQuit = true; } }
在retrieving和處理做業時,會在幾個try ... catch語句中調用此方法,以確保worker應該處於被幹掉的狀態,以便咱們的Process Control可能會啓動一個新的數據庫鏈接。
causedByLostConnection()
方法能夠在 Database\DetectsLostConnections
trait中找到。memoryExceeded()
檢查內存使用狀況是否超過當前設置的內存限制,您可使用 --memory
選項設置限制。
轉載請註明: 轉載自Ryan是菜鳥 | LNMP技術棧筆記
若是以爲本篇文章對您十分有益,何不 打賞一下
本文連接地址: 剖析Laravel隊列系統--Worker