解決註冊併發問題並提升QPS

前言:前面在本地的windows經過apache的ab工具測試了600併發下「查詢指定手機是否存在再提交數據」的註冊功能會出現重複提交的狀況,而且在註冊完成時還須要對邀請人進行獎勵,記錄邀請記錄,對該新用戶自動發佈動態信息,發短信或發郵件等其餘業務功能。因此這裏當併發時,註冊功能就變得低效且容易出現問題。php

方法:先對重複提交的問題經過redis解決,再把註冊儲存用戶基本信息之後的操做放到隊列中進行異步執行,能夠很好的優化註冊功能,提升QPS。redis

1、環境要求數據庫

  • PHP版本 >= 5.6.0
  • PHP框架:Thinkphp5.1.*
  • 消息隊列:Think-queue2.0
  • PHP擴展:Redis

2、下載框架和消息隊列中間件apache

1. 下載tp5.1。json

composer create-project topthink/think=5.1.* tp5  --prefer-dist

2. 安裝think-queue。windows

composer require topthink/think-queue

 3. php安裝redis擴展和打開redis服務端和客戶端。api

3、解決註冊重複提交數組

1. 配置文件中cache設置爲redis驅動,並新建控制器由於cache相關命名空間。緩存

use think\Exception;
use think\facade\Cache;
use think\facade\Env;
use think\Queue;

2. 使用無序集合存手機號,經過判斷當前手機號是不是在指定鍵裏爲成員(若是註冊存入數據庫失敗,經過sRem刪除該成員),而後再經過查詢數據庫判斷是否存在。閉包

private $cache;
private  $handler;
// 實例化redis
public function __construct() {
    $this->cache = Cache::init();
    $this->handler = $this->cache->handler();
}



// 判斷手機號是否在集合中
$is_existe = $this->handler->sIsMember("register:mobile",$mobile);
if(!$is_existe) {
   $this->handler->sAdd("register:mobile",$mobile);
}else {
   //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號已存在');
   var_dump('手機號已存在');    // 用戶已存在
   die;
}

// 查詢手機號碼是否已註冊
$user = db('user')->field('mobile')->where('mobile', $mobile)->find();
if ($user) {
    //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號註冊了');
    var_dump('手機號已註冊');    // 用戶已存在
    die;
}

4、消息隊列分解註冊功能

1. 配置消息隊列,後面以redis驅動爲例。

<?php

return [
    'connector'  => 'Redis',            // Redis 驅動
    'expire'     => 60,                // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',        // redis 主機ip
    'port'       => 6379,            // redis 端口
    'password'   => '',                // redis 密碼
    'select'     => 0,                // 使用哪個 db,默認爲 db0
    'timeout'    => 0,                // redis鏈接的超時時間
    'persistent' => false,            // 是不是長鏈接

    //    'connector' => 'Database',   // 數據庫驅動
    //    'expire'    => 60,           // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
    //    'default'   => 'default',    // 默認的隊列名稱
    //    'table'     => 'jobs',       // 存儲消息的表名,不帶前綴
    //    'dsn'       => [],

    //    'connector'   => 'Topthink',    // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹
    //    'token'       => '',
    //    'project_id'  => '',
    //    'protocol'    => 'https',
    //    'host'        => 'qns.topthink.com',
    //    'port'        => 443,
    //    'api_version' => 1,
    //    'max_retries' => 3,
    //    'default'     => 'default',

//        'connector'   => 'Sync',        // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行
];

2. 完成添加新用戶後將指定數據加入消息隊列。

<?php
namespace app\index\controller;
use think\Db;
use think\Validate;
use think\Exception;
use think\facade\Cache;
use think\facade\Env;
use think\Queue;
use think\Log;

class Index
{	
	private $cache;
    private  $handler;
	public function __construct() {
		$this->cache = Cache::init();
		$this->handler = $this->cache->handler();
	}

