Lumen框架「異步隊列任務」源碼剖析

1.架構介紹

php在異步編程上的短板是周所周知的,爲了保持語言的簡單、處理邏輯的清晰,php使用了進程阻塞模型。儘管異步難以實現,需求中仍是常常會用到異步任務處理機制,好比比較耗時的郵件發送,訂單生成;還有一些須要延時處理的任務;爲了加快響應速度,將主幹邏輯與其餘不相干邏輯解耦等等。Laravel/Lumen框架已經實現了異步機制,咱們結合源碼學習一下Lumen是怎樣實現異步任務處理的。筆者簡單總結了一下lumen實現異步隊列任務的架構圖:php

Lumen實現異步任務是在兩個進程中進行的,一個是產生任務的生產者,另一個是單獨處理任務的消費者。一般,生產者通常是咱們處理業務邏輯所在的fast-cgi進程,它將任務封裝成payload,push到隊列,消費者則是另外單獨編寫的守護進程,不停的取出隊列中的任務,解析payload,消費執行。隊列是Lumen實現異步處理不可缺乏的中間媒介,Lumen自己就支持Redis/Sqs/Database/Beanstalkd多種隊列中間件,其中Redis使用最普遍,咱們就以Redis爲例,學習Lumen使用Redis的zset、list數據結構實現失敗任務重試和延時任務處理。不論是生產者仍是消費者,都使用了Lumen框架容器所提供的衆多服務:任務的分發處理(BusServiceProvider)、事件的訂閱/發佈(EventServiceProvider)、任務隊列化的實現(QueueServiceProvider)等等。mysql

2.庖丁解牛,源碼解析

咱們將結合架構圖從Lumen框架的隊列服務註冊與啓動、Bus服務註冊與啓動、 生產者分發任務到隊列、守護進程消費任務四個階段來解讀源碼,幫助讀者清晰瞭解Lumen框架實現異步隊列任務每一個階段的工做原理。laravel

2.1 隊列服務註冊與啓動

Lumen框架服務容器啓動之後,經過服務提供者向容器中註冊服務(服務提供者繼承ServiceProvider抽象類,須要自行實現register方法)。隊列的服務提供者是QueueServiceProvider類(vendor/illuminate/queue/QueueServiceProvider.php),它註冊了隊列用到的不少服務:git

class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        $this->registerManager();
        $this->registerConnection();
        $this->registerWorker();
        $this->registerListener();
        $this->registerFailedJobServices();
        $this->registerOpisSecurityKey();
    }
    ......
}
複製代碼

其中registerManager註冊了隊列管理的門面(Facade),QueueManager類底層使用了隊列的鏈接,其中能夠註冊['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs']任意的隊列中間件鏈接類,咱們以Redis爲例子:github

protected function registerManager()
    {
        $this->app->singleton('queue', function ($app) {
            return tap(new QueueManager($app), function ($manager) {
                $this->registerConnectors($manager);
            });
        });
    }
    ......
    public function registerConnectors($manager)
    {
        foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
            $this->{"register{$connector}Connector"}($manager);
        }
    }
    ......
    protected function registerRedisConnector($manager)
    {
        $manager->addConnector('redis', function () {
            return new RedisConnector($this->app['redis']);
        });
    }
複製代碼

QueueManager 是隊列服務的總門面,提供一切與隊列相關的操做接口(可使用Queue:: + 方法名來調用隊列的方法)。QueueManager 中有一個成員變量 $connectors,存儲着各類驅動的 connector,例如 RedisConnector、SqsConnector、DatabaseConnector、BeanstalkdConnector。 registerConnection 底層隊列鏈接服務:redis

protected function registerConnection()
    {
        $this->app->singleton('queue.connection', function ($app) {
            return $app['queue']->connection();
        });
    }

複製代碼

隊列鏈接的時候會讀取默認的配置信息,咱們查看QueueManager($app['queue']就是從容器中取出服務)類(/vendor/illuminate/queue/QueueManager.php)中的相關代碼:sql

