這篇文章給你們分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有必定的參考價值,有須要的朋友能夠參考一下。php
Swoft
的任務功能基於Swoole
的Task機制
,或者說Swoft
的Task
機制本質就是對Swoole
的Task機制
的封裝和增強。laravel
個人官方羣點擊此處。sql
//Swoft\Task\Task.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//見下文Command章節 } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException('Please deliver task by http!'); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = 'task' . '.' . $taskName . '.' . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } }
任務投遞Task::deliver()
將調用參數打包後根據$type
參數經過Swoole
的$server->taskCo()
或$server->task()
接口投遞到Task進程
。Task
自己始終是同步執行的,$type
僅僅影響投遞這一操做的行爲,Task::TYPE_ASYNC
對應的$server->task()
是異步投遞,Task::deliver()
調用後立刻返回;Task::TYPE_CO
對應的$server->taskCo()
是協程投遞,投遞後讓出協程控制,任務完成或執行超時後Task::deliver()
才從協程返回。shell
//Swoft\Task\Bootstrap\Listeners\TaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */ class TaskEventListener implements TaskInterface, FinishInterface { /** * @param \Swoole\Server $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws \InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (\Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
此處是swoole.onTask
的事件回調,其職責僅僅是將將Worker
進程投遞來的打包後的數據轉發給TaskExecutor
。服務器
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名TaskWorker
)處理,因此swoole.onTask
的事件回調是在Task進程
中執行的。上文說過,Worker進程
是你大部分HTTP
服務代碼執行的環境,可是從TaskEventListener.onTask()
方法開始,代碼的執行環境都是Task進程
,也就是說,TaskExecutor
和具體的TaskBean
都是執行在Task進程
中的。swoole
//Swoft\Task\TaskExecutor /** * The task executor * * @Bean() */ class TaskExecutor { /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data); $name = $data['name']; $type = $data['type']; $method = $data['method']; $params = $data['params']; $logid = $data['logid'] ?? uniqid('', true); $spanid = $data['spanid'] ?? 0; $collector = TaskCollector::getCollector(); if (!isset($collector['task'][$name])) { return false; } list(, $coroutine) = $collector['task'][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); } return $result; } }
任務執行思路很簡單,將Worker進程
發過來的數據解包還原成原來的調用參數,根據$name
參數找到對應的TaskBean
並調用其對應的task()
方法。其中TaskBean
使用類級別註解@Task(name="TaskName")
或者@Task("TaskName")
聲明。架構
值得一提的一點是,@Task
註解除了name
屬性,還有一個coroutine
屬性,上述代碼會根據該參數選擇使用協程的runCoTask()
或者同步的runSyncTask()
執行Task
。可是因爲並且因爲Swoole
的Task進程
的執行是徹底同步的,不支持協程,因此目前版本請該參數不要配置爲true
。一樣的在TaskBean
中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級爲同步阻塞的併發
前面咱們提到:異步
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名TaskWorker
)處理。
換句話說,Swoole
的$server->taskCo()
或$server->task()
都只能在Worker進程
中使用。async
這個限制大大的限制了使用場景。 如何可以爲了可以在Process
中投遞任務呢?Swoft
爲了繞過這個限制提供了Task::deliverByProcess()
方法。其實現原理也很簡單,經過Swoole
的$server->sendMessage()
方法將調用信息從Process
中投遞到Worker進程
中,而後由Worker進程替其投遞到Task進程
當中,相關代碼以下:
//Swoft\Task\Task.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName, 'method' => $methodName, 'params' => $params, 'timeout' => $timeout, 'type' => $type, ]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
數據打包後使用$server->sendMessage()
投遞給Worker
:
//Swoft\Bootstrap\Server\ServerTrait.php /** * onPipeMessage event callback * * @param \Swoole\Server $server * @param int $srcWorkerId * @param string $message * @return void * @throws \InvalidArgumentException */ public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
$server->sendMessage
後,Worker進程
收到數據時會觸發一個swoole.pipeMessage
事件的回調,Swoft
會將其轉換成本身的swoft.pipeMessage
事件並觸發.
//Swoft\Task\Event\Listeners\PipeMessageListener.php /** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */ class PipeMessageListener implements EventHandlerInterface { /** * @param \Swoft\Event\EventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; } list($type, $data, $srcWorkerId) = $params; if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; } $type = $data['type']; $taskName = $data['name']; $params = $data['params']; $timeout = $data['timeout']; $methodName = $data['method']; // delever task Task::deliver($taskName, $methodName, $params, $type, $timeout); } }
swoft.pipeMessage
事件最終由PipeMessageListener
處理。在相關的監聽其中,若是發現swoft.pipeMessage
事件由Task::deliverByProcess()
產生的,Worker進程
會替其執行一次Task::deliver()
,最終將任務數據投遞到TaskWorker進程
中。
一道簡單的回顧練習:從Task::deliverByProcess()
到某TaskBean
最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?
//Swoft\Task\QueueTask.php /** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */ public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); } if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); } $this->check(); $data = $this->pack($data, $srcWorkerId); $result = \msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; } return true; }
對於Command
進程的任務投遞,狀況會更復雜一點。
上文提到的Process
,其每每衍生於Http/Rpc
服務,做爲同一個Manager
的子孫進程,他們可以拿到Swoole\Server
的句柄變量,從而經過$server->sendMessage()
,$server->task()
等方法進行任務投遞。
但在Swoft
的體系中,還有一個十分路人的角色: Command
。Command
的進程從shell
或cronb
獨立啓動,和Http/Rpc
服務相關的進程沒有親緣關係。所以Command
進程以及從Command
中啓動的Process
進程是沒有辦法拿到Swoole\Server
的調用句柄直接經過UnixSocket
進行任務投遞的。
爲了爲這種進程提供任務投遞支持,Swoft
利用了Swoole
的Task進程
的一個特殊功能----消息隊列。
同一個項目中Command
和Http\RpcServer
經過約定一個message_queue_key
獲取到系統內核中的同一條消息隊列,而後Comand
進程就能夠經過該消息隊列向Task進程
投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()
方法中,Swoft
會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore
拓展,若是你想使用,須要在編譯PHP
時加上--enable-sysvmsg
參數。
除了手動執行的普通任務,Swoft
還提供了精度爲秒的定時任務功能用來在項目中替代Linux的Crontab
功能.
Swoft
用兩個前置Process
---任務計劃進程:CronTimerProcess
和任務執行進程CronExecProcess
,和兩張內存數據表-----RunTimeTable
(任務(配置)表)OriginTable
((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構以下:
\\Swoft\Task\Crontab\TableCrontab.php /** * 任務表,記錄用戶配置的任務信息 * 表每行記錄包含的字段以下,其中`rule`,`taskClass`,`taskMethod`生成key惟一肯定一條記錄 * @var array $originStruct */ private $originStruct = [ 'rule' => [\Swoole\Table::TYPE_STRING, 100],//定時任務執行規則,對應@Scheduled註解的cron屬性 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任務名 對應@Task的name屬性(默認爲類名) 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,對應@Scheduled註解所在方法 'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化該表內容時的10位時間戳 ]; /** * 執行表,記錄短期內要執行的任務列表及其執行狀態 * 表每行記錄包含的字段以下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key惟一肯定一條記錄 * @var array $runTimeStruct */ private $runTimeStruct = [ 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上 'minute' => [\Swoole\Table::TYPE_STRING, 20],//須要執行任務的時間,精確到分鐘 格式date('YmdHi') 'sec' => [\Swoole\Table::TYPE_STRING, 20],//須要執行任務的時間,精確到分鐘 10位時間戳 'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任務狀態,有 0(未執行) 1(已執行) 2(執行中) 三種。 //注意:這裏的執行是一個容易誤解的地方,此處的執行並非指任務自己的執行,而是值`任務投遞`這一操做的執行,從宏觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會更準確。 ];
Swoft
的的定時任務管理是分別由 任務計劃進程 和 任務執行進程 進程負責的。兩個進程的運行共同管理定時任務,若是使用進程間獨立的array()
等結構,兩個進程必然須要頻繁的進程間通訊。而使用跨進程的Table
(本文的Table
,除非特別說明,都指Swoole
的Swoole\Table
結構)直接進行進程間數據共享,不只性能高,操做簡單 還解耦了兩個進程。
爲了Table
可以在兩個進程間共同使用,Table
必須在Swoole Server
啓動前建立並分配內存。具體代碼在Swoft\Task\Bootstrap\Listeners->onBeforeStart()
中,比較簡單,有興趣的能夠自行閱讀。
背景介紹完了,咱們來看看這兩個定時任務進程的行爲
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer", boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param \Swoft\Process\Process $process */ public function run(SwoftProcess $process) { //code.... /* @var \Swoft\Task\Crontab\Crontab $cron*/ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date('s')) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } } //Swoft\Task\Crontab\Crontab.php /** * 初始化runTimeTable數據 * * @param array $task 任務 * @param array $parseResult 解析crontab命令規則結果,即Task須要在當前分鐘內的哪些秒執行 * @return bool */ private function initRunTimeTableData(array $task, array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table; $min = date('YmdHi'); $sec = strtotime(date('Y-m-d H:i')); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec); $runTimeTableTasks->set($key, [ 'taskClass' => $task['taskClass'], 'taskMethod' => $task['taskMethod'], 'minute' => $min, 'sec' => $time + $sec, 'runStatus' => self::NORMAL ]); } return true; }
CronTimerProcess
是Swoft
的定時任務調度進程,其核心方法是Crontab->initRunTimeTableData()
。
該進程使用了Swoole
的定時器功能,經過Swoole\Timer
在每分鐘首秒時執行的回調,CronTimerProcess
每次被喚醒後都會遍歷任務表計算出當前這一分鐘內的60秒分別須要執行的任務清單,寫入執行表並標記爲 未執行。
//Swoft\Task\Bootstrap\Process /** * Crontab process * * @Process(name="cronExec", boot=true) */ class CronExecProcess implements ProcessInterface { /** * @param \Swoft\Process\Process $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf('%s cronexec process', $pname)); /** @var \Swoft\Task\Crontab\Crontab $cron */ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { // Diliver task Task::deliverByProcess($task['taskClass'], $task['taskMethod']); $cron->finishTask($task['key']); } } }); } }
CronExecProcess
做爲定時任務的執行者,經過Swoole\Timer
每0.5s
喚醒自身一次,而後把 執行表
遍歷一次,挑選當下須要執行的任務,經過sendMessage()
投遞出去並更新該 任務執行表中的狀態。
該執行進程只負責任務的投遞,任務的實際實際執行仍然在Task進程
中由TaskExecutor
處理。
以上內容但願幫助到你們,不少PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那裏入手去提高,對此我整理了一些資料,包括但不限於:分佈式架構、高可擴展、高性能、高併發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階乾貨須要的能夠免費分享給你們,須要的能夠加入個人官方羣點擊此處。