Yet-another swoole 異步隊列,支持彈性擴容,工做進程協程支持

Aint Queue

基於 Swoole 的一個異步隊列庫,可彈性伸縮的工做進程池,工做進程協程支持。php

Github 地址:https://github.com/Littlesqx/...laravel

screenshot.gif

特性

  • 默認 Redis 驅動
  • 秒級延時任務
  • 自定義重試次數和時間
  • 自定義錯誤回調
  • 支持任務執行中間件
  • 自定義隊列快照事件
  • 彈性多進程消費
  • 工做進程協程支持
  • 內置漂亮的儀表盤

環境

  • PHP 7.2+
  • Swoole 4.4+
  • Redis 3.2+ (redis 驅動)

安裝

$ composer require littlesqx/aint-queue -vvv

使用

配置

默認讀取配置路徑: config/aint-queue.php, 不存在時讀取 /vendor/littlesqx/aint-queue/src/Config/config.phpgit

<?php

use Littlesqx\AintQueue\Driver\Redis\Queue as RedisQueue;
use Littlesqx\AintQueue\Logger\DefaultLogger;

return [
    // channel_name => [...config]
    'default' => [
        'driver' => [
            'class' => RedisQueue::class,
            'connection' => [
                'host' => '127.0.0.1',
                'port' => 6379,
                'database' => '0',
                // 'password' => 'password',
            ],
            'pool_size' => 8,
            'pool_wait_timeout' => 1,
            'handle_timeout' => 60 * 30,
        ],
        'logger' => [
            'class' => DefaultLogger::class,
                'options' => [
                    'level' => \Monolog\Logger::DEBUG,
                ],
            ],
        'pid_path' => '/var/run/aint-queue',
        'consumer' => [
            'sleep_seconds' => 1,
            'memory_limit' => 96,
            'dynamic_mode' => true,
            'capacity' => 6,
            'flex_interval' => 5 * 60,
            'min_worker_number' => 5,
            'max_worker_number' => 30,
            'max_handle_number' => 0,
        ],
        'job_snapshot' => [
            'interval' => 5 * 60,
            'handler' => [],
        ],
    ],
];

全部參數:github

name type comment default
channel string 頻道。隊列的單位,每一個頻道內的消息對應着各自的消費者和生產者。支持多頻道。在命令行使用 --channel 參數。 default
driver.class string 隊列驅動類,須要實現 QueueInterface。 Redis
driver.connection map 驅動配置。
pid_path string 主進程的 PID 文件存儲路徑。注意運行用戶須要可讀寫權限。 /var/run/aint-queue
consumer.sleep_seconds int 當任務空閒時,每次 pop 操做後的睡眠秒數。 1
consumer.memory_limit int 工做進程的最大使用內存,超出則重啓。單位 MB。 96
consumer.dynamic_mode bool 是否開啓自動伸縮工做進程。 true
consumer.capacity int 表明每一個工做進程在短期內而且健康狀態下的最多處理消息數,它影響了工做進程的自動伸縮策略。 5
consumer.flex_interval int flex_interval 秒,監控進程嘗試調整工做進程數(假設開啓了自動伸縮工做進程)。 5
consumer.min_worker_number int 工做進程最小數目。 5
consumer.max_worker_number int 工做進程最大數目。 30
consumer.max_handle_number int 當前工做進程最大處理消息數,超出後重啓。0 表明無限制。 0
job_snapshot map 每隔 job_snapshot.interval 秒,job_snapshot.handles 會被依次執行。job_snapshot.handles 須要實現 JobSnapshotterInterface。

消息推送

能夠在 cli/fpm 運行模式下使用:redis

<?php

use Littlesqx\AintQueue\Driver\DriverFactory;

$queue = DriverFactory::make($channel, $options);

// push a job
$queue->push(function () {
    echo "Hello aint-queue\n";
});

// push a delay job
$closureJob = function () {
    echo "Hello aint-queue delayed\n";
};
$queue->push($closureJob, 5);

更建議使用類任務,這樣功能上會更加完整,也能夠得到更好的編碼體驗和性能。shell

  • 建立的任務類須要實現 JobInterface,詳細可參考 /exampleapi

    • handle(): void: 任務主體,你要執行的內容,在這裏能夠使用 swoole 的相關 api(好比建立協程等);
    • canRetry(int $attempt, $error): bool: 決定是否要重試;
    • retryAfter(int $attempt): int: 決定重試延時時間;
    • failed(int $id, array $payload): void: 任務完全失敗了(就是達到了重試次數還沒成功);
    • middleware(): array: 當前任務的中間件列表,原理參考 laravel pipeline 流水線;
  • 注意任務必須能在生產者和消費者中(反)序列化,意味着須要在同一個項目
  • 利用隊列快照事件你能夠實現隊列實時監控,而利用任務中間件,你能夠實現任務執行速率限制,任務執行日誌等。

隊列管理

推薦使用 Supervisor 等進程管理工具守護工做進程。bash

vendor/bin/aint-queue
AintQueue Console Tool

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  help                 Displays help for a command
  list                 Lists commands
 queue
  queue:clear          Clear the queue.
  queue:dashboard      Start http server for dashboard.
  queue:reload-failed  Reload all the failed jobs onto the waiting queue.
  queue:status         Get the execute status of specific queue.
 worker
  worker:listen        Listen the queue.
  worker:reload        Reload worker for the queue.
  worker:run           Run the specific job.
  worker:stop          Stop listening the queue.

測試

composer test
相關文章
相關標籤/搜索