1.簡介php
thinkphp-queue是thinkphp的一個第三方擴展, 內置了 Redis,Database,Topthink ,Sync這四種驅動,推薦使用redisgit
2. 下載 和安裝github
composer require topthink/think-queueredis
配置目錄在: application/extra/queue.php
thinkphp
return [ 'connector' => 'Redis', // Redis 驅動 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null 'default' => 'default', // 默認的隊列名稱 'host' => '127.0.0.1', // redis 主機ip 'port' => 6379, // redis 端口 'password' => '', // redis 密碼 'select' => 0, // 使用哪個 db,默認爲 db0 'timeout' => 0, // redis鏈接的超時時間 'persistent' => false, // 是不是長鏈接 // 'connector' => 'Database', // 數據庫驅動 // 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null // 'default' => 'default', // 默認的隊列名稱 // 'table' => 'jobs', // 存儲消息的表名,不帶前綴 // 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行 ];
3.入隊,建立隊列的代碼shell
/** * 文件路徑: \application\index\controller\JobTest.php * 該控制器的業務代碼中藉助了thinkphp-queue 庫,將一個消息推送到消息隊列 */ namespace application\index\controller; use think\Exception; use think\Queue; class JobTest { /** * 一個使用了隊列的 action */ public function actionWithHelloJob(){ // 1.當前任務將由哪一個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法 $jobHandlerClassName = 'application\index\job\Hello'; // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立 $jobQueueName = "helloJobQueue"; // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串 // ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對) $jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ; // 4.將該任務推送到消息隊列,等待對應的消費者去執行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驅動時,返回值爲 1|false ; redis 驅動時,返回值爲 隨機字符串|false if( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ echo 'Oops, something went wrong.'; } } }
若是是多個任務:若是一個任務類裏有多個小任務的話,如上面的例子二,須要用@+方法名app\lib\job\Job2@task1
、app\lib\job\Job2@task2
數據庫
4.消費隊列的代碼:json
<?php namespace app\test\job; use think\queue\Job; class Hello { /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param array|mixed $data 發佈任務時自定義的數據 */ public function fire(Job $job,$data){ // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //若是任務執行成功, 記得刪除任務 $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ if ($job->attempts() > 3) { //經過這個方法能夠檢查這個任務已經重試了幾回了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); $job->delete(); // 也能夠從新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行 } } } /** * 有些消息在到達消費者時,可能已經再也不須要執行了 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根據消息中的數據進行實際的業務處理 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */ private function doHelloJob($data) { // 根據消息中的數據進行實際的業務處理... var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); // print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); // print("<info>Hello Job is Done!"."</info> \n"); return true; } }
執行代碼:api
php think queue:work --queue helloJobQueue
5.總結:app
5.1 命名:
queue:subscribe 命令 [截至2017-02-15,做者暫未實現該模式,略過]
queue:work 命令
work 命令: 該命令將啓動一個 work 進程來處理消息隊列。
php think queue:work --queue helloJobQueue
queue:listen 命令
listen 命令: 該命令將會建立一個 listen 父進程 ,而後由父進程經過 proc_open(‘php think queue:work’)
的方式來建立一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。
php think queue:listen --queue helloJobQueue
queue:restart 從新開啓
參考資料:https://blog.csdn.net/will5451/article/details/80434174
https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645
https://github.com/top-think/think-queue