Hyperf/Crontab 組件源碼解析

前置閱讀: Hyperf/Crontab使用文檔
前置閱讀: Hyperf/Process自定義進程使用文檔
前置閱讀: Hyperf事件機制

寫在開頭

以前作項目用到了Hyperf/Crontab組件來進行秒級的數據清洗,最近又在作定時任務的拆分,因而就打算過一遍組件源碼加深理解,順便構思一下如何在此基礎上搭建Hyperf/Crontab的任務調度功能。Crontab本質上是一個隨Server啓動的自定義進程,因此接下來咱們將從啓動和執行兩個階段來進行介紹。php

定時器的啓動

因爲Crontab是一個隨着Server啓動的進程,分析他的生命週期確定會設計到框架的啓動。但本文不是主要介紹Hyperf框架啓動源碼的,因此咱們會簡單的過一下涉及到自定義進程啓動的框架啓動代碼。
當咱們使用命令 php bin/hyperf.php start 啓動hyperf時算法

$application = $container->get(\Hyperf\Contract\ApplicationInterface::class);

在入口文件 hyperf.php 框架會實例化一個 HyperfFrameworkApplicationFactory 實例,同時會掃描一遍全部帶 @Command 註解或者定義了 commands 配置的地方,實例化一個SymfonyComponentConsoleApplication對象 (這是一個Symfony的Consoul命令類用於定義命令觸發執行的任務) 。將全部被定義爲command的對象類註冊到$application對象中,最後在入口文件執行編程

$application->run();

執行全部的Command命令,其中包括 HyperfServerCommandStartServer 服務Server啓動類。在這個類裏面定義了若是接收到的命令參數包含 start 那麼就會執行他的邏輯,即根據 config/autoload/server.php 的配置實例化HyperfServerServer類,在這個過程當中會觸發BeforeMainServerStart事件,到這裏咱們即將進入自定義進程啓動的核心階段。緩存

$serverProcesses = $serverConfig['processes'] ?? [];
        $processes = $this->config->get('processes', []);
        $annotationProcesses = $this->getAnnotationProcesses();
        // Retrieve the processes have been registered.
        $processes = array_merge($serverProcesses, $processes, ProcessManager::all(), array_keys($annotationProcesses));
        foreach ($processes as $process) {
            ...
            if ($instance instanceof ProcessInterface) {
                $instance->isEnable() && $instance->bind($server);
            }
        }

BootProcessListener監聽到BeforeMainServerStart事件的觸發會拿到全部 @Process 和 定義在processes配置文件的進程類,執行他們的 isEnablebind 方法。服務器

定時器的執行

在上面咱們對自定義進程是如何隨框架啓動的進行了簡單的介紹,接下來咱們對本文的主角Crontab自定義進程進行解析。
Hyperf文檔介紹在使用Crontab以前須要在 config/autoload/processes.php 內註冊一下 HyperfCrontabProcessCrontabDispatcherProcess 自定義進程。那麼咱們就直接來看一下這個process類裏面作了什麼事情。
若是你對 Hyperf/Process 組件使用熟悉的話會知道,一個process類主要執行的邏輯都在 handle() 方法
內。app

public function handle(): void
    {
        $this->event->dispatch(new CrontabDispatcherStarted());
        while (true) {
            $this->sleep();
            $crontabs = $this->scheduler->schedule();
            while (! $crontabs->isEmpty()) {
                $crontab = $crontabs->dequeue();
                $this->strategy->dispatch($crontab);
            }
        }
    }

在handle內首先觸發來一個 CrontabDispatcherStarted 事件,目前這個事件無人監聽。接下來就是一段時間的協程阻塞,阻塞時間第一次爲距離下一次整分鐘的秒數,其他的都是60s。關於爲何要使用 SwooleCoroutine::sleep 而不是直接 sleep() ,是由於自定義進程默認是一個協程的Server。
接下來框架

$crontabs = $this->scheduler->schedule();

返回一個當前這一分鐘該執行的SplQueue隊列,隊列中的是Crontab對象函數

object(Hyperf\Crontab\Crontab)#46164 (10) {
  ["name":protected]=>
  string(4) "Foo4"
  ["type":protected]=>
  string(8) "callback"
  ["rule":protected]=>
  string(11) "* * * * * *"
  ["singleton":protected]=>
  bool(false)
  ["mutexPool":protected]=>
  string(7) "default"
  ["mutexExpires":protected]=>
  int(3600)
  ["onOneServer":protected]=>
  bool(true)
  ["callback":protected]=>
  array(2) {
[DEBUG] Event Hyperf\Framework\Event\OnPipeMessage handled by Hyperf\Crontab\Listener\OnPipeMessageListener listener.
    [0]=>
    string(16) "App\Task\FooTask"
    [1]=>
    string(7) "execute"
  }
  ["memo":protected]=>
  NULL
  ["executeTime":protected]=>
  object(Carbon\Carbon)#46106 (3) {
    ["date"]=>
    string(26) "2020-06-02 14:15:57.000000"
    ["timezone_type"]=>
    int(3)
    ["timezone"]=>
    string(13) "Asia/Shanghai"
  }
}

