Swoft 源碼剖析 - Swoole和Swoft的那些事(Task投遞/定時任務篇)

做者:bromine
連接:https://www.jianshu.com/p/b44...
來源:簡書
著做權歸做者全部,本文已得到做者受權轉載,並對原文進行了從新的排版。
Swoft Github: https://github.com/swoft-clou...php

Swoft源碼剖析系列目錄: https://segmentfault.com/a/11...

前言

Swoft的任務功能基於SwooleTask機制,或者說SwoftTask機制本質就是對SwooleTask機制的封裝和增強。git

任務投遞

//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//見下文Command章節
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

任務投遞Task::deliver()將調用參數打包後根據$type參數經過Swoole$server->taskCo()$server->task()接口投遞到Task進程
Task自己始終是同步執行的,$type僅僅影響投遞這一操做的行爲,Task::TYPE_ASYNC對應的$server->task()是異步投遞,Task::deliver()調用後立刻返回;Task::TYPE_CO對應的$server->taskCo()是協程投遞,投遞後讓出協程控制,任務完成或執行超時後Task::deliver()才從協程返回。github

任務執行

//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

此處是swoole.onTask的事件回調,其職責僅僅是將將Worker進程投遞來的打包後的數據轉發給TaskExecutorshell

SwooleTask機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理,因此swoole.onTask的事件回調是在Task進程中執行的。上文說過,Worker進程是你大部分HTTP服務代碼執行的環境,可是從TaskEventListener.onTask()方法開始,代碼的執行環境都是Task進程,也就是說,TaskExecutor和具體的TaskBean都是執行在Task進程中的。segmentfault

//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
    */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;


        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }
}

任務執行思路很簡單,將Worker進程發過來的數據解包還原成原來的調用參數,根據$name參數找到對應的TaskBean並調用其對應的task()方法。其中TaskBean使用類級別註解@Task(name="TaskName")或者@Task("TaskName")聲明。swoole

值得一提的一點是,@Task註解除了name屬性,還有一個coroutine屬性,上述代碼會根據該參數選擇使用協程的runCoTask()或者同步的runSyncTask()執行Task。可是因爲並且因爲SwooleTask進程的執行是徹底同步的,不支持協程,因此目前版本請該參數不要配置爲true。一樣的在TaskBean中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級爲同步阻塞的異步

從Process中投遞任務

前面咱們提到:async

SwooleTask機制的本質是 Worker進程將耗時任務投遞給同步的 Task進程(又名 TaskWorker)處理。

換句話說,Swoole$server->taskCo()$server->task()都只能在Worker進程中使用。
這個限制大大的限制了使用場景。 如何可以爲了可以在Process中投遞任務呢?Swoft爲了繞過這個限制提供了Task::deliverByProcess()方法。其實現原理也很簡單,經過Swoole$server->sendMessage()方法將調用信息從Process中投遞到Worker進程中,而後由Worker進程替其投遞到Task進程當中,相關代碼以下:性能

//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}

數據打包後使用$server->sendMessage()投遞給Worker:this

//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}

$server->sendMessage後,Worker進程收到數據時會觸發一個swoole.pipeMessage事件的回調,Swoft會將其轉換成本身的swoft.pipeMessage事件並觸發.

//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) < 3) {
            return;
        }

        list($type, $data, $srcWorkerId) = $params;

        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
            return;
        }

        $type       = $data['type'];
        $taskName   = $data['name'];
        $params     = $data['params'];
        $timeout    = $data['timeout'];
        $methodName = $data['method'];

        // delever task
        Task::deliver($taskName, $methodName, $params, $type, $timeout);
    }
}

swoft.pipeMessage事件最終由PipeMessageListener處理。在相關的監聽其中,若是發現swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker進程會替其執行一次Task::deliver(),最終將任務數據投遞到TaskWorker進程中。

一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?

從Command進程或其子進程中投遞任務

//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 *
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }

    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }

    $this->check();
    $data   = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }

    return true;
}

對於Command進程的任務投遞,狀況會更復雜一點。
上文提到的Process,其每每衍生於Http/Rpc服務,做爲同一個Manager的子孫進程,他們可以拿到Swoole\Server的句柄變量,從而經過$server->sendMessage(),$server->task()等方法進行任務投遞。

