thinkphp5 消息隊列thinkphp-queue擴展

1.簡介php

thinkphp-queue是thinkphp的一個第三方擴展, 內置了 Redis,Database,Topthink ,Sync這四種驅動,推薦使用redisgit

2. 下載 和安裝github

composer require topthink/think-queueredis

配置目錄在: application/extra/queue.phpthinkphp

return [ 'connector'  => 'Redis',            // Redis 驅動
    'expire'     => 60,                // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',        // redis 主機ip
    'port'       => 6379,            // redis 端口
    'password'   => '',                // redis 密碼
    'select'     => 0,                // 使用哪個 db,默認爲 db0
    'timeout'    => 0,                // redis鏈接的超時時間
    'persistent' => false,            // 是不是長鏈接 // 'connector' => 'Database', // 數據庫驅動 // 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null // 'default' => 'default', // 默認的隊列名稱 // 'table' => 'jobs', // 存儲消息的表名,不帶前綴 // 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行
];

3.入隊,建立隊列的代碼shell

/** * 文件路徑: \application\index\controller\JobTest.php * 該控制器的業務代碼中藉助了thinkphp-queue 庫,將一個消息推送到消息隊列 */ namespace application\index\controller; use think\Exception; use think\Queue; class JobTest { /** * 一個使用了隊列的 action */
  public function actionWithHelloJob(){ // 1.當前任務將由哪一個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
      $jobHandlerClassName  = 'application\index\job\Hello'; // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立
      $jobQueueName        = "helloJobQueue"; // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串 // ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對)
      $jobData             = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ; // 4.將該任務推送到消息隊列,等待對應的消費者去執行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驅動時,返回值爲 1|false ; redis 驅動時,返回值爲 隨機字符串|false
      if( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ echo 'Oops, something went wrong.'; } } }

若是是多個任務:若是一個任務類裏有多個小任務的話,如上面的例子二,須要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2數據庫

4.消費隊列的代碼:json

<?php  namespace app\test\job; use think\queue\Job; class Hello { /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param array|mixed $data 發佈任務時自定義的數據 */
    public function fire(Job $job,$data){ // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //若是任務執行成功, 記得刪除任務
            $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ if ($job->attempts() > 3) { //經過這個方法能夠檢查這個任務已經重試了幾回了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); $job->delete(); // 也能夠從新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行
 } } } /** * 有些消息在到達消費者時,可能已經再也不須要執行了 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根據消息中的數據進行實際的業務處理 * @param array|mixed $data 發佈任務時自定義的數據 * @return boolean 任務執行的結果 */
    private function doHelloJob($data) { // 根據消息中的數據進行實際的業務處理...
        var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); // print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); // print("<info>Hello Job is Done!"."</info> \n");

        return true; } }

執行代碼:api

php think queue:work --queue helloJobQueue

5.總結:app

5.1 命名:

  • queue:subscribe 命令 [截至2017-02-15,做者暫未實現該模式,略過]

  • queue:work 命令

    work 命令: 該命令將啓動一個 work 進程來處理消息隊列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 該命令將會建立一個 listen 父進程 ,而後由父進程經過 proc_open(‘php think queue:work’) 的方式來建立一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

    php think queue:listen --queue helloJobQueue

    queue:restart 從新開啓

     

     

    參考資料:https://blog.csdn.net/will5451/article/details/80434174

    https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

    https://github.com/top-think/think-queue

相關文章
相關標籤/搜索