thinkphp5 消息隊列thinkphp-queue擴展

1.簡介php

thinkphp-queue是thinkphp的一個第三方擴展, 內置了 Redis,Database,Topthink ,Sync這四種驅動,推薦使用redisgit

2. 下載 和安裝github

composer require topthink/think-queueredis

配置目錄在: application/extra/queue.phpthinkphp

return [
    'connector'  => 'Redis',            // Redis 驅動
    'expire'     => 60,                // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',        // redis 主機ip
    'port'       => 6379,            // redis 端口
    'password'   => '',                // redis 密碼
    'select'     => 0,                // 使用哪個 db,默認爲 db0
    'timeout'    => 0,                // redis鏈接的超時時間
    'persistent' => false,            // 是不是長鏈接

    //    'connector' => 'Database',   // 數據庫驅動
    //    'expire'    => 60,           // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
    //    'default'   => 'default',    // 默認的隊列名稱
    //    'table'     => 'jobs',       // 存儲消息的表名,不帶前綴
    //    'dsn'       => [],

    //    'connector'   => 'Topthink',    // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹
    //    'token'       => '',
    //    'project_id'  => '',
    //    'protocol'    => 'https',
    //    'host'        => 'qns.topthink.com',
    //    'port'        => 443,
    //    'api_version' => 1,
    //    'max_retries' => 3,
    //    'default'     => 'default',

//        'connector'   => 'Sync',        // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行
];

3.入隊,建立隊列的代碼shell

/**
* 文件路徑: \application\index\controller\JobTest.php
* 該控制器的業務代碼中藉助了thinkphp-queue 庫,將一個消息推送到消息隊列
*/
namespace application\index\controller;
  use think\Exception;

  use think\Queue;

  class JobTest {
  /**
   * 一個使用了隊列的 action
   */
  public function actionWithHelloJob(){
      
      // 1.當前任務將由哪一個類來負責處理。 
      //   當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
      $jobHandlerClassName  = 'application\index\job\Hello'; 
      // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立
      $jobQueueName        = "helloJobQueue"; 
      // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串
      //   ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對)
      $jobData             = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
      // 4.將該任務推送到消息隊列,等待對應的消費者去執行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );    
      // database 驅動時,返回值爲 1|false  ;   redis 驅動時,返回值爲 隨機字符串|false
      if( $isPushed !== false ){  
          echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
      }else{
          echo 'Oops, something went wrong.';
      }
  }
 }

若是是多個任務:若是一個任務類裏有多個小任務的話,如上面的例子二,須要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2數據庫

4.消費隊列的代碼:json

<?php

namespace app\test\job;


use think\queue\Job;

class Hello {

    /**
     * fire方法是消息隊列默認調用的方法
     * @param Job            $job      當前的任務對象
     * @param array|mixed    $data     發佈任務時自定義的數據
     */
    public function fire(Job $job,$data){
        // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //若是任務執行成功, 記得刪除任務
            $job->delete();
            print("<info>Hello Job has been done and deleted"."</info>\n");
        }else{
            if ($job->attempts() > 3) {
                //經過這個方法能夠檢查這個任務已經重試了幾回了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                $job->delete();
                // 也能夠從新發布這個任務
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行
            }
        }
    }

    /**
     * 有些消息在到達消費者時,可能已經再也不須要執行了
     * @param array|mixed    $data     發佈任務時自定義的數據
     * @return boolean                 任務執行的結果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
     * 根據消息中的數據進行實際的業務處理
     * @param array|mixed    $data     發佈任務時自定義的數據
     * @return boolean                 任務執行的結果
     */
    private function doHelloJob($data) {
        // 根據消息中的數據進行實際的業務處理...
        var_dump($data);
//        print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
//        print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
//        print("<info>Hello Job is Done!"."</info> \n");

        return true;
    }
}

執行代碼:api

php think queue:work --queue helloJobQueue

5.總結:app

5.1 命名:

  • queue:subscribe 命令 [截至2017-02-15,做者暫未實現該模式,略過]

  • queue:work 命令

    work 命令: 該命令將啓動一個 work 進程來處理消息隊列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 該命令將會建立一個 listen 父進程 ,而後由父進程經過 proc_open(‘php think queue:work’) 的方式來建立一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

    php think queue:listen --queue helloJobQueue

    queue:restart 從新開啓

     

     

    參考資料:https://blog.csdn.net/will5451/article/details/80434174

    https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

    https://github.com/top-think/think-queue

相關文章
相關標籤/搜索