public function connection($name = null)
    {
        $name = $name ?: $this->getDefaultDriver();
        
        if (! isset($this->connections[$name])) {
            $this->connections[$name] = $this->resolve($name);

            $this->connections[$name]->setContainer($this->app);
        }

        return $this->connections[$name];
    }
    ...
    protected function resolve($name)
    {
        $config = $this->getConfig($name);

        return $this->getConnector($config['driver'])
                        ->connect($config)
                        ->setConnectionName($name);
    }
    ...
    protected function getConnector($driver)
    {
        if (! isset($this->connectors[$driver])) {
            throw new InvalidArgumentException("No connector for [$driver]");
        }

        return call_user_func($this->connectors[$driver]);
    }
    ...
    protected function getConfig($name)
    {
        if (! is_null($name) && $name !== 'null') {
            return $this->app['config']["queue.connections.{$name}"];
        }

        return ['driver' => 'null'];
    }
    ...
    public function getDefaultDriver()
    {
        return $this->app['config']['queue.default'];
    }
複製代碼

由此咱們能夠,隊列首先經過getDefaultDriver方法得到驅動的鏈接並將其保存到驅動鏈接池數組中,生產者使用隊列的使用能夠根據驅動名稱選擇不一樣的鏈接服務,例如使用sqs鏈接:編程

ProcessPodcast::dispatch($podcast)->onConnection('sqs');
複製代碼

咱們使用的隊列驅動是Redis,config/queue.php要作相關配置:json

<?php

return [
    'default' => env('QUEUE_DRIVER', 'redis'),
    'connections' => [
        ......
        'redis' => [
            'driver' => 'redis',
            'connection' => env('QUEUE_REDIS_CONNECTION', 'queue'),
            'queue' => 'default',
            'retry_after' => 60,
        ]
    ],

    //失敗的隊列任務先不配置到隊列中
    /*'failed' => [
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => env('QUEUE_FAILED_TABLE', 'failed_jobs'),
    ],*/
];
複製代碼

registerWorker註冊消費者服務,程序會返回Illuminate\Queue\Worker類,咱們在第四部分講解消費者的時候會詳細瞭解它。下邊的registerListener、registerFailedJobServices、registerOpisSecurityKey請你們自行閱讀,其中registerListener使用到了訂閱/發佈模式,使用的是Lumen框架的事件Event系統,又是一個比較大的板塊,還比較重要,和生產者/消費者相似,能夠爲隊列註冊不一樣的監聽者,當隊列執行到這個狀態時,就會通知監聽者,例如能夠在AppServiceProvider(/app/Providers/AppServiceProvider.php)啓動時註冊隊列監聽者:api

class AppServiceProvider extends ServiceProvider
{
    public function boot()
    {
        //任務運行前
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        //任務運行後
        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        //任務循環前
        Queue::looping(function () {

        });

        //任務失敗後
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
複製代碼

這樣就能夠在任務執行的各個階段對任務進行監控了;項目中註冊監聽者頗有用,舉一個例子,咱們想要記錄項目api業務邏輯中對DB的全部sql語句並蒐集慢查詢相關的信息,一樣能夠在AppServiceProvider中使用:

\DB::listen(function ($query) {
                $sql = str_replace("?", "'%s'", $query->sql);
                $sql = vsprintf($sql, $query->bindings) . " | {$query->time}";
                Log::channel('sql-daily')->info($sql);
                if ($query->time > 100) {
                    Log::warning('SLOOOOOW-SQL: ' . $sql);
                }
            });
複製代碼

registerFailedJobServices在異步任務處理中也是頗有必要的,咱們經常也會將重試以後失敗的任務保存到DB中,方便未來定位問題或再次重試。

2.2 Bus服務註冊與啓動

Bus服務在Lumen系統中就是任務分發總線,就像公共汽車把乘客載到不一樣的目的地,dispatch函數就是Bus服務。咱們來看一下BusServiceProvider(/vendor/illuminate/bus/BusServiceProvider.php)的註冊函數:

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });

        $this->app->alias(
            Dispatcher::class, DispatcherContract::class
        );

        $this->app->alias(
            Dispatcher::class, QueueingDispatcherContract::class
        );
    }
複製代碼

由此可知Bus服務就是Dispatcher類實現的,咱們結合Dispatcher類看一下生產者如何將任務給push到隊列上的。

2.3 生產者分發任務到隊列

咱們在項目邏輯中每每是這樣分發任務到隊列的:

$job = (new ExampleJob($joblist));
 dispatch($job);
複製代碼

跟進dispatch是在helper.php中定義的,其中dispatch函數傳入的是一個任務實例(這很重要):