    public function index()
    {
    	$data = input('post.');

        unset($data['balance']);
        unset($data['credit']);
        
        // $blacklist = [
        //     "18124198164","13401363108","17688552009","15089352898","13602940094","13346643336","13181351655","18301123028","13598020751","13014568187",
        //     "13428733909","17337991130","13275342497"
        // ];

        $rule = [
            'mobile' => 'require|number|length:11',
            'password' => 'require|length:6,32',
        ];

        $msg = [
            'mobile.require' => '手機號必須',
            'mobile.length' => '手機號爲11位數字',
            'mobile.number' => '手機號爲11位數字',
            'password.require' => '密碼必須',
            'password.length' => '密碼爲6-12位之間',
        ];

        //驗證數據是否合法
        $mobile = isset($data['mobile']) ? $data['mobile'] : '';

        $validate = new Validate($rule, $msg);
        $result = $validate->check($data);

        if (!$result) {
            var_dump($validate->getError());
            die;
        }
        
        // if(in_array($mobile,$blacklist)) {
        //     var_dump('該手機號已註冊了');    // 黑名單
        //     die;
        // }
        
        // 判斷手機號是否在集合中
        $is_existe = $this->handler->sIsMember("register:mobile",$mobile);
        if(!$is_existe) {
            $this->handler->sAdd("register:mobile",$mobile);
        }else {
            //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號已存在');
            var_dump('手機號已存在');    // 用戶已存在
            die;
        }

        // 查詢手機號碼是否已註冊
        $user = db('user')->field('mobile')->where('mobile', $mobile)->find();
        if ($user) {
            //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---手機號註冊了');
            var_dump('手機號已註冊');    // 用戶已存在
            die;
        }       

        // 用戶不存在註冊
        // $data['id']          = getNewUserid();
        $data['no'] = date("Ymdhis").rand(100, 999);
        $data['avatar'] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_h.png';
        $data['password'] = md5($data['password']);
        $randomNickname = date("Ymdhis").rand(100, 999);         
        $data['nickname'] = 'rm_' . $randomNickname;
        $data['create_time'] = time();
        $data['type'] = 1;
        
        /***是否存在邀請人的跑步錢進號***/
        if(isset($data['pbqj_no']) && !empty($data['pbqj_no'])) {
            $inviter = db('user')->field('id')->where(["no"=>$data['pbqj_no']])->find();
            if($inviter) {
                $data['inviter_id'] = $inviter['id'];
            }
        }
        /***是否存在邀請人的跑步錢進號***/
        unset($data['pbqj_no']);
        $userid = db('user')->insertGetId($data);

        if ($userid) {
        /******************加入消息隊列異步處理後續操做*******************/

            // 1.當前任務將由哪一個類來負責處理。 
            // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
            $jobHandlerClassName  = 'app\index\job\JobUser'; 
            // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立
            $jobQueueName         = "userJobQueue"; 
            // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串
            // ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對)
            //$jobData              = ['ts' => time(), 'bizId' => uniqid() , 'a' => 1];
            $jobData              = ['userid'=>$userid,'time'=>time(),'mobile'=>$mobile,'inviterid'=>(isset($data['inviter_id']) ? $data['inviter_id'] : 0)];
            // 4.將該任務推送到消息隊列,等待對應的消費者去執行
            $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName);    
            // database 驅動時,返回值爲 1|false  ;   redis 驅動時,返回值爲 隨機字符串|false
            if($isPushed !== false) { 
            	var_dump('加入隊列成功');
            	die;
                //Log::write('-----------加入消息隊列成功-----------');
                //echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
            }else{
            	var_dump('加入消息隊列');
            	die;
                //Log::write('-----------加入消息隊列失敗-----------');
                //echo 'Oops, something went wrong.';
            }
        /******************加入消息隊列異步處理後續操做*******************/

            $res['id'] = $userid;
            $res['no'] = $data['no'];

            // // token處理類
            // $accessToken = new AccessToken();
            // $accessToken = $accessToken->getToken($userid);

            // if (empty($accessToken)) {
            //     //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---祕鑰生成失敗');
            //     var_dump('祕鑰生成失敗');
            // } else {
            //     $res['user_token'] = $accessToken;
            // }

            // if (method_exists(\chat\User::class, 'getToken')) {
            //     $chat_token = \chat\User::getToken($res['id'], $data['nickname'], $data['avatar']);
            //     if (!$chat_token) {
            //         //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---聊天祕鑰生成失敗');
            //         var_dump('聊天祕鑰生成失敗');
            //     } else {
            //         $res['chat_token'] = $chat_token;
            //     }
            // } else {
            //     $res['chat_token'] = '';
            // }
            
            //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---註冊成功');
            var_dump($res);
            die;
        } else {
            //Log::write('---壓力測試'.date("Y-m-d h:i:s").'---數據庫錯誤');
            $this->handler->sRem("register:mobile",$mobile);
            var_dump('數據庫錯誤');
            die;
        }
    }

    public function hello($name = 'ThinkPHP5')
    {
        return 'hello,' . $name;
    }
}

 2. 建立消費者(job),對執行隊列中的任務。

