前言:前面在本地的windows經過apache的ab工具測試了600併發下「查詢指定手機是否存在再提交數據」的註冊功能會出現重複提交的狀況,而且在註冊完成時還須要對邀請人進行獎勵,記錄邀請記錄,對該新用戶自動發佈動態信息,發短信或發郵件等其餘業務功能。因此這裏當併發時,註冊功能就變得低效且容易出現問題。php
方法:先對重複提交的問題經過redis解決,再把註冊儲存用戶基本信息之後的操做放到隊列中進行異步執行,能夠很好的優化註冊功能,提升QPS。redis
1、環境要求數據庫
2、下載框架和消息隊列中間件apache
1. 下載tp5.1。json
composer create-project topthink/think=5.1.* tp5 --prefer-dist
2. 安裝think-queue。windows
composer require topthink/think-queue
3. php安裝redis擴展和打開redis服務端和客戶端。api
3、解決註冊重複提交數組
1. 配置文件中cache設置爲redis驅動,並新建控制器由於cache相關命名空間。緩存
use think\Exception; use think\facade\Cache; use think\facade\Env; use think\Queue;
2. 使用無序集合存手機號,經過判斷當前手機號是不是在指定鍵裏爲成員(若是註冊存入數據庫失敗,經過sRem刪除該成員),而後再經過查詢數據庫判斷是否存在。閉包
private $cache; private $handler; // 實例化redis public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } // 判斷手機號是否在集合中 $is_existe = $this->handler->sIsMember("register:mobile",$mobile); if(!$is_existe) { $this->handler->sAdd("register:mobile",$mobile); }else { //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號已存在'); var_dump('手機號已存在'); // 用戶已存在 die; } // 查詢手機號碼是否已註冊 $user = db('user')->field('mobile')->where('mobile', $mobile)->find(); if ($user) { //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號註冊了'); var_dump('手機號已註冊'); // 用戶已存在 die; }
4、消息隊列分解註冊功能
1. 配置消息隊列,後面以redis驅動爲例。
<?php return [ 'connector' => 'Redis', // Redis 驅動 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null 'default' => 'default', // 默認的隊列名稱 'host' => '127.0.0.1', // redis 主機ip 'port' => 6379, // redis 端口 'password' => '', // redis 密碼 'select' => 0, // 使用哪個 db,默認爲 db0 'timeout' => 0, // redis鏈接的超時時間 'persistent' => false, // 是不是長鏈接 // 'connector' => 'Database', // 數據庫驅動 // 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null // 'default' => 'default', // 默認的隊列名稱 // 'table' => 'jobs', // 存儲消息的表名,不帶前綴 // 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行 ];
2. 完成添加新用戶後將指定數據加入消息隊列。
<?php namespace app\index\controller; use think\Db; use think\Validate; use think\Exception; use think\facade\Cache; use think\facade\Env; use think\Queue; use think\Log; class Index { private $cache; private $handler; public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } public function index() { $data = input('post.'); unset($data['balance']); unset($data['credit']); // $blacklist = [ // "18124198164","13401363108","17688552009","15089352898","13602940094","13346643336","13181351655","18301123028","13598020751","13014568187", // "13428733909","17337991130","13275342497" // ]; $rule = [ 'mobile' => 'require|number|length:11', 'password' => 'require|length:6,32', ]; $msg = [ 'mobile.require' => '手機號必須', 'mobile.length' => '手機號爲11位數字', 'mobile.number' => '手機號爲11位數字', 'password.require' => '密碼必須', 'password.length' => '密碼爲6-12位之間', ]; //驗證數據是否合法 $mobile = isset($data['mobile']) ? $data['mobile'] : ''; $validate = new Validate($rule, $msg); $result = $validate->check($data); if (!$result) { var_dump($validate->getError()); die; } // if(in_array($mobile,$blacklist)) { // var_dump('該手機號已註冊了'); // 黑名單 // die; // } // 判斷手機號是否在集合中 $is_existe = $this->handler->sIsMember("register:mobile",$mobile); if(!$is_existe) { $this->handler->sAdd("register:mobile",$mobile); }else { //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號已存在'); var_dump('手機號已存在'); // 用戶已存在 die; } // 查詢手機號碼是否已註冊 $user = db('user')->field('mobile')->where('mobile', $mobile)->find(); if ($user) { //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號註冊了'); var_dump('手機號已註冊'); // 用戶已存在 die; } // 用戶不存在註冊 // $data['id'] = getNewUserid(); $data['no'] = date("Ymdhis").rand(100, 999); $data['avatar'] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_h.png'; $data['password'] = md5($data['password']); $randomNickname = date("Ymdhis").rand(100, 999); $data['nickname'] = 'rm_' . $randomNickname; $data['create_time'] = time(); $data['type'] = 1; /***是否存在邀請人的跑步錢進號***/ if(isset($data['pbqj_no']) && !empty($data['pbqj_no'])) { $inviter = db('user')->field('id')->where(["no"=>$data['pbqj_no']])->find(); if($inviter) { $data['inviter_id'] = $inviter['id']; } } /***是否存在邀請人的跑步錢進號***/ unset($data['pbqj_no']); $userid = db('user')->insertGetId($data); if ($userid) { /******************加入消息隊列異步處理後續操做*******************/ // 1.當前任務將由哪一個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法 $jobHandlerClassName = 'app\index\job\JobUser'; // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立 $jobQueueName = "userJobQueue"; // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串 // ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對) //$jobData = ['ts' => time(), 'bizId' => uniqid() , 'a' => 1]; $jobData = ['userid'=>$userid,'time'=>time(),'mobile'=>$mobile,'inviterid'=>(isset($data['inviter_id']) ? $data['inviter_id'] : 0)]; // 4.將該任務推送到消息隊列,等待對應的消費者去執行 $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName); // database 驅動時,返回值爲 1|false ; redis 驅動時,返回值爲 隨機字符串|false if($isPushed !== false) { var_dump('加入隊列成功'); die; //Log::write('-----------加入消息隊列成功-----------'); //echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ var_dump('加入消息隊列'); die; //Log::write('-----------加入消息隊列失敗-----------'); //echo 'Oops, something went wrong.'; } /******************加入消息隊列異步處理後續操做*******************/ $res['id'] = $userid; $res['no'] = $data['no']; // // token處理類 // $accessToken = new AccessToken(); // $accessToken = $accessToken->getToken($userid); // if (empty($accessToken)) { // //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---祕鑰生成失敗'); // var_dump('祕鑰生成失敗'); // } else { // $res['user_token'] = $accessToken; // } // if (method_exists(\chat\User::class, 'getToken')) { // $chat_token = \chat\User::getToken($res['id'], $data['nickname'], $data['avatar']); // if (!$chat_token) { // //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---聊天祕鑰生成失敗'); // var_dump('聊天祕鑰生成失敗'); // } else { // $res['chat_token'] = $chat_token; // } // } else { // $res['chat_token'] = ''; // } //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---註冊成功'); var_dump($res); die; } else { //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---數據庫錯誤'); $this->handler->sRem("register:mobile",$mobile); var_dump('數據庫錯誤'); die; } } public function hello($name = 'ThinkPHP5') { return 'hello,' . $name; } }
2. 建立消費者(job),對執行隊列中的任務。
(1). 在同一模塊下新建job文件夾和一個執行類(JobUser), 須要對應生產者中jobHandlerClassName。
(2). 前面執行完隊列加入成功後,能夠本地使用redis客戶端經過lrange queues:userJobQueue 0 -1 查看隊列成員 (queues:userJobQueue中,userJobQueue是本身在加入隊列前本身起的隊列名稱,與queues: 拼接就是redis的list的鍵名,因此能夠直接查看 )。
(3).隊列中的data就是本身傳遞的數據,後面須要在消費者中經過該數據進行註冊功能後的業務操做: 送獎勵,存儲邀請記錄,發動態,發短信,發郵件等等。
<?php namespace app\index\job; use think\queue\Job; use think\Db; use think\Exception; use think\facade\Cache; use think\facade\Env; class JobUser { private $cache; private $handler; public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param array|mixed $data 發佈任務時自定義的數據 */ public function fire(Job $job,$data) { $job->delete(); //print("hahah\n"); // print("<info>The user already exists "."</info>\n"); // exit(); if(empty($data) || empty($data['userid']) || empty($data['mobile'])) { $job->delete(); print("canshu buzu\n"); return; } // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone) { print("hahah\n"); $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //若是任務執行成功, 記得刪除任務 $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ if ($job->attempts() > 3) { //經過這個方法能夠檢查這個任務已經重試了幾回了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); //$job->delete(); // 也能夠從新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行 } } } /** * 有些消息在到達消費者時,可能已經再也不須要執行了 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data) { // 判斷手機緩存集合中是否存在 // $is_existe = $this->handler->sIsMember("register:mobile",$data['mobile']); // if($is_existe) { // return false; // } // // 查詢當前用戶是否在數據庫中存在 // $userinfo = Db::name('user')->field('id')->where('id',$data['userid'])->find(); // if($userinfo) { // return false; // } return true; } /** * 根據消息中的數據進行實際的業務處理 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */ private function doHelloJob($data) { try{ if(isset($data['inviterid']) && !empty($data['inviterid'])) { // 添加邀請記錄 $res_record = Db::name('user_inviter') ->insert([ 'inviterid' => $data['inviterid'], 'userid' => $data['userid'], 'code' => $data['inviterid'] . 'T' . $data['userid'], 'create_time' => $data['time'], ]); // 給邀請人贈送300步幣 Db::name('user_credit') ->insert([ 'userid' => $data['inviterid'], 'type' => 1, 'credit' => 300, 'source' => $res_record, 'create_time' => $data['time'] ]); // 更新邀請人步幣(用戶表) Db::name('user')->where('id', $data['inviterid'])->setInc('credit', 300); } { // 註冊成功發表動態 $dynamic_data['userid'] = $data['userid']; $dynamic_data['dynamic'] = base64_encode('號外!號外!我加入跑步錢進了,你們一塊兒走路領紅包吧!'); $dynamic_data['images'][] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_d.png'; $dynamic_data['images'] = serialize($dynamic_data['images']); $dynamic_data['create_time'] = $data['time']; $result = Db::name('dynamic')->insert($dynamic_data); } }catch(\Exception $e) { Log::write('---執行消息隊列出錯---'.$e->getMessage()); return false; } return true; // 根據消息中的數據進行實際的業務處理... //var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); // print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); // print("<info>Hello Job is Done!"."</info> \n"); //return true; } /** * 該方法用於接收任務執行失敗的通知,你能夠發送郵件給相應的負責人員 * @param $jobData string|array|... //發佈任務時傳遞的 jobData 數據 */ public function failed($jobData) { //send_mail_to_somebody() ; print("Warning: Job failed after max retries. job data is :".var_export($jobData,true)."\n"); } }
(3). 設置任務執行失敗後的處理,好比記錄日誌或發郵件給開發者。
a. 在tags.php中配置失敗後執行了類。
<?php // 應用行爲擴展定義文件 return [ // 應用初始化 'app_init' => [], // 應用開始 'app_begin' => [], // 模塊初始化 'module_init' => [], // 操做開始執行 'action_begin' => [], // 視圖內容過濾 'view_filter' => [], // 日誌寫入 'log_write' => [], // 應用結束 'app_end' => [], 'queue_failed' => [ // 數組形式,[ 'ClassName' , 'methodName'] ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues'] // 字符串(靜態方法),'StaicClassName::methodName' // 'MyQueueFailedLogger::logAllFailedQueues' // 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名爲 queueFailed 的方法 // 'application\\behavior\\MyQueueFailedLogger' // 閉包形式 /* function( &$jobObject , $extra){ // var_dump($jobObject); return true; } */ ], ];
b. 在application目錄下建立任務錯誤執行後的處理腳本,根據業務需求自定。
<?php namespace app\behavior; use think\Db; class MyQueueFailedLogger { const should_run_hook_callback = true; /** * @param $jobObject \think\queue\Job //任務對象,保存了該任務的執行狀況和業務數據 * @return bool true //是否須要刪除任務並觸發其failed() 方法 */ public function logAllFailedQueues(&$jobObject) { $failedJobLog = [ 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello' 'queueName' => $jobObject->getQueue(), // 'helloJobQueue' 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }' 'attempts' => $jobObject->attempts(), // 3 ]; var_export(json_encode($failedJobLog,true)); $data = [ "content" => json_encode($failedJobLog,true), "create_time" => time(), ]; Db::name('ztest')->insertGetId($data); // $jobObject->release(); //重發任務 //$jobObject->delete(); //刪除任務 //$jobObject->failed(); //通知消費者類任務執行失敗 return self::should_run_hook_callback; } }
5、經過命令運行消息隊列,如下以windows舉慄
1. cmd進入當前項目, 而後輸入 "php think queue:listen --queue userJobQueue" (userJobQueue是本身的隊列名)。
2. 也能夠在項目的根目錄建立bat文件,文件寫入"php think queue:listen --queue userJobQueue",保存只需雙擊就能夠執行。
6、測試結果
使用了消息隊列後,一樣610的併發,使用時間就縮短了
公衆號
關注公衆號,輸入「TP消息隊列「,獲取示例demo