if (! function_exists('dispatch')) {
    function dispatch($job)
    {
        return new PendingDispatch($job);
    }
}
複製代碼

咱們繼續跟進PendingDispatch類實例:

class PendingDispatch
{
    protected $job;

    public function __construct($job)
    {
        $this->job = $job;
    }
    ...
    public function __destruct()
    {
        app(Dispatcher::class)->dispatch($this->job);
    }
複製代碼

在析構函數中咱們得知,程序從Lumen服務容器中解析了Dispatcher類,調用了它的dispatch處理任務。咱們接下來看Dispatcher類(/vendor/illuminate/bus/Dispatcher.php)是如何實現的:

class Dispatcher implements QueueingDispatcher
{
   ......
    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }

    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        }

        return $this->dispatchNow($command);
    }

    public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command);
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }
    ...
    protected function commandShouldBeQueued($command)
    {
        return $command instanceof ShouldQueue;
    }
複製代碼

這裏的$command就是上邊提到的job實例類,程序經過判斷job有沒有繼承實現ShouldQueue接口,若是沒有實現,則直接經過dispatchNow函數,經過Pipeline的send/Through/then來同步處理相關任務。咱們主要來看將任務推送到隊列的狀況:

public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }
複製代碼

dispatchToQueue方法會首先判斷 job實例有沒有自處理的queue方法,沒有則走pushCommandToQueue方法,pushCommandToQueue方法中有三個if條件,他們的順序不能顛倒,command->queue是查看程序是否設置了將任務推送到指定隊列,$command->delay是查看程序是否將任務設置爲超時任務;不一樣的設置選項會調用隊列驅動的不一樣方法,走的是不一樣的邏輯。咱們以Redis爲例,代碼中的queue就是RedisQueue(/vendor/illuminate/queue/RedisQueue.php),咱們來進一步查看它的處理邏輯:

class RedisQueue extends Queue implements QueueContract
{
    ......
    public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
    {
        $this->redis = $redis;
        $this->default = $default;
        $this->blockFor = $blockFor;
        $this->connection = $connection;
        $this->retryAfter = $retryAfter;
    }
    ......
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
    }
    ......
    protected function laterRaw($delay, $payload, $queue = null)
    {
        $this->getConnection()->zadd(
            $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
        );

        return json_decode($payload, true)['id'] ?? null;
    }
複製代碼

咱們這裏看到的是延時隊列的later方法,調用的是laterRaw方法,在傳入參數的時候調用createPayload方法將job給封裝成payload,這個過程很重要,由於消費者也是經過獲取解析payload實現任務消費的,咱們來看一下封裝payload的過程:

protected function createPayload($job, $queue, $data = '')
    {
        $payload = json_encode($this->createPayloadArray($job, $queue, $data));

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidPayloadException(
                'Unable to JSON encode payload. Error code: '.json_last_error()
            );
        }

        return $payload;
    }

    protected function createPayloadArray($job, $queue, $data = '')
    {
        return is_object($job)
                    ? $this->createObjectPayload($job, $queue)
                    : $this->createStringPayload($job, $queue, $data);
    }

    protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'delay' => $this->getJobRetryDelay($job),
            'timeout' => $job->timeout ?? null,
            'timeoutAt' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize(clone $job),
            ],
        ]);
    }
複製代碼

能夠看到,封裝的payload信息中包含有不少信息,其中重試次數的控制maxTries、超時的設置timeout都在payload數組中設置,另外payload中的data還將任務的名稱和序列化好的任務類serialize(clone $job)一同封裝了進去。

另外咱們知道laterRaw中將延時任務經過zadd默認添加到了queue:delayed的zset中去了,其中的score添加的是 this->availableAt(delay),咱們查看其實現:

protected function availableAt($delay = 0)
    {
        $delay = $this->parseDateInterval($delay);

        return $delay instanceof DateTimeInterface
                            ? $delay->getTimestamp()
                            : Carbon::now()->addRealSeconds($delay)->getTimestamp();
    }
複製代碼

發現score設置的正是任務執行時間的時間戳,設置可謂真是巧妙,消費者經過判斷queue:delayed中大於當前時間的任務進行執行就能夠實現延時任務的執行了;這種時間滑動窗口的設置在應用開發中很是常見。