(1). 在同一模塊下新建job文件夾和一個執行類(JobUser), 須要對應生產者中jobHandlerClassName。

(2). 前面執行完隊列加入成功後,能夠本地使用redis客戶端經過lrange queues:userJobQueue 0 -1 查看隊列成員 (queues:userJobQueue中,userJobQueue是本身在加入隊列前本身起的隊列名稱,與queues: 拼接就是redis的list的鍵名,因此能夠直接查看 )。

(3).隊列中的data就是本身傳遞的數據,後面須要在消費者中經過該數據進行註冊功能後的業務操做: 送獎勵,存儲邀請記錄,發動態,發短信,發郵件等等。

<?php
namespace app\index\job;
use think\queue\Job;
use think\Db;
use think\Exception;
use think\facade\Cache;
use think\facade\Env;

class JobUser {

    private  $cache;
    private  $handler;
    public  function  __construct()
    {
        $this->cache = Cache::init();
        $this->handler = $this->cache->handler();
    }

    /**
     * fire方法是消息隊列默認調用的方法
     * @param Job            $job      當前的任務對象
     * @param array|mixed    $data     發佈任務時自定義的數據
     */
    public function fire(Job $job,$data) {
        $job->delete();
        //print("hahah\n");
        // print("<info>The user already exists "."</info>\n");
        //     exit();
        if(empty($data) || empty($data['userid']) || empty($data['mobile'])) {
            $job->delete();
            print("canshu buzu\n");
            return;
        }

        // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone) {
            print("hahah\n");
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //若是任務執行成功, 記得刪除任務
            $job->delete();
            print("<info>Hello Job has been done and deleted"."</info>\n");
        }else{
            if ($job->attempts() > 3) {
                //經過這個方法能夠檢查這個任務已經重試了幾回了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                //$job->delete();
                // 也能夠從新發布這個任務
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行
            }
        }
    }

    /**
     * 有些消息在到達消費者時,可能已經再也不須要執行了
     * @param array|mixed    $data     發佈任務時自定義的數據
     * @return boolean                 任務執行的結果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data) {
        // 判斷手機緩存集合中是否存在
        // $is_existe = $this->handler->sIsMember("register:mobile",$data['mobile']);
        // if($is_existe) {
        //     return false;  
        // } 

        // // 查詢當前用戶是否在數據庫中存在
        // $userinfo = Db::name('user')->field('id')->where('id',$data['userid'])->find();
        // if($userinfo) {
        //     return false;  
        // } 

        return true;
    }

    /**
     * 根據消息中的數據進行實際的業務處理
     * @param array|mixed    $data     發佈任務時自定義的數據
     * @return boolean                 任務執行的結果
    */
    private function doHelloJob($data) {
        try{

            if(isset($data['inviterid']) && !empty($data['inviterid'])) {

                // 添加邀請記錄
                $res_record = Db::name('user_inviter')
                    ->insert([
                        'inviterid'   => $data['inviterid'],
                        'userid'      => $data['userid'],
                        'code'        => $data['inviterid'] . 'T' . $data['userid'],
                        'create_time' => $data['time'],
                ]);

                // 給邀請人贈送300步幣
                Db::name('user_credit')
                    ->insert([
                        'userid'      => $data['inviterid'],
                        'type'        => 1,
                        'credit'      => 300,
                        'source'      => $res_record,
                        'create_time' => $data['time']
                ]);

                // 更新邀請人步幣(用戶表)
                Db::name('user')->where('id', $data['inviterid'])->setInc('credit', 300);              
            }
            
            {   // 註冊成功發表動態
                $dynamic_data['userid'] = $data['userid'];
                $dynamic_data['dynamic'] = base64_encode('號外!號外!我加入跑步錢進了,你們一塊兒走路領紅包吧!');
                $dynamic_data['images'][] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_d.png';
                $dynamic_data['images'] = serialize($dynamic_data['images']);
                $dynamic_data['create_time'] = $data['time'];

                $result = Db::name('dynamic')->insert($dynamic_data);
            }

        }catch(\Exception $e) {
            Log::write('---執行消息隊列出錯---'.$e->getMessage());
            return false;
        }

        return true;

        // 根據消息中的數據進行實際的業務處理...
        //var_dump($data);
//        print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
//        print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
//        print("<info>Hello Job is Done!"."</info> \n");

        //return true;
    }

    /**
     * 該方法用於接收任務執行失敗的通知,你能夠發送郵件給相應的負責人員
     * @param $jobData  string|array|...      //發佈任務時傳遞的 jobData 數據
    */
    public function failed($jobData) {
        //send_mail_to_somebody() ;
        print("Warning: Job failed after max retries. job data is :".var_export($jobData,true)."\n");
    }

}

