【Laravel-海賊王系列】第十章,Job&隊列存儲端實現

任務的存儲端

Job 的定義

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct()
    {
        echo '開始構造Job';
    }
    
    public function handle()
    {
        echo '開始處理Job';
    }
}

複製代碼

新建的 TestJob 類,這個類實現了序列化模型,隊列功能等等都是經過trait類來補充的。 這些特性咱們經過使用來分解。php

運行一個任務

dispatch(new TestJob());
複製代碼

這裏就是執行一個 TestJob 的任務,接下去看看 dispatch() 這個方法redis

function dispatch($job)
    {
        if ($job instanceof Closure) {
            $job = new CallQueuedClosure(new SerializableClosure($job));
        }

        return new PendingDispatch($job);
    }
複製代碼

這裏會返回一個 Illuminate\Foundation\Bus\PendingDispatch 對象json

TestJob 這個對象裏面經過 use Queueable 引入的幾個成員屬性。 目前爲止咱們看到只不過是實例化了一個對象,同時將 TestJob 傳給 PendingDispatchbash

咱們來解讀 PendingDispatch 這個類閉包

<?php

namespace Illuminate\Foundation\Bus;

use Illuminate\Contracts\Bus\Dispatcher;

class PendingDispatch
{
    protected $job;
   
    public function __construct($job)
    {
        // "接收傳入的 job 對象"
        $this->job = $job; 
    }

    public function onConnection($connection)
    {
        // "設置任務指定鏈接"
        $this->job->onConnection($connection); 

        return $this;
    }

    public function onQueue($queue)
    {
        // "設置任務隊列名"
        $this->job->onQueue($queue);

        return $this;
    }

    public function allOnConnection($connection)
    {
        // "設置工做鏈全部須要的鏈接"
        $this->job->allOnConnection($connection);

        return $this;
    }

    public function allOnQueue($queue)
    {
        // "設置工做鏈的隊列"
        $this->job->allOnQueue($queue);

        return $this;
    }

    public function delay($delay)
    {
        // "設置延遲時間"
        $this->job->delay($delay);

        return $this;
    }
  
    public function chain($chain)
    {
        // "設置工做鏈任務"
        $this->job->chain($chain);

        return $this;
    }

    public function __destruct()
    {
        // "經過析構函數來轉發job"
        app(Dispatcher::class)->dispatch($this->job);
    }
}
複製代碼

分解完這個類,其實大部分都是設置參數的過程,也是經過這些參數來控制任務的執行狀態,好比延遲,工做鏈模式運行等等。app

重點在析構函數,當運行完 return new PendingDispatch($job); 以後對象若是沒有被任何變量接收,那麼對象的內存空間會被回收,從而觸發析構函數執行,也是觸發 job 繼續執行的方式!dom

public function __destruct()
{
    // "經過析構函數來轉發job"
    app(Dispatcher::class)->dispatch($this->job);
}
複製代碼

獲取任務對應的解析器

app(Dispatcher::class) 傳入的參數是 Illuminate\Bus\Dispatcher , 這個契約對應的綁定類是經過配置文件 app.providers.Illuminate\Bus\BusServiceProvider::class 來加載的 關於 provider 的啓動在第九章中有講,咱們直接看啓動方法ide

public function register()
{
    $this->app->singleton(Dispatcher::class, function ($app) {
        return new Dispatcher($app, function ($connection = null) use ($app) {
            return $app[QueueFactoryContract::class]->connection($connection);
        });
    });

    $this->app->alias(
        Dispatcher::class, DispatcherContract::class
    );

    $this->app->alias(
        Dispatcher::class, QueueingDispatcherContract::class
    );
}
複製代碼

app(Dispatcher::class) 的實質就是這個閉包的返回函數

function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
}
複製代碼

看看 Dispatcher 構造函數ui

public function __construct(Container $container, Closure $queueResolver = null)
{
    $this->container = $container;
    $this->queueResolver = $queueResolver;
    $this->pipeline = new Pipeline($container);
}
複製代碼

接受兩個參數,第一個是容器,第二個就是閉包因此 $this->queueResolver 就是

function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
         }
複製代碼

我管這個 $this->queueResolver 叫解析器,做用是接收一個 $connection 而後從容器中解析出隊列的驅動並進行鏈接。

QueueFactoryContract::class 是經過 provider 加載的

位於 app.providers.Illuminate\Queue\QueueServiceProvider::class,

返回的對象是 Illuminate\Queue\QueueManager 因爲 'default' => env('QUEUE_CONNECTION', 'sync'),