咱們再來看非延時任務的執行就相對簡單了不少(/vendor/illuminate/queue/RedisQueue.php):

......
 public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
    }
......
public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->eval(
            LuaScripts::push(), 2, $this->getQueue($queue),
            $this->getQueue($queue).':notify', $payload
        );

        return json_decode($payload, true)['id'] ?? null;
    }
複製代碼

redis使用lua腳本,經過rpush將任務默認推送到了queue:default隊列

public static function push()
    {
        return <<<'LUA'
-- Push the job onto the queue...
redis.call('rpush', KEYS[1], ARGV[1])
-- Push a notification onto the "notify" queue...
redis.call('rpush', KEYS[2], 1)
LUA;
    }
複製代碼

使用lua腳本是爲了保證redis操做命令的原子性,尤爲在分佈式任務中,不少服務去爭搶任務的時候都須要使用lua腳本,在消費者中咱們還會看到lua腳本的使用,並且操做比這裏還要複雜不少。

2.4 守護進程消費任務

Lumen包含一個隊列處理器,當新任務被推到隊列中時它能處理這些任務。你能夠經過 queue:work 命令來運行處理器。生產環境中咱們經常使用supervisor來管理這些消費任務,咱們將他們稱爲守護進程消費者。咱們首先來看看消費者可使用怎樣的方式來啓動吧:

//處理給定鏈接的隊列
php artisan queue:work redis --queue=emails
//僅對隊列中的單一任務處理
php artisan queue:work --once
//若是一個任務失敗了,會被放入延時隊列中取,--delay 選項能夠設置失敗任務的延時時間
php artisan queue:work --delay=2
//若是想要限制一個任務的內存,可使用 --memory
php artisan queue:work --memory=128
//能夠指定 Lumen 隊列處理器最多執行多長時間後就應該被關閉掉
php artisan queue:work --timeout=60
//能夠指定 Lumen 隊列處理器失敗任務重試的次數
php artisan queue:work --tries=60
複製代碼

咱們使用cli程序啓動消費者的時候,命令行模式會調用 Illuminate\Queue\Console\WorkCommand,這個類在初始化的時候依賴注入了 Illuminate\Queue\Worker:

class WorkCommand extends Command
{
    protected $signature = 'queue:work {connection? : The name of the queue connection to work} {--queue= : The names of the queues to work} {--daemon : Run the worker in daemon mode (Deprecated)} {--once : Only process the next job on the queue} {--stop-when-empty : Stop when the queue is empty} {--delay=0 : The number of seconds to delay failed jobs} {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--sleep=3 : Number of seconds to sleep when no job is available} {--timeout=60 : The number of seconds a child process can run} {--tries=0 : Number of times to attempt a job before logging it failed}';

    protected $description = 'Start processing jobs on the queue as a daemon';

    protected $worker;

    public function __construct(Worker $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    public function handle()
    {
        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $this->listenForEvents();

        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];

        $queue = $this->getQueue($connection);

        $this->runWorker(
            $connection, $queue
        );
    }

    protected function runWorker($connection, $queue)
    {
        $this->worker->setCache($this->laravel['cache']->driver());

        return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }

    protected function gatherWorkerOptions()
    {
        return new WorkerOptions(
            $this->option('delay'), $this->option('memory'),
            $this->option('timeout'), $this->option('sleep'),
            $this->option('tries'), $this->option('force'),
            $this->option('stop-when-empty')
        );
    }
    
    protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });

        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                return $this->writeStatus($job, 'Processing', 'comment');
            case 'success':
                return $this->writeStatus($job, 'Processed', 'info');
            case 'failed':
                return $this->writeStatus($job, 'Failed', 'error');
        }
    }

    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            Carbon::now()->format('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11), $job->resolveName()
        ));
    }

    protected function logFailedJob(JobFailed $event)
    {
        $this->laravel['queue.failer']->log(
            $event->connectionName, $event->job->getQueue(),
            $event->job->getRawBody(), $event->exception
        );
    }

    protected function getQueue($connection)
    {
        return $this->option('queue') ?: $this->laravel['config']->get(
            "queue.connections.{$connection}.queue", 'default'
        );
    }
    
    protected function downForMaintenance()
    {
        return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
    }
}
複製代碼

