使用 mixphp 打造多進程異步郵件發送

注意:這個是 MixPHP V1 的範例

郵件發送是很常見的需求,因爲發送郵件的操做通常是比較耗時的,因此咱們通常採用異步處理來提高用戶體驗,而異步一般咱們使用消息隊列來實現。php

傳統 MVC 框架因爲缺乏多進程開發能力,一般是採用同一個腳本執行屢次,產生多個進程的方式,mixphp 封裝了 TaskExecutor 專用於多進程開發,用戶能很是簡單的開發出功能完善的高可用多進程應用。html

下面演示一個異步郵件發送系統的開發過程,涉及知識點:git

  • 異步
  • 消息隊列
  • 多進程
  • 守護進程

如何使用消息隊列實現異步

PHP 使用消息隊列一般是使用中間件來實現,經常使用的消息中間件有:github

  • redis
  • rabbitmq
  • kafka

本次咱們選用 redis 來實現異步郵件發送,redis 的數據類型中有一個 list 類型,可實現消息隊列,使用如下命令:redis

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架構設計

本實例由傳統 MVC 框架投遞郵件發送需求,MixPHP 多進程執行發送任務。shell

郵件發送庫選型

以往咱們一般使用框架提供的郵件發送庫,或者網上下載別的用戶分享的庫,composer 出現後,https://packagist.org/ 上有大量優質的庫,咱們只需選擇一個最好的便可,本例選擇 swiftmailer。數據庫

因爲發送任務是由 MixPHP 執行,因此 swiftmailer 是安裝在 MixPHP 項目中,在項目根目錄中執行如下命令安裝:swift

composer require swiftmailer/swiftmailer

生產者開發

在郵件發送這個需求中生產者是指投遞發送任務的一方,這一方一般是一個接口或網頁,這個部分並不必定需 mixphp 開發,TP、CI、YII 這些均可以,只需在接口或網頁中把任務信息投遞到消息隊列中便可。微信

在傳統 MVC 框架的控制器中增長以下代碼:swoole

一般框架中使用 redis 會安裝一個類庫來使用,本例使用原生代碼,便於理解。
// 鏈接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投遞任務
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

一般異步開發中,投遞完成後就會當即響應一個消息給用戶,固然此時該任務並無執行。

消費者開發

本例咱們使用 MixPHP 的多進程開發工具 TaskExecutor 來完成這個需求,一般使用常駐進程來處理隊列的消費,因此咱們使用 TaskExecutor 的 TYPE_DAEMON 類型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二種進程:

  • 左進程:負責從消息隊列取出任務數據,投放給中進程。
  • 中進程:負責執行郵件發送任務。

PushCommand.php 代碼以下:

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式範例
 * @author 劉健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = 'smtpdm.aliyun.com';
    const PORT = 465;
    const SECURITY = 'ssl';
    const USERNAME = '****@email.***.com';
    const PASSWORD = '****';

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 獲取程序名稱
        $this->programName = Input::getCommandName();
        // 設置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 獲取服務
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 類路徑
                'class'         => 'mix\task\TaskExecutor',
                // 服務名稱
                'name'          => "mix-daemon: {$this->programName}",
                // 執行類型
                'type'          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 執行模式
                'mode'          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左進程數
                'leftProcess'   => 1,
                // 中進程數
                'centerProcess' => 5,
                // 任務超時時間 (秒)
                'timeout'       => 5,
            ]
        );
    }

    // 啓動
    public function actionStart()
    {
        // 預處理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 啓動服務
        $service = $this->getTaskService();
        $service->on('LeftStart', [$this, 'onLeftStart']);
        $service->on('CenterStart', [$this, 'onCenterStart']);
        $service->start();
        // 返回退出碼
        return ExitCode::OK;
    }

    // 左進程啓動事件回調函數
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型內使用長鏈接版本的數據庫組件,這樣組件會自動幫你維護鏈接不斷線
            $queueModel = Redis::getInstance();
            // 保持任務執行狀態,循環結束後當前進程會退出,主進程會重啓一個新進程繼續執行任務,這樣作是爲了不長時間執行內存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 從消息隊列中間件阻塞獲取一條消息
                $data = $queueModel->brpop('queue:email', 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 將消息推送給中進程去處理,push有長度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一會,避免 CPU 出現 100%
            sleep(1);
            // 拋出錯誤
            throw $e;
        }
    }

    // 中進程啓動事件回調函數
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任務執行狀態,循環結束後當前進程會退出,主進程會重啓一個新進程繼續執行任務,這樣作是爲了不長時間執行內存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 從進程消息隊列中搶佔一條消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 處理消息
            try {
                // 處理消息,好比:發送短信、發送郵件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退數據到消息隊列
                $worker->rollback($data);
                // 休息一會,避免 CPU 出現 100%
                sleep(1);
                // 拋出錯誤
                throw $e;
            }
        }
    }

    // 發送郵件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data['subject']))
            ->setFrom([self::USERNAME => '**網'])
            ->setTo($data['to'])
            ->setBody($data['body']);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

測試

  1. 在 shell 中啓動 push 常駐程序。
[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
  1. 調用接口往消息隊列投聽任務。

此時 shell 終端將打印:

圖1

成功收到測試郵件:

圖2

MixPHP

GitHub: https://github.com/mixstart/m...
官網:http://www.mixphp.cn/

相關文章
相關標籤/搜索