中配置的 redis 因此最後返回的對象是 Illuminate\Queue\RedisQueue

分發任務到隊列

public function dispatch($command)
{
    // "$this->queueResolver 這個隊列解析器是在構造的時候注入的"
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } 

    return $this->dispatchNow($command);
}
複製代碼

上面的方法明確了任務是該經過隊列仍是同步執行。

這裏咱們看,傳入的 $command 就是開始的 TestJob 對象。

還記得 Laravel 文檔說的若是要經過隊列實現須要實現一個指定的接口嗎

implements ShouldQueue,這段代碼就是解釋了緣由。

protected function commandShouldBeQueued($command)
{
    return $command instanceof ShouldQueue;
}
複製代碼

繼續下去,經過上面的判斷以後咱們進入 dispatchToQueue($command) 這裏

public function dispatchToQueue($command)
{
    $connection = $command->connection ?? null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }

    return $this->pushCommandToQueue($queue, $command);
}
複製代碼

上面解析過了 $queue 就是 Illuminate\Queue\RedisQueue 這個對象

// "返回 false"
if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }
複製代碼

全部最後執行了 return $this->pushCommandToQueue($queue, $command);

protected function pushCommandToQueue($queue, $command)
{
    // "若是存在指定的隊列和延遲,則推入指定隊列+延遲"
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    } 

    // "若是存在指定的隊列則push到指定的隊列"
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    // "只存在延遲設置,推入延遲"
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    // "默認"
    return $queue->push($command);
}
複製代碼

構造數據

上面已經到了最終的調用,那麼接下來的事情就是構造一個什麼樣格式的數據存入redis

追蹤 $queue->push($command)

// "這裏的 $job 就是最開始傳入的 TestJob 對象!"

public function push($job, $data = '', $queue = null)
{
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
複製代碼

構造 payload

protected function createPayload($job, $queue, $data = '')
{
    $payload = json_encode($this->createPayloadArray($job, $queue, $data));

    if (JSON_ERROR_NONE !== json_last_error()) {
        throw new InvalidPayloadException(
            'Unable to JSON encode payload. Error code: '.json_last_error()
        );
    }

    return $payload;
}
複製代碼

這裏的 createPayloadArray() 先調用Illuminate\Queue\RedisQueue對象的

protected function createPayloadArray($job, $queue, $data = '')
{
    return array_merge(parent::createPayloadArray($job, $queue, $data), [
        'id' => $this->getRandomId(),
        'attempts' => 0,
    ]);
}
複製代碼

追蹤父類Illuminate\Queue\Queue 方法

protected function createPayloadArray($job, $queue, $data = '')
{
    return is_object($job)
                ? $this->createObjectPayload($job, $queue)
                : $this->createStringPayload($job, $queue, $data);
}    
    
// "$job 是對象的時候格式化方式"
protected function createObjectPayload($job, $queue)
{
    $payload = $this->withCreatePayloadHooks($queue, [
        'displayName' => $this->getDisplayName($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => $job->tries ?? null, // "這是任務設置的重試次數"
        'timeout' => $job->timeout ?? null, // "這是超時時間"
        'timeoutAt' => $this->getJobExpiration($job), // "獲取處理過時時間"
        'data' => [
            'commandName' => $job,
            'command' => $job,
        ],
    ]);

    return array_merge($payload, [
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job), 
                        // "序列化,這裏的序列化會調用 SerializesModels 特質類的__sleep()方法 在開頭的時候全部的 Job 類都有use"
        ],
    ]);
}
    
// "$job 是字符串的時候格式化方式"
protected function createStringPayload($job, $queue, $data)
{
    return $this->withCreatePayloadHooks($queue, [
        'displayName' => is_string($job) ? explode('@', $job)[0] : null,
        'job' => $job,
        'maxTries' => null,
        'timeout' => null,
        'data' => $data,
    ]);
}    
複製代碼

將獲取的最後 json 字符串 rpushredis 中。

public function pushRaw($payload, $queue = null, array $options = [])
{
    $this->getConnection()->rpush($this->getQueue($queue), $payload); 
    return json_decode($payload, true)['id'] ?? null; 
}    
複製代碼

至於延遲任務 return $queue->later($command->delay, $command);, 邏輯基本上同樣,只不過最後存入的隊列是名不同

小結

到這裏位置關於任務和隊列的應用寫入端口已經完成,最終是把指定的格式的數據存入配置的存儲驅動中的過程。

相關文章
相關標籤/搜索