任務啓動時會運行handle函數,執行任務以前,首先經過listenForEvents註冊監聽事件,監放任務的完成與失敗狀況。接下來啓動runWorker方法,該函數默認會調用 Illuminate\Queue\Worker 的 daemon 函數,只有在命令中強制 --once 參數的時候,纔會執行 runNestJob 函數。咱們主要看Worker類daemon函數,上邊提到的超時控制、失敗重試、內存限制都是在Worker中實現的:

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();
        while (true) {
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            $this->stopIfNecessary($options, $lastRestart, $job);
        }
    }
複製代碼

daemon函數首先經過supportsAsyncSignals判斷程序是否支持裝載信號,若是支持裝載信號:

...
 protected function supportsAsyncSignals()
    {
        return extension_loaded('pcntl');
    }
...
protected function listenForSignals()
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
...    
複製代碼

信號處理是進程間通訊的一種經常使用方式,這裏主要用於接收用戶在控制檯發送的命令和由 Process Monitor(如 Supervisor)發送並與咱們的腳本進行通訊的異步通知。假如咱們正在執行一個很是重要可是耗時又很是長的任務,這個時候守護進程又收到了程序退出的信號,怎樣使程序優雅的退出(執行完任務以後再退出),這裏向你們推薦一篇文章供你們探索:supervisor在PHP項目中的使用

在真正運行任務以前,程序從 cache 中取了一次最後一次重啓的時間,while(true)啓動一個長時間運行的進程,使用daemonShouldRun判斷當前腳本是應該處理任務,仍是應該暫停,仍是應該退出:

......
 protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('illuminate:queue:restart');
        }
    }
......
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    {
        return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
            $this->paused ||
            $this->events->until(new Events\Looping($connectionName, $queue)) === false);
    }
......    
複製代碼

如下幾種狀況,循環將不會處理任務:

  • 腳本處於 維護模式 而且沒有 --force 選項
  • 腳本被 supervisor 暫停
  • 腳本的 looping 事件監聽器返回 false looping 事件監聽器在每次循環的時候都會被啓動,若是返回 false,那麼當前的循環將會被暫停:pauseWorker:
protected function pauseWorker(WorkerOptions $options, $lastRestart)
    {
        $this->sleep($options->sleep > 0 ? $options->sleep : 1);

        $this->stopIfNecessary($options, $lastRestart);
    }
複製代碼

腳本在 sleep 一段時間以後,就要從新判斷當前腳本是否須要 stop:

......
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
    {
        if ($this->shouldQuit) {
            $this->stop();
        } elseif ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($lastRestart)) {
            $this->stop();
        } elseif ($options->stopWhenEmpty && is_null($job)) {
            $this->stop();
        }
    }
......
 public function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
    }
......
protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }
......
複製代碼

如下狀況腳本將會被 stop:

  • 腳本被 supervisor 退出
  • 內存超限
  • 腳本被重啓過
......
public function kill($status = 0)
    {
        $this->events->dispatch(new Events\WorkerStopping($status));

        if (extension_loaded('posix')) {
            posix_kill(getmypid(), SIGKILL);
        }

        exit($status);
    }
......
 public function stop($status = 0)
    {
        $this->events->dispatch(new Events\WorkerStopping($status));

        exit($status);
    }
複製代碼

腳本被重啓,當前的進程須要退出而且從新加載。

接下來程序獲取下一個任務,命令行能夠用 , 鏈接多個隊列的名字,位於前面的隊列優先級更高:

protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }
    }
複製代碼

$connection 是具體的驅動,咱們這裏是 Illuminate\Queue\RedisQueue:

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }

        [$job, $reserved] = $nextJob;

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
複製代碼

在從隊列中取出任務以前,須要先將 delay 隊列和 reserved 隊列中已經到時間的任務放到主隊列中:

protected function migrate($queue)
    {
        $this->migrateExpiredJobs($queue.':delayed', $queue);

        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }
public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
        );
    }
複製代碼

這裏一樣使用了lua腳本,而且這裏的lua腳本更加複雜

