前些時間咱們發佈了 Mix PHP V2 實例:協程池異步郵件發送守護程序 範例,這一次咱們提供一個使用大廠 SDK 經過 Swoole Hook 協程化來並行執行短信發送任務,本文是一個代碼簡單、IO 性能極強的範例。php
請先升級到 mix-framework >= v2.0.5。
本範例依然使用消息隊列的方式接收短信發送任務,消息中間件使用:html
一般框架中使用 Redis 會安裝一個類庫來使用,本例使用原生代碼,便於理解。
// 鏈接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis connect failed.'); } $redis->auth(''); $redis->select(0); // 投遞任務 for($i = 0; $i < 3; $i++){ $data = [ 'phone' => '***', 'templateCode' => 'SMS_***', 'templateParam' => ['code' => 123456], ]; $redis->lpush('queue:sms', serialize($data)); }
使用的是 ali 雲的短信服務,查看官方 PHP SDK 文檔 ,使用的庫爲:git
composer require alibabacloud/client
經過查看該庫的 composer 依賴文件,咱們得知該庫基於 guzzlehttp 開發,由於 Mix PHP 提供了無需修改代碼就可 Hook Guzzle 庫可在協程中使用的工具 Mix PHP V2 生態:讓 Guzzle 支持 Swoole 的 Hook 協程,因此能基本肯定該庫可在 Swoole 協程中使用。github
首先咱們安裝 https://github.com/mix-php/guzzle-hook 讓 alibabacloud/client
可在協程中使用:redis
composer require mix/guzzle-hook
而後在項目的 composer.json
文件中增長 extra 配置項,以下:json
"extra": { "include_files": [ "vendor/mix/guzzle-hook/src/functions_include.php" ] }
更新自動加載:api
composer dump-autoload
下面咱們採用 Mix PHP V2 的守護程序、協程池來完成一個超高性能的短信發送程序。安全
首先咱們在配置 applications/console/config/main.php
中註冊一個命令:app
// 命令 'commands' => [ 'smser' => [ 'Smser', 'description' => "SMS send daemon demo.", 'options' => [ [['d', 'daemon'], 'description' => 'Run in the background'], ], ], ],
註冊的命令中指定的 Smser 命令類,接下來咱們編寫一個 SmserCommand 類:composer
applications/console/src/Commands/SmserCommand.php
<?php namespace Console\Commands; use Console\Libraries\SmserWorker; use Mix\Concurrent\CoroutinePool\Dispatcher; use Mix\Console\CommandLine\Flag; use Mix\Core\Coroutine; use Mix\Core\Coroutine\Channel; use Mix\Core\Event; use Mix\Helper\ProcessHelper; use AlibabaCloud\Client\AlibabaCloud; /** * Class SmserCommand * @package Daemon\Commands * @author liu,jian <coder.keda@gmail.com> */ class SmserCommand { const ACCESS_KEY = '***'; const ACCESS_SECRET = '***'; /** * 退出 * @var bool */ public $quit = false; /** * 主函數 */ public function main() { // 守護處理 $daemon = Flag::bool(['d', 'daemon'], false); if ($daemon) { ProcessHelper::daemon(); } // 捕獲信號 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 設置ali雲全局參數 AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient(); // 手動關閉Swoole文件Hook,由於ali雲依賴的uuid庫有文件hook協程兼容問題,Swoole 4.4已經適配該問題 Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE); // 協程池執行任務 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ 'jobQueue' => $jobQueue, 'maxWorkers' => $maxWorkers, ]); $dispatch->start(SmserWorker::class); // 投聽任務 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop(['queue:sms'], 3); } catch (\Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最後一個鍵纔是值 $jobQueue->push($data); } }); // 等待事件 Event::wait(); } }
從$data = $redis->brPop(['queue:sms'], 3);
外部的異常捕獲可得知,當 Redis 鏈接出錯時,好比 Redis 重啓、鏈接異常時協程池會安全退出,也就是說當進程異常退出後用戶需使用supervisor
、pm2
等工具重啓守護進程。
上面是一個 Mix PHP 協程池的使用代碼,基本能夠直接複製使用,框架默認包含了協程池的 Demo,本次實例只是修改了協程池的 Worker,本命令主要是完成從 Redis 隊列中獲取消息而後 push 到 jobQueue 中,jobQueue 中的數據會被 20 個 Worker 實例中某一個搶佔後並行執行,本例的發送代碼邏輯就在 SmserWorker 類中:
applications/console/src/Libraries/SmserWorker.php
<?php namespace Console\Libraries; use Mix\Concurrent\CoroutinePool\AbstractWorker; use Mix\Concurrent\CoroutinePool\WorkerInterface; /** * Class SmserWorker * @package Daemon\Libraries * @author liu,jian <coder.keda@gmail.com> */ class SmserWorker extends AbstractWorker implements WorkerInterface { /** * 郵件發送器 * @var Smser */ public $smser; /** * 初始化事件 */ public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 實例化一些需重用的對象 $this->smser = new Smser(); } /** * 處理 * @param $data */ public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']); app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)])); } catch (\Throwable $e) { app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()])); } } }
由以上代碼可見,Worker 在初始化時,新增了一個 Smser 類的屬性,當 jobQueue 消息投遞過來時消息會傳遞到 handle 方法,在該方法中使用 Mailer 類的實例完成郵件發送任務,因此咱們要編寫了一個 Smser 發送程序:
applications/console/src/Libraries/Smser.php
<?php namespace Console\Libraries; use AlibabaCloud\Client\AlibabaCloud; use AlibabaCloud\Client\Exception\ClientException; use AlibabaCloud\Client\Exception\ServerException; use Mix\Core\Coroutine; /** * Class Smser * @package Console\Libraries * @author liu,jian <coder.keda@gmail.com> */ class Smser { /** * 配置信息 */ const SIGN_NAME = '***'; /** * Smser constructor. */ public function __construct() { // 開啓協程鉤子 Coroutine::enableHook(); } /** * 發送 * @param $phone * @param $templateCode * @param $templateParam * @return array * @throws ClientException * @throws ServerException */ public function send($phone, $templateCode, $templateParam) { $result = AlibabaCloud::rpc() ->product('Dysmsapi') // ->scheme('https') // https | http ->version('2017-05-25') ->action('SendSms') ->method('POST') ->options([ 'query' => [ 'PhoneNumbers' => $phone, 'SignName' => static::SIGN_NAME, 'TemplateCode' => $templateCode, 'TemplateParam' => json_encode($templateParam), ], ]) ->request(); return $result->toArray(); } }
以上就完成了所有的代碼邏輯,如今咱們開始測試,先啓動消費者守護程序:
[root@localhost bin]# ./mix-console smser
將上文的生產者腳本命名爲 push.php
而後在 CLI 中執行 (開一個新終端):
[root@localhost bin]# php /tmp/push.php
消費者守護程序結果:
[root@localhost bin]# ./mix-console smser [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"觸發分鐘級流控Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"觸發分鐘級流控Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}
命令行終端打印了發送成功的日誌,發送完成。