注意:這個是 MixPHP V1 的範例
郵件發送是很常見的需求,因爲發送郵件的操做通常是比較耗時的,因此咱們通常採用異步處理來提高用戶體驗,而異步一般咱們使用消息隊列來實現。php
傳統 MVC 框架因爲缺乏多進程開發能力,一般是採用同一個腳本執行屢次,產生多個進程的方式,mixphp 封裝了 TaskExecutor 專用於多進程開發,用戶能很是簡單的開發出功能完善的高可用多進程應用。html
下面演示一個異步郵件發送系統的開發過程,涉及知識點:git
PHP 使用消息隊列一般是使用中間件來實現,經常使用的消息中間件有:github
本次咱們選用 redis 來實現異步郵件發送,redis 的數據類型中有一個 list 類型,可實現消息隊列,使用如下命令:redis
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);
本實例由傳統 MVC 框架投遞郵件發送需求,MixPHP 多進程執行發送任務。shell
以往咱們一般使用框架提供的郵件發送庫,或者網上下載別的用戶分享的庫,composer 出現後,https://packagist.org/ 上有大量優質的庫,咱們只需選擇一個最好的便可,本例選擇 swiftmailer。數據庫
因爲發送任務是由 MixPHP 執行,因此 swiftmailer 是安裝在 MixPHP 項目中,在項目根目錄中執行如下命令安裝:swift
composer require swiftmailer/swiftmailer
在郵件發送這個需求中生產者是指投遞發送任務的一方,這一方一般是一個接口或網頁,這個部分並不必定需 mixphp 開發,TP、CI、YII 這些均可以,只需在接口或網頁中把任務信息投遞到消息隊列中便可。微信
在傳統 MVC 框架的控制器中增長以下代碼:swoole
一般框架中使用 redis 會安裝一個類庫來使用,本例使用原生代碼,便於理解。
// 鏈接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis Connect Failure'); } $redis->auth(''); $redis->select(0); // 投遞任務 $data = [ 'to' => ['***@qq.com' => 'A name'], 'body' => 'Here is the message itself', 'subject' => 'The title content', ]; $redis->lpush('queue:email', serialize($data));
一般異步開發中,投遞完成後就會當即響應一個消息給用戶,固然此時該任務並無執行。
本例咱們使用 MixPHP 的多進程開發工具 TaskExecutor 來完成這個需求,一般使用常駐進程來處理隊列的消費,因此咱們使用 TaskExecutor 的 TYPE_DAEMON 類型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二種進程:
PushCommand.php 代碼以下:
<?php namespace apps\daemon\commands; use mix\console\ExitCode; use mix\facades\Input; use mix\facades\Redis; use mix\task\CenterProcess; use mix\task\LeftProcess; use mix\task\TaskExecutor; /** * 推送模式範例 * @author 劉健 <coder.liu@qq.com> */ class PushCommand extends BaseCommand { // 配置信息 const HOST = 'smtpdm.aliyun.com'; const PORT = 465; const SECURITY = 'ssl'; const USERNAME = '****@email.***.com'; const PASSWORD = '****'; // 初始化事件 public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 獲取程序名稱 $this->programName = Input::getCommandName(); // 設置pidfile $this->pidFile = "/var/run/{$this->programName}.pid"; } /** * 獲取服務 * @return TaskExecutor */ public function getTaskService() { return create_object( [ // 類路徑 'class' => 'mix\task\TaskExecutor', // 服務名稱 'name' => "mix-daemon: {$this->programName}", // 執行類型 'type' => \mix\task\TaskExecutor::TYPE_DAEMON, // 執行模式 'mode' => \mix\task\TaskExecutor::MODE_PUSH, // 左進程數 'leftProcess' => 1, // 中進程數 'centerProcess' => 5, // 任務超時時間 (秒) 'timeout' => 5, ] ); } // 啓動 public function actionStart() { // 預處理 if (!parent::actionStart()) { return ExitCode::UNSPECIFIED_ERROR; } // 啓動服務 $service = $this->getTaskService(); $service->on('LeftStart', [$this, 'onLeftStart']); $service->on('CenterStart', [$this, 'onCenterStart']); $service->start(); // 返回退出碼 return ExitCode::OK; } // 左進程啓動事件回調函數 public function onLeftStart(LeftProcess $worker) { try { // 模型內使用長鏈接版本的數據庫組件,這樣組件會自動幫你維護鏈接不斷線 $queueModel = Redis::getInstance(); // 保持任務執行狀態,循環結束後當前進程會退出,主進程會重啓一個新進程繼續執行任務,這樣作是爲了不長時間執行內存溢出 for ($j = 0; $j < 16000; $j++) { // 從消息隊列中間件阻塞獲取一條消息 $data = $queueModel->brpop('queue:email', 10); if (empty($data)) { continue; } list(, $data) = $data; // 將消息推送給中進程去處理,push有長度限制 (https://wiki.swoole.com/wiki/page/290.html) $worker->push($data, false); } } catch (\Exception $e) { // 休息一會,避免 CPU 出現 100% sleep(1); // 拋出錯誤 throw $e; } } // 中進程啓動事件回調函數 public function onCenterStart(CenterProcess $worker) { // 保持任務執行狀態,循環結束後當前進程會退出,主進程會重啓一個新進程繼續執行任務,這樣作是爲了不長時間執行內存溢出 for ($j = 0; $j < 16000; $j++) { // 從進程消息隊列中搶佔一條消息 $data = $worker->pop(); if (empty($data)) { continue; } // 處理消息 try { // 處理消息,好比:發送短信、發送郵件、微信推送 var_dump($data); $ret = self::sendEmail($data); var_dump($ret); } catch (\Exception $e) { // 回退數據到消息隊列 $worker->rollback($data); // 休息一會,避免 CPU 出現 100% sleep(1); // 拋出錯誤 throw $e; } } } // 發送郵件 public static function sendEmail($data) { // Create the Transport $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new \Swift_Mailer($transport); // Create a message $message = (new \Swift_Message($data['subject'])) ->setFrom([self::USERNAME => '**網']) ->setTo($data['to']) ->setBody($data['body']); // Send the message $result = $mailer->send($message); return $result; } }
[root@localhost bin]# ./mix-daemon push start mix-daemon 'push' start successed.
此時 shell 終端將打印:
成功收到測試郵件:
GitHub: https://github.com/mixstart/m...
官網:http://www.mixphp.cn/