public static function migrateExpiredJobs()
    {
        return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        -- Push a notification for every job that was migrated...
        for j = i, math.min(i+99, #val) do
            redis.call('rpush', KEYS[3], 1)
        end
    end
end

return val
LUA;
    }
複製代碼

腳本的大概意思是將delay中的score大於當前事件戳的任務取出,push到主隊列中去,而後將任務刪除。這裏使用lua腳本保證原子性。 接下來,就要從主隊列中獲取下一個任務,在取出下一個任務以後,還要將任務放入 reserved 隊列中,當任務執行失敗後,該任務會進行重試。

protected function retrieveNextJob($queue, $block = true)
    {
        $nextJob = $this->getConnection()->eval(
            LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
            $this->availableAt($this->retryAfter)
        );

        if (empty($nextJob)) {
            return [null, null];
        }

        [$job, $reserved] = $nextJob;

        if (! $job && ! is_null($this->blockFor) && $block &&
            $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
            return $this->retrieveNextJob($queue, false);
        }

        return [$job, $reserved];
    }
    ......
    public static function pop()
    {
        return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
end

return {job, reserved}
LUA;
    }
......
複製代碼

從 redis 中獲取到 job 以後,就會將其包裝成 RedisJob 類. 若是一個腳本超時, pcntl_alarm 將會啓動並殺死當前的 work 進程。殺死進程後, work 進程將會被守護進程重啓,繼續進行下一個任務,若是任務註冊有fail函數還會執行失敗任務處理的相關邏輯。

protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        pcntl_signal(SIGALRM, function () use ($job, $options) {
            if ($job) {
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
                );
            }

            $this->kill(1);
        });

        pcntl_alarm(
            max($this->timeoutForJob($job, $options), 0)
        );
    }
    ......
    protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
    {
        $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
            $this->failJob($job, $e);
        }

        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
            $this->failJob($job, $e);
        }
    }
    ......
     protected function failJob($job, $e)
    {
        return $job->fail($e);
    }
    ......
複製代碼

接下來就是執行任務了,runJob邏輯和以前描述差很少:

protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            return $this->process($connectionName, $job, $options);
        } catch (Exception $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));

            $this->stopWorkerIfLostConnection($e);
        }
    }
    ......
    public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
            $this->raiseBeforeJobEvent($connectionName, $job);

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );

            if ($job->isDeleted()) {
                return $this->raiseAfterJobEvent($connectionName, $job);
            }

            $job->fire();

            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }
    ......
複製代碼

raiseBeforeJobEvent 函數用於觸發任務處理前的事件,raiseAfterJobEvent 函數用於觸發任務處理後的事件,這裏再也不多說。 接下來咱們再來看一下RedisJob(/vendor/illuminate/queue/Jobs/Job.php)中的fire()函數如何處理從隊列中取到的payload的:

public function fire()
    {
        $payload = $this->payload();
        [$class, $method] = JobName::parse($payload['job']);
        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }
    ......
     public static function parse($job)
    {
        return Str::parseCallback($job, 'fire');
    }
    public static function resolve($name, $payload)
    {
        if (! empty($payload['displayName'])) {
            return $payload['displayName'];
        }

        return $name;
    }
    ......
     public static function parseCallback($callback, $default = null)
    {
        return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default];
    }
複製代碼

分析代碼可知,RedisJob從payload中解析出要執行的Job類,使用隊列執行器Illuminate\Queue\CallQueuedHandler@call執行調用dispatchNow執行Job類的方法完成了消費:

public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, unserialize($data['command'])
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }

        $this->dispatcher->dispatchNow(
            $command, $this->resolveHandler($job, $command)
        );

        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
        }

        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }
複製代碼

到這裏消費者從隊列中取出任務到消費的整個流程咱們就走完了。咱們作一個簡單的回顧總結。

3. 回顧總結

Lumen框架啓動時爲異步隊列任務提供了基礎的隊列服務和Bus任務分發服務。咱們程序中的生成者經過dispatch函數將任務push到隊列,能夠指定底層驅動,還能夠設置延時任務等。dispatch函數經過Bus服務將Job類包裝成payload添加到默認隊列,若是是延時任務會添加到Redis的Zset結構中。消費者在處理任務的時候會裝載信號,實現進程重啓、退出的同時保證任務不中斷;經過memory_get_usage(true)函數判斷任務是否內存超限;經過payload中的maxTries判斷任務是否須要重試;經過pcntl裝載計時器判斷是不是否執行超時;經過向任務類中添加fail函數來記錄失敗的任務;經過zset結構任務的中score和當前時間戳對比造成滑動窗口來執行延時任務。

相關文章
相關標籤/搜索