(3). 設置任務執行失敗後的處理,好比記錄日誌或發郵件給開發者。

a. 在tags.php中配置失敗後執行了類。

<?php

// 應用行爲擴展定義文件
return [
    // 應用初始化
    'app_init'     => [],
    // 應用開始
    'app_begin'    => [],
    // 模塊初始化
    'module_init'  => [],
    // 操做開始執行
    'action_begin' => [],
    // 視圖內容過濾
    'view_filter'  => [],
    // 日誌寫入
    'log_write'    => [],
    // 應用結束
    'app_end'      => [],
    'queue_failed' => [
        // 數組形式,[ 'ClassName' , 'methodName']
        ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']

        // 字符串(靜態方法),'StaicClassName::methodName'
        // 'MyQueueFailedLogger::logAllFailedQueues'

        // 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名爲 queueFailed 的方法
        // 'application\\behavior\\MyQueueFailedLogger'

        // 閉包形式
        /*
        function( &$jobObject , $extra){
            // var_dump($jobObject);
            return true;
        }
        */
    ],
    
];

b. 在application目錄下建立任務錯誤執行後的處理腳本,根據業務需求自定。

<?php

namespace app\behavior;
use think\Db;


class MyQueueFailedLogger
{
    const should_run_hook_callback = true;

    /**
     * @param $jobObject   \think\queue\Job   //任務對象,保存了該任務的執行狀況和業務數據
     * @return bool     true                  //是否須要刪除任務並觸發其failed() 方法
    */
    public function logAllFailedQueues(&$jobObject) {

        $failedJobLog = [
            'jobHandlerClassName'   => $jobObject->getName(), // 'application\index\job\Hello'
            'queueName' => $jobObject->getQueue(),			   // 'helloJobQueue'
            'jobData'   => $jobObject->getRawBody()['data'],  // '{'a': 1 }'
            'attempts'  => $jobObject->attempts(),            // 3
        ];
        var_export(json_encode($failedJobLog,true));

        $data = [
            "content" => json_encode($failedJobLog,true),
            "create_time" => time(),
        ];
        Db::name('ztest')->insertGetId($data);

        // $jobObject->release();     //重發任務
        //$jobObject->delete();         //刪除任務
        //$jobObject->failed();	  //通知消費者類任務執行失敗

        return self::should_run_hook_callback;
    }
}

5、經過命令運行消息隊列,如下以windows舉慄

1. cmd進入當前項目, 而後輸入 "php think queue:listen --queue userJobQueue"   (userJobQueue是本身的隊列名)。

2. 也能夠在項目的根目錄建立bat文件,文件寫入"php think queue:listen --queue userJobQueue",保存只需雙擊就能夠執行。

 

6、測試結果

使用了消息隊列後,一樣610的併發,使用時間就縮短了

 

公衆號

關注公衆號,輸入「TP消息隊列「,獲取示例demo

相關文章
相關標籤/搜索