這個對象中記錄了咱們目前比較關注的兩個關鍵信息:性能

  1. 執行callback
  2. 執行時間

關於這個crontab對象是如何生成的,這個咱們在後面會介紹到。
在咱們拿到crontab對象後,咱們會把對象發送給在 config/autoload/dependencies.php 定義好的 StrategyInterface 的實現類來進行dispatch,默認指定的是Worker 進程執行策略。this

$server = $this->serverFactory->getServer()->getServer();
        if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) {
            $workerId = $this->getNextWorkerId($server);
            $server->sendMessage(new PipeMessage(
                'callback',
                [Executor::class, 'execute'],
                $crontab
            ), $workerId);
        }

除Coroutine策略外的dispatch方法是相同的,都是進程間輪訓的向WorkID經過 sendMessage 發送 PipeMessage 對象,同時 sendMessage 方法會觸發 OnPipeMessage 事件。該事件被 OnPipeMessageListener 監聽,會根據 PipeMessage 執行對應的callback函數,即 Executor->execute' 。在該方法中會根據corntab對象的屬性定義 SwooleTimer::after ,根據corontab的executeTime屬性定義多少秒後執行callback。

$callback && Timer::after($diff > 0 ? $diff * 1000 : 1, $callback);

這樣基本就完成了咱們一個秒級定時任務的執行。

如今咱們回到上面那個關於crontab對象是何時生成的問題。咱們說過Cornrab本質上就是一個自定義進程,那根據Hyperf/Process的使用說明,全部的自定義進程都繼承了 HyperfProcessAbstractProcess ,這個類在啓動SwooleProcess時會觸發 BeforeProcessHandle 事件,在這個事件中會掃描全部的crontab配置和註解,將這些註解進行解析生成crontab對象,存儲在crontabs屬性中。

public function register(Crontab $crontab): bool
    {
        if (! $this->isValidCrontab($crontab)) {
            return false;
        }
        $this->crontabs[$crontab->getName()] = $crontab;
        return true;
    }

以上大體就是一個crontab定時任務的執行流程,固然裏面還有不少執行細節和Contab個性化的定義參數因爲篇幅有限咱們還沒來得及介紹,感興趣的同窗能夠私下進行閱讀,下面我也附上來Hyperf/Crontab的總體執行流程類之間的關係圖,方便你們對照着閱讀源碼。

Hyperf_crontab.jpg

寫在最後

Hyperf 是基於 Swoole 4.4+ 實現的高性能、高靈活性的 PHP 協程框架,內置協程服務器及大量經常使用的組件,性能較傳統基於 PHP-FPM 的框架有質的提高,提供超高性能的同時,也保持着極其靈活的可擴展性,標準組件均基於 PSR 標準 實現,基於強大的依賴注入設計,保證了絕大部分組件或類都是 可替換可複用 的。

框架組件庫除了常見的協程版的 MySQL 客戶端、Redis 客戶端,還爲您準備了協程版的 Eloquent ORM、WebSocket 服務端及客戶端、JSON RPC 服務端及客戶端、GRPC 服務端及客戶端、Zipkin/Jaeger (OpenTracing) 客戶端、Guzzle HTTP 客戶端、Elasticsearch 客戶端、Consul 客戶端、ETCD 客戶端、AMQP 組件、Apollo 配置中心、阿里雲 ACM 應用配置管理、ETCD 配置中心、基於令牌桶算法的限流器、通用鏈接池、熔斷器、Swagger 文檔生成、Swoole Tracker、Blade 和 Smarty 視圖引擎、Snowflake 全局ID生成器 等組件,省去了本身實現對應協程版本的麻煩。

Hyperf 還提供了 基於 PSR-11 的依賴注入容器、註解、AOP 面向切面編程、基於 PSR-15 的中間件、自定義進程、基於 PSR-14 的事件管理器、Redis/RabbitMQ 消息隊列、自動模型緩存、基於 PSR-16 的緩存、Crontab 秒級定時任務、Translation 國際化、Validation 驗證器 等很是便捷的功能,知足豐富的技術場景和業務場景,開箱即用。

相關文章
相關標籤/搜索