但在Swoft的體系中,還有一個十分路人的角色: Command
Command的進程從shellcronb獨立啓動,和Http/Rpc服務相關的進程沒有親緣關係。所以Command進程以及從Command中啓動的Process進程是沒有辦法拿到Swoole\Server的調用句柄直接經過UnixSocket進行任務投遞的。
爲了爲這種進程提供任務投遞支持,Swoft利用了SwooleTask進程的一個特殊功能----消息隊列

使用消息隊列的Task進程.png

同一個項目中CommandHttp\RpcServer 經過約定一個message_queue_key獲取到系統內核中的同一條消息隊列,而後Comand進程就能夠經過該消息隊列向Task進程投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore拓展,若是你想使用,須要在編譯PHP時加上--enable-sysvmsg參數。

定時任務

除了手動執行的普通任務,Swoft還提供了精度爲秒的定時任務功能用來在項目中替代Linux的Crontab功能.

Swoft用兩個前置Process---任務計劃進程:CronTimerProcess和任務執行進程CronExecProcess
,和兩張內存數據表-----RunTimeTable(任務(配置)表)OriginTable((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構以下:

\\Swoft\Task\Crontab\TableCrontab.php
/**
 * 任務表,記錄用戶配置的任務信息
 * 表每行記錄包含的字段以下,其中`rule`,`taskClass`,`taskMethod`生成key惟一肯定一條記錄
 * @var array $originStruct 
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],//定時任務執行規則,對應@Scheduled註解的cron屬性
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//任務名 對應@Task的name屬性(默認爲類名)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,對應@Scheduled註解所在方法
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//初始化該表內容時的10位時間戳
];

/**
 * 執行表,記錄短期內要執行的任務列表及其執行狀態
 * 表每行記錄包含的字段以下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key惟一肯定一條記錄
 * @var array $runTimeStruct 
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//同上
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],//須要執行任務的時間,精確到分鐘 格式date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],//須要執行任務的時間,精確到分鐘 10位時間戳
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//任務狀態,有 0(未執行)  1(已執行)  2(執行中) 三種。 
    //注意:這裏的執行是一個容易誤解的地方,此處的執行並非指任務自己的執行,而是值`任務投遞`這一操做的執行,從宏觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會更準確。
];

此處爲什麼要使用Swoole的內存Table?

Swoft的的定時任務管理是分別由 任務計劃進程任務執行進程 進程負責的。兩個進程的運行共同管理定時任務,若是使用進程間獨立的array()等結構,兩個進程必然須要頻繁的進程間通訊。而使用跨進程的Table(本文的Table,除非特別說明,都指SwooleSwoole\Table結構)直接進行進程間數據共享,不只性能高,操做簡單 還解耦了兩個進程。

爲了Table可以在兩個進程間共同使用,Table必須在Swoole Server啓動前建立並分配內存。具體代碼在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比較簡單,有興趣的能夠自行閱讀。

背景介紹完了,咱們來看看這兩個定時任務進程的行爲

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 * 初始化runTimeTable數據
 *
 * @param array $task        任務
 * @param array $parseResult 解析crontab命令規則結果,即Task須要在當前分鐘內的哪些秒執行
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}

CronTimerProcessSwoft的定時任務調度進程,其核心方法是Crontab->initRunTimeTableData()
該進程使用了Swoole的定時器功能,經過Swoole\Timer在每分鐘首秒時執行的回調,CronTimerProcess每次被喚醒後都會遍歷任務表計算出當前這一分鐘內的60秒分別須要執行的任務清單,寫入執行表並標記爲 未執行。

//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}

CronExecProcess做爲定時任務的執行者,經過Swoole\Timer0.5s喚醒自身一次,而後把 執行表 遍歷一次,挑選當下須要執行的任務,經過sendMessage()投遞出去並更新該 任務執行表中的狀態。
該執行進程只負責任務的投遞,任務的實際實際執行仍然在Task進程中由TaskExecutor處理。

定時任務的宏觀執行狀況以下:

Swoft定時任務機制.png

相關文章
相關標籤/搜索