前置閱讀: Hyperf/Crontab使用文檔
前置閱讀: Hyperf/Process自定義進程使用文檔
前置閱讀: Hyperf事件機制
以前作項目用到了Hyperf/Crontab組件來進行秒級的數據清洗,最近又在作定時任務的拆分,因而就打算過一遍組件源碼加深理解,順便構思一下如何在此基礎上搭建Hyperf/Crontab的任務調度功能。Crontab本質上是一個隨Server啓動的自定義進程,因此接下來咱們將從啓動和執行兩個階段來進行介紹。php
因爲Crontab是一個隨着Server啓動的進程,分析他的生命週期確定會設計到框架的啓動。但本文不是主要介紹Hyperf框架啓動源碼的,因此咱們會簡單的過一下涉及到自定義進程啓動的框架啓動代碼。
當咱們使用命令 php bin/hyperf.php start 啓動hyperf時算法
$application = $container->get(\Hyperf\Contract\ApplicationInterface::class);
在入口文件 hyperf.php 框架會實例化一個 HyperfFrameworkApplicationFactory 實例,同時會掃描一遍全部帶 @Command 註解或者定義了 commands 配置的地方,實例化一個SymfonyComponentConsoleApplication對象 (這是一個Symfony的Consoul命令類用於定義命令觸發執行的任務) 。將全部被定義爲command的對象類註冊到$application對象中,最後在入口文件執行編程
$application->run();
執行全部的Command命令,其中包括 HyperfServerCommandStartServer 服務Server啓動類。在這個類裏面定義了若是接收到的命令參數包含 start 那麼就會執行他的邏輯,即根據 config/autoload/server.php 的配置實例化HyperfServerServer類,在這個過程當中會觸發BeforeMainServerStart事件,到這裏咱們即將進入自定義進程啓動的核心階段。緩存
$serverProcesses = $serverConfig['processes'] ?? []; $processes = $this->config->get('processes', []); $annotationProcesses = $this->getAnnotationProcesses(); // Retrieve the processes have been registered. $processes = array_merge($serverProcesses, $processes, ProcessManager::all(), array_keys($annotationProcesses)); foreach ($processes as $process) { ... if ($instance instanceof ProcessInterface) { $instance->isEnable() && $instance->bind($server); } }
BootProcessListener監聽到BeforeMainServerStart事件的觸發會拿到全部 @Process 和 定義在processes配置文件的進程類,執行他們的 isEnable 和 bind 方法。服務器
在上面咱們對自定義進程是如何隨框架啓動的進行了簡單的介紹,接下來咱們對本文的主角Crontab自定義進程進行解析。
Hyperf文檔介紹在使用Crontab以前須要在 config/autoload/processes.php 內註冊一下 HyperfCrontabProcessCrontabDispatcherProcess 自定義進程。那麼咱們就直接來看一下這個process類裏面作了什麼事情。
若是你對 Hyperf/Process 組件使用熟悉的話會知道,一個process類主要執行的邏輯都在 handle() 方法
內。app
public function handle(): void { $this->event->dispatch(new CrontabDispatcherStarted()); while (true) { $this->sleep(); $crontabs = $this->scheduler->schedule(); while (! $crontabs->isEmpty()) { $crontab = $crontabs->dequeue(); $this->strategy->dispatch($crontab); } } }
在handle內首先觸發來一個 CrontabDispatcherStarted 事件,目前這個事件無人監聽。接下來就是一段時間的協程阻塞,阻塞時間第一次爲距離下一次整分鐘的秒數,其他的都是60s。關於爲何要使用 SwooleCoroutine::sleep 而不是直接 sleep() ,是由於自定義進程默認是一個協程的Server。
接下來框架
$crontabs = $this->scheduler->schedule();
返回一個當前這一分鐘該執行的SplQueue隊列,隊列中的是Crontab對象函數
object(Hyperf\Crontab\Crontab)#46164 (10) { ["name":protected]=> string(4) "Foo4" ["type":protected]=> string(8) "callback" ["rule":protected]=> string(11) "* * * * * *" ["singleton":protected]=> bool(false) ["mutexPool":protected]=> string(7) "default" ["mutexExpires":protected]=> int(3600) ["onOneServer":protected]=> bool(true) ["callback":protected]=> array(2) { [DEBUG] Event Hyperf\Framework\Event\OnPipeMessage handled by Hyperf\Crontab\Listener\OnPipeMessageListener listener. [0]=> string(16) "App\Task\FooTask" [1]=> string(7) "execute" } ["memo":protected]=> NULL ["executeTime":protected]=> object(Carbon\Carbon)#46106 (3) { ["date"]=> string(26) "2020-06-02 14:15:57.000000" ["timezone_type"]=> int(3) ["timezone"]=> string(13) "Asia/Shanghai" } }
這個對象中記錄了咱們目前比較關注的兩個關鍵信息:性能
關於這個crontab對象是如何生成的,這個咱們在後面會介紹到。
在咱們拿到crontab對象後,咱們會把對象發送給在 config/autoload/dependencies.php 定義好的 StrategyInterface 的實現類來進行dispatch,默認指定的是Worker 進程執行策略。this
$server = $this->serverFactory->getServer()->getServer(); if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) { $workerId = $this->getNextWorkerId($server); $server->sendMessage(new PipeMessage( 'callback', [Executor::class, 'execute'], $crontab ), $workerId); }
除Coroutine策略外的dispatch方法是相同的,都是進程間輪訓的向WorkID經過 sendMessage 發送 PipeMessage 對象,同時 sendMessage 方法會觸發 OnPipeMessage 事件。該事件被 OnPipeMessageListener 監聽,會根據 PipeMessage 執行對應的callback函數,即 Executor->execute' 。在該方法中會根據corntab對象的屬性定義 SwooleTimer::after ,根據corontab的executeTime屬性定義多少秒後執行callback。
$callback && Timer::after($diff > 0 ? $diff * 1000 : 1, $callback);
這樣基本就完成了咱們一個秒級定時任務的執行。
如今咱們回到上面那個關於crontab對象是何時生成的問題。咱們說過Cornrab本質上就是一個自定義進程,那根據Hyperf/Process的使用說明,全部的自定義進程都繼承了 HyperfProcessAbstractProcess ,這個類在啓動SwooleProcess時會觸發 BeforeProcessHandle 事件,在這個事件中會掃描全部的crontab配置和註解,將這些註解進行解析生成crontab對象,存儲在crontabs屬性中。
public function register(Crontab $crontab): bool { if (! $this->isValidCrontab($crontab)) { return false; } $this->crontabs[$crontab->getName()] = $crontab; return true; }
以上大體就是一個crontab定時任務的執行流程,固然裏面還有不少執行細節和Contab個性化的定義參數因爲篇幅有限咱們還沒來得及介紹,感興趣的同窗能夠私下進行閱讀,下面我也附上來Hyperf/Crontab的總體執行流程類之間的關係圖,方便你們對照着閱讀源碼。
Hyperf 是基於 Swoole 4.4+ 實現的高性能、高靈活性的 PHP 協程框架,內置協程服務器及大量經常使用的組件,性能較傳統基於 PHP-FPM 的框架有質的提高,提供超高性能的同時,也保持着極其靈活的可擴展性,標準組件均基於 PSR 標準 實現,基於強大的依賴注入設計,保證了絕大部分組件或類都是 可替換 與 可複用 的。框架組件庫除了常見的協程版的 MySQL 客戶端、Redis 客戶端,還爲您準備了協程版的 Eloquent ORM、WebSocket 服務端及客戶端、JSON RPC 服務端及客戶端、GRPC 服務端及客戶端、Zipkin/Jaeger (OpenTracing) 客戶端、Guzzle HTTP 客戶端、Elasticsearch 客戶端、Consul 客戶端、ETCD 客戶端、AMQP 組件、Apollo 配置中心、阿里雲 ACM 應用配置管理、ETCD 配置中心、基於令牌桶算法的限流器、通用鏈接池、熔斷器、Swagger 文檔生成、Swoole Tracker、Blade 和 Smarty 視圖引擎、Snowflake 全局ID生成器 等組件,省去了本身實現對應協程版本的麻煩。
Hyperf 還提供了 基於 PSR-11 的依賴注入容器、註解、AOP 面向切面編程、基於 PSR-15 的中間件、自定義進程、基於 PSR-14 的事件管理器、Redis/RabbitMQ 消息隊列、自動模型緩存、基於 PSR-16 的緩存、Crontab 秒級定時任務、Translation 國際化、Validation 驗證器 等很是便捷的功能,知足豐富的技術場景和業務場景,開箱即用。