thinkphp-queue自帶的隊列包使用分析(轉)

前言

當前筆記中的內容針對的是 thinkphp-queue 的 v1.1.2 版本,如今官方已經更新到了 v1.1.3 版本, 下文中提到的幾個Bug在最新的master分支上均已修復。 筆記中的部份內容還未更新。php

傳統的程序執行流程通常是 即時|同步|串行的,在某些場景下,會存在併發低,吞吐量低,響應時間長等問題。在大型系統中,通常會引入消息隊列的組件,將流程中部分任務抽離出來放入消息隊列,並由專門的消費者做針對性的處理,從而下降系統耦合度,提升系統性能和可用性。html

通常來講,能夠抽離的任務具備如下的特色:laravel

  • 容許延後|異步|並行處理 (相對於傳統的 即時|同步|串行 的執行方式)git

    • 容許延後:github

      搶購活動時,先快速緩衝有限的參與人數到消息隊列,後續再排隊處理實際的搶購業務;redis

    • 容許異步:sql

      業務處理過程當中的郵件,短信等通知thinkphp

    • 容許並行:shell

      用戶支付成功以後,郵件通知,微信通知,短信通知能夠由多個不一樣的消費者並行執行,通知到達的時間不要求前後順序。數據庫

  • 容許失敗和重試

    • 強一致性的業務放入核心流程處理
    • 無一致性要求或最終一致便可的業務放入隊列處理

thinkphp-queue 是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:

  • 消息的發佈,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等
  • 隊列的多隊列, 內存限制 ,啓動,中止,守護等
  • 消息隊列可降級爲同步執行

thinkphp-queue 內置了 Redis,Database,Topthink ,Sync這四種驅動。本文主要介紹 thinkphp-queue 結合其內置的 redis 驅動的使用方式和基本原理。

注1:如無特殊說明,下文中的 ‘消息’ 和 ‘任務’兩個詞指代的是同一個概念,即隊列中的一個成員。該成員對消息隊列而言是其內部保存的消息; 對業務應用而言是一個待執行的任務。請根據語境區分。

注2:本文編寫時(2017-02-15)使用的 thinkphp-queue 的版本號是 v1.1.2 。該版本中部分功能並未所有完成,如 subscribe 模式,以及存在幾個bug(稍後會說起)。若有變動,請以官方最新版爲準。

一 代碼示例

先經過一段代碼,瞭解一下 thinkphp-queue 的基本使用流程。

目標:

在業務控制器中推送一個新消息到一個名爲 ‘helloJobQueue’ 的隊列中,該消息中包含咱們自定義的業務數據,而後,編寫一個名爲 Hello 的消費者類,並經過命令行去調用該消費者類獲取這個消息,拿到定義的數據。

1.1 安裝 thinkphp-queue

composer install thinkphp-queue

1.2 搭建消息隊列的存儲環境

  • 使用 Redis [推薦]

    安裝並啓動 Redis 服務
  • 使用數據庫 [不推薦]

    CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1.3 配置消息隊列的驅動

根據選擇的存儲方式,在 \application\extra\queue.php 這個配置文件中,添加消息隊列對應的驅動配置

   return [  'connector' => 'Redis', // Redis 驅動  'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null  'default' => 'default', // 默認的隊列名稱  'host' => '127.0.0.1', // redis 主機ip  'port' => 6379, // redis 端口  'password' => '', // redis 密碼  'select' => 0, // 使用哪個 db,默認爲 db0  'timeout' => 0, // redis鏈接的超時時間  'persistent' => false, // 是不是長鏈接   // 'connector' => 'Database', // 數據庫驅動  // 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null  // 'default' => 'default', // 默認的隊列名稱  // 'table' => 'jobs', // 存儲消息的表名,不帶前綴  // 'dsn' => [],   // 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹  // 'token' => '',  // 'project_id' => '',  // 'protocol' => 'https',  // 'host' => 'qns.topthink.com',  // 'port' => 443,  // 'api_version' => 1,  // 'max_retries' => 3,  // 'default' => 'default',   // 'connector' => 'Sync', // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行  ];

1.3.1 配置文件中的 expire 參數說明

expire 參數指的是任務的過時時間。 過時的任務,其準確的定義是

  1. 任務的狀態爲執行中
  2. 任務的開始執行的時刻 + expire > 當前時刻

expire 不爲null 時 ,thinkphp-queue 會在每次獲取下一個任務以前檢查並重發過時(執行超時)的任務。

expire 爲null 時,thinkphp-queue 不會檢查過時的任務,性能相對較高一點。可是須要注意:

  • 這些執行超時的任務會一直留在消息隊列中,須要開發者另行處理(刪除或者重發)!
  • Bug ]在redis 驅動下,expire 設置爲 null 時,沒法實現任務的延遲執行! (Database 驅動下無影響)

對expire 參數理解或者使用不當時,很容易產生一些bug,後面會舉例提到。

1.4 消息的建立與推送

咱們在業務控制器中建立一個新的消息,並推送到 helloJobQueue 隊列

新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法

<?php /** * 文件路徑: \application\index\controller\JobTest.php * 該控制器的業務代碼中藉助了thinkphp-queue 庫,將一個消息推送到消息隊列 */ namespace application\index\controller;  use think\Exception;   use think\Queue;   class JobTest {  /**  * 一個使用了隊列的 action  */  public function actionWithHelloJob(){   // 1.當前任務將由哪一個類來負責處理。  // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法  $jobHandlerClassName = 'application\index\job\Hello';  // 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立  $jobQueueName = "helloJobQueue";  // 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串  // ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對)  $jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;  // 4.將該任務推送到消息隊列,等待對應的消費者去執行  $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );  // database 驅動時,返回值爲 1|false ; redis 驅動時,返回值爲 隨機字符串|false  if( $isPushed !== false ){  echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";  }else{  echo 'Oops, something went wrong.';  }  }  }

注意: 在這個例子當中,咱們是手動指定的 $jobHandlerClassName ,更合理的作法是先定義好消息名稱與消費者類名的映射關係,而後由某個能夠獲取該映射關係的類來推送這個消息。這樣,生產者只須要知道消息的名稱,而無需指定哪一個消費者類來處理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式以外,還能夠直接傳入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 這時,須要在 $jobHandlerObject 中定義一個 handle() 方法,消息隊列在執行到該任務時會自動反序列化該對象,並調用其 handle()方法。 該方式的缺點是沒法傳入自定義數據。

1.5 消息的消費與刪除

編寫 Hello 消費者類,用於處理 helloJobQueue 隊列中的任務

新增 \application\index\job\Hello.php 消費者類,並編寫其 fire() 方法

 <?php  /**  * 文件路徑: \application\index\job\Hello.php  * 這是一個消費者類,用於處理 helloJobQueue 隊列中的任務  */  namespace application\index\job;   use think\queue\Job;   class Hello {   /**  * fire方法是消息隊列默認調用的方法  * @param Job $job 當前的任務對象  * @param array|mixed $data 發佈任務時自定義的數據  */  public function fire(Job $job,$data){           // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.           $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);           if(!isJobStillNeedToBeDone){  $job->delete();  return;  }           $isJobDone = $this->doHelloJob($data);   if ($isJobDone) {  //若是任務執行成功, 記得刪除任務  $job->delete();  print("<info>Hello Job has been done and deleted"."</info>\n");  }else{  if ($job->attempts() > 3) {  //經過這個方法能夠檢查這個任務已經重試了幾回了  print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");  $job->delete();  // 也能夠從新發布這個任務  //print("<info>Hello Job will be availabe again after 2s."."</info>\n");  //$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行  }  }  }   /**        * 有些消息在到達消費者時,可能已經再也不須要執行了  * @param array|mixed $data 發佈任務時自定義的數據  * @return boolean 任務執行的結果        */       private function checkDatabaseToSeeIfJobNeedToBeDone($data){  return true;  }   /**  * 根據消息中的數據進行實際的業務處理  * @param array|mixed $data 發佈任務時自定義的數據  * @return boolean 任務執行的結果  */  private function doHelloJob($data) {  // 根據消息中的數據進行實際的業務處理...   print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");  print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");  print("<info>Hello Job is Done!"."</info> \n");   return true;  }  }

至此,全部的代碼都已準備完畢,在運行消息隊列以前,咱們先看一下如今的目錄結構:

目錄結構-代碼示例

1.6 發佈任務

在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,能夠看到消息推送成功。

瀏覽器提示消息推送結果

1.7 處理任務

切換當前終端窗口的目錄到項目根目錄下,執行

php think queue:work --queue helloJobQueue

能夠看到執行的結果相似以下:

命令行執行結果

至此,咱們成功地經歷了一個消息的 建立 -> 推送 -> 消費 -> 刪除 的基本流程

下文,將介紹 thinkphp-queue 的詳細使用方法。如配置介紹,基本原理,各類特殊狀況的處理等

二 詳細介紹

2.1 命令模式

  • queue:subscribe 命令 [截至2017-02-15,做者暫未實現該模式,略過]

  • queue:work 命令

    work 命令: 該命令將啓動一個 work 進程來處理消息隊列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 該命令將會建立一個 listen 父進程 ,而後由父進程經過 proc_open(‘php think queue:work’) 的方式來建立一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

    php think queue:listen --queue helloJobQueue

2.2 命令行參數

  • Work 模式

    php think queue:work \
    --daemon            //是否循環執行,若是不加該參數,則該命令處理完下一個消息就退出
    --queue  helloJobQueue  //要處理的隊列的名稱
    --delay  0 \        //若是本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認爲0
    --force  \ //系統處於維護狀態時是否仍然處理任務,並未找到相關說明 --memory 128 \ //該進程容許使用的內存上限,以 M 爲單位 --sleep 3 \ //若是隊列中無任務,則sleep多少秒後從新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //若是任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認爲0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //監聽的隊列的名稱
    --delay  0 \ //若是本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認爲0 --memory 128 \ //該進程容許使用的內存上限,以 M 爲單位 --sleep 3 \ //若是隊列中無任務,則多長時間後從新檢查,daemon模式下有效 --tries 0 \ //若是任務已經超太重發次數上限,則進入失敗處理邏輯,默認爲0 --timeout 60 //建立的work子進程的容許執行的最長時間,以秒爲單位

    能夠看到 listen 模式下,不包含 --deamon 參數,緣由下面會說明

2.3 work 模式和 listen 模式的區別

二者均可以用於處理消息隊列中的任務

區別在於:

  • 2.3.1 執行原理不一樣

    • work 命令是單進程的處理模式。

      按照是否設置了 --daemon 參數,work命令又可分爲單次執行和循環執行兩種模式。

      • 單次執行:不添加 --daemon參數,該模式下,work進程在處理完下一個消息後直接結束當前進程。當不存在新消息時,會sleep一段時間而後退出。
      • 循環執行:添加了 --daemon參數,該模式下,work進程會循環地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環中sleep一段時間。
    • listen 命令是 父進程 + 子進程 的處理模式。

      listen命令所在的父進程會建立一個單次執行模式的work子進程,並經過該work子進程來處理隊列中的下一個消息,當這個work子進程退出以後,listen命令所在的父進程會監聽到該子進程的退出信號,並從新建立一個新的單次執行的work子進程

  • 2.3.2 退出時機不一樣

    • work 命令的退出時機在上面的執行原理部分已敘述,此處再也不重複
    • listen 命令中,listen所在的父進程正常狀況會一直運行,除非遇到下面兩種狀況:
      • 建立的某個work子進程的執行時間超過了 listen命令行中的--timeout 參數配置,此時work子進程會被強制結束,listen所在的父進程也會拋出一個 ProcessTimeoutException 異常並退出。開發者能夠選擇捕獲該異常,讓父進程繼續執行,也能夠選擇經過 supervisor 等監控軟件重啓一個新的listen命令。
      • listen 命令所在的父進程因某種緣由存在內存泄露,則當父進程自己佔用的內存超過了命令行中的 --memory 參數配置時,父子進程均會退出。正常狀況下,listen進程自己佔用的內存是穩定不變的。
  • 2.3.3 性能不一樣

    • work 命令是在腳本內部作循環,框架腳本在命令執行的初期就已加載完畢;

    • 而listen模式則是處理完一個任務以後新開一個work進程,此時會從新加載框架腳本。

      所以: work 模式的性能會比listen模式高。

      注意:當代碼有更新時,work 模式下須要手動去執行 php think queue:restart 命令重啓隊列來使改動生效;而listen 模式會自動生效,無需其餘操做。

  • 2.3.4 超時控制能力

    • work 模式本質上既不能控制進程自身的運行時間,也沒法限制執行中的任務的執行時間。

      舉例來講,假如你在某次上線以後,在上文中的 \application\index\job\Hello.php 消費者的fire方法中添加了一段死循環 :

      public function fire(){  while(true){ //死循環  $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");  sleep(1);  } } 

      那麼這個循環將永遠不能中止,直到任務所在的進程超過內存限制或者由管理員手動結束。這個過程不會有任何的告警。更嚴重的是,若是你配置了expire ,那麼這個死循環的任務可能會污染到一樣處理 helloJobQueue 隊列的其餘work進程,最後好幾個work進程將被卡死在這段死循環中。詳情後文會說明。

      work 模式下的超時控制能力,實際上應該理解爲 多個work 進程配合下的過時任務重發能力。

    • 而 listen命令能夠限制其建立的work子進程的超時時間。

      listen 命令可經過 --timeout 參數限制work子進程容許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;

    • 這裏有必要補充一下 expire 和 timeout 之間的區別:

      • expire 在配置文件中設置,timeout 在 listen命令 的命令行參數中設置,並且,expire 和 timeout 是兩個不一樣層次上的概念:

      • expire 是指任務的過時時間。這個時間是全局的,影響到全部的work進程。(無論是獨立的work命令仍是 listen 模式下建立的的work子進程) 。expire 針對的對象是 任務。

      • timeout 是指work子進程的超時時間。這個時間只對當前執行的listen 命令有效。timeout 針對的對象是 work子進程。

  • 2.3.5 使用場景不一樣

    根據上面的介紹,能夠看到,

    work 命令的適用場景是:

    • 任務數量較多
    • 性能要求較高
    • 任務的執行時間較短
    • 消費者類中不存在死循環,sleep() ,exit() ,die() 等容易致使bug的邏輯

    listen命令的適用場景是:

    • 任務數量較少
    • 任務的執行時間較長(如生成大型的excel報表等),
    • 任務的執行時間須要有嚴格限制

2.4 消息隊列的開始,中止與重啓

  • 開始一個消息隊列:

    php think queue:work
  • 中止全部的消息隊列:

    php think queue:restart
  • 重啓全部的消息隊列:

    php think queue:restart 
    php think queue:work 

2.5 多模塊,多任務的處理

  • 多模塊

    單模塊項目推薦使用 app\job 做爲任務類的命名空間

    多模塊項目可用使用 app\module\job 做爲任務類的命名空間 也能夠放在任意能夠自動加載到的地方

  • 多任務

    若是一個任務類裏有多個小任務的話,在發佈任務時,須要用 任務的類名@方法名 如 app\lib\job\Job2@task1app\lib\job\Job2@task2

    注意:命令行中的 --queue 參數不支持@解析

    多任務例子:

    • 在 \application\index\controller\JobTest.php 控制器中,添加 actionWithMultiTask()方法:
    public function actionWithMultiTask(){   $taskType = $_GET['taskType'];  switch ($whichTask) {  case 'taskA':  $jobHandlerClassName = 'application\index\job\MultiTask@taskA';  $jobDataArr = ['a' => '1'];  $jobQueueName = "multiTaskJobQueue";  break;  case 'taskB':  $jobHandlerClassName = 'application\index\job\MultiTask@taskB';  $jobDataArr = ['b' => '2'];  $jobQueueName = "multiTaskJobQueue";  break;  default:  break;  }   $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);  if ($isPushed !== false) {  echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");  }else{  throw new Exception("push a new $taskType of MultiTask Job Failed!");  } }
    • 新增 \application\index\job\MultiTask.php 消費者類,並編寫其 taskA() 和 taskB()方法
    <?php /**  * 文件路徑: \application\index\job\MultiTask.php  * 這是一個消費者類,用於處理 multiTaskJobQueue 隊列中的任務  */ namespace application\index\job;  use think\queue\Job;  class MultiTask {   public function taskA(Job $job,$data){   $isJobDone = $this->_doTaskA($data);   if ($isJobDone) {  $job->delete();  print("Info: TaskA of Job MultiTask has been done and deleted"."\n");  }else{  if ($job->attempts() > 3) {  $job->delete();  }  }  }   public function taskB(Job $job,$data){   $isJobDone = $this->_doTaskA($data);   if ($isJobDone) {  $job->delete();  print("Info: TaskB of Job MultiTask has been done and deleted"."\n");  }else{  if ($job->attempts() > 2) {  $job->release();  }  }  }   private function _doTaskA($data) {  print("Info: doing TaskA of Job MultiTask "."\n");  return true;  }   private function _doTaskB($data) {  print("Info: doing TaskB of Job MultiTask "."\n");  return true;  }

2.6 消息的延遲執行與定時執行

延遲執行,相對於即時執行,是用來限制某個任務的最先可執行時刻。在到達該時刻以前,該任務會被跳過。

能夠利用該功能實現定時任務。

使用方式:

  • 在生產者業務代碼中:
// 即時執行 $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); // 延遲 2 秒執行 $isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName); // 延遲到 2017-02-18 01:01:01 時刻執行 $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); $isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
  • 在消費者類中:
// 重發,即時執行 $job->release(); // 重發,延遲 2 秒執行 $job->release(2); // 延遲到 2017-02-18 01:01:01 時刻執行 $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); $job->release($time2wait);
  • 在命令行中:
//若是消費者類的fire()方法拋出了異常且任務未被刪除時,將自動重發該任務,重發時,會設置其下次執行前延遲多少秒,默認爲0
php think queue:work --delay 3  

2.7 消息的重發

thinkphp-queue 中,消息的重發時機有3種:

  • 2.7.1 在消費者類中手動重發:
if( $isJobDone === false){  $job->release(); }
  • 2.7.2 work進程自動重發,需同時知足如下兩個條件
    • 消費者類的 fire() 方法拋出了異常
    • 任務未被刪除
  • 2.7.3 當配置了 expire 不爲 null 時,work 進程內部每次查詢可用任務以前,會先自動重發已過時的任務。

補充:

在database 模式下,2.7.1 和 2.7.2 中的重發邏輯是先刪除原來的任務,而後插入一個新的任務。2.7.3 中的重發時機是直接更新原任務。

而在redis 模式下,3種重發都是先刪除再插入。

無論是哪一種重發方式,重發以後,任務的已嘗試次數會在原來的基礎上 +1 。

此外,消費者類中須要注意,若是 fire() 方法中可能拋出異常,那麼

  • 若是不須要自動重發的話, 請在拋出異常以前將任務刪除 $job->delete() ,以避免產生bug。
  • 若是須要自動重發的話,請直接拋出異常,不要在 fire() 方法中又手動使用 $job->release() , 這樣會致使該任務被重發兩次,產生兩個同樣的新任務。

2.8 任務的失敗回調及告警

當同時知足如下條件時,將觸發任務失敗回調:

  • 命令行的 --tries 參數的值大於0
  • 任務的已嘗試次數大於 命令行的 --tries 參數
  • 開發者添加了 queue_failed 事件標籤及其對應的回調代碼
  • 消費者類中定義了 failed() 方法,用於接收任務失敗的通知

注意, queue_failed 標籤須要在安裝了 thinkphp-queue 以後 手動 去 \application\tags.php 文件中添加。

注意:該版本有bug,若想實現失敗任務回調功能,須要先修改位於 think-queue\src\queue\Worker.php 中的 logFailedJob方法 , 修改方式以下:

/**  * Log a failed job into storage.  * @param \Think\Queue\Job $job  * @return array  */  protected function logFailedJob(Job $job)  {  // 將原來的 queue.failed' 修改成 'queue_failed' 才能夠觸發任務失敗回調  if (Hook::listen('queue.failed', $job, null, true)) {  $job->delete();  $job->failed();  }   return ['job' => $job, 'failed' => true];  } 

首先,咱們添加 queue_failed 事件標籤, 及其對應的回調方法

// 文件路徑: \application\tags.php // 應用行爲擴展定義文件 return [  // 應用初始化  'app_init' => [],  // 應用開始  'app_begin' => [],  // 模塊初始化  'module_init' => [],  // 操做開始執行  'action_begin' => [],  // 視圖內容過濾  'view_filter' => [],  // 日誌寫入  'log_write' => [],  // 應用結束  'app_end' => [],   // 任務失敗統一回調,有四種定義方式  'queue_failed'=> [   // 數組形式,[ 'ClassName' , 'methodName']  ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']   // 字符串(靜態方法),'StaicClassName::methodName'  // 'MyQueueFailedLogger::logAllFailedQueues'   // 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名爲 queueFailed 的方法  // 'application\\behavior\\MyQueueFailedLogger'   // 閉包形式  /*  function( &$jobObject , $extra){  // var_dump($jobObject);  return true;  }  */  ] ];

這裏,咱們選擇數組形式的回調方式,新增 \application\behavior\MyQueueFailedLogger 類,添加一個 logAllFailedQueues() 方法

<?php /**  * 文件路徑: \application\behavior\MyQueueFailedLogger.php  * 這是一個行爲類,用於處理全部的消息隊列中的任務失敗回調  */  namespace application\behavior;   class MyQueueFailedLogger {   const should_run_hook_callback = true;   /**  * @param $jobObject \think\queue\Job //任務對象,保存了該任務的執行狀況和業務數據  * @return bool true //是否須要刪除任務並觸發其failed() 方法  */  public function logAllFailedQueues(&$jobObject){   $failedJobLog = [  'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'  'queueName' => $jobObject->getQueue(), // 'helloJobQueue'  'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'  'attempts' => $jobObject->attempts(), // 3  ];  var_export(json_encode($failedJobLog,true));   // $jobObject->release(); //重發任務  //$jobObject->delete(); //刪除任務  //$jobObject->failed(); //通知消費者類任務執行失敗   return self::should_run_hook_callback;  } }

須要注意該回調方法的返回值:

  • 返回 true 時,系統會自動刪除該任務,而且自動調用消費者類中的 failed() 方法
  • 返回 false 時,系統不會自動刪除該任務,也不會自動調用消費者類中的 failed() 方法,須要開發者另行處理失敗任務的刪除和通知。

最後,在消費者類中,添加 failed() 方法

/**  * 文件路徑: \application\index\job\HelloJob.php  */  /**  * 該方法用於接收任務執行失敗的通知,你能夠發送郵件給相應的負責人員  * @param $jobData string|array|... //發佈任務時傳遞的 jobData 數據  */ public function failed($jobData){  send_mail_to_somebody() ;   print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; }

這樣,就能夠作到任務失敗的記錄與告警

2.9 處理過時的任務

過時這個概念用文字比較難描述清楚,建議先看一下 深刻理解 中 3.4 消息處理的詳細流程圖

三 深刻理解

3.1 thinkphp-queue 中消息與隊列的保存方式

  • Redis

    在 Redis 中,每個 隊列 都三個key 與之對應 ,以 helloJobQueue 隊列舉例,其在redis 中的保存方式爲:

    key名 類型 說明
    queues:helloJobQueue List , 列表 待執行的任務列表
    queues:helloJobQueue:delayed Sorted Set,有序集合 延遲執行和定時執行的任務集合
    queues:helloJobQueue:reserved Sorted Set,有序集合 執行中的任務集合

    使用的:分隔符, 只是用來表示相關key的關聯性。自己沒有特殊含義。使用分隔符是一種常見的組織key的方式。

    其中,在queues:helloJobQueue 列表中,每一個元素的形式以下:

    redis中的隊列-queue

    在 queues:helloJobQueue:delayed 和 queues:helloJobQueue:delayed 有序集合中,每一個元素的形式以下:

    redis中的隊列-queue-reserved

    能夠看到,在有序集合中,每一個元素表明一個任務,該元素的 Score 爲該任務的入隊時間戳,任務的 value 爲json 格式,保存了任務的執行狀況和業務數據。將value decode 爲數組後形式以下:

    [
     'job' => 'application\\index\\job\\Hello' , // jobHandlerClassName,消費者類的類名  'data' => [ // 生產者傳入的業務數據  'time' => '2017-02-18 16:20:10',  'data' => 'I have 648 apples'  ],  'id' => '77IasdasadIasdadadadKL8t', // 一個隨機的32位字符串  'attempts' => 2 // 任務的已嘗試次數 ]

    redis驅動下,爲了實現任務的延遲執行和過時重發,任務將在這三個key中來回轉移,詳情可見 3.5

  • Database

    在 Database 中,每一個任務對應到表中的一行,queue 字段用來區分不一樣的隊列。

    表的字段結構以下:

    數據庫字段說明

    其中,payLoad 字段保存了消息的執行者和業務數據,payLoad 字段採用 json 格式的字符串來保存消息,將其 decode 爲數組後形式以下:

    [
     'job' => 'application\\index\\job\\Hello', // jobHandlerClassName,消費者類的類名  'data' => string|array|integer|object // 生產者傳入的業務數據 ]

3.2 thinkphp-queue 的目錄結構和類關係圖

thinkphp-queue的文件目錄

這些類構成了消息隊列中的幾個角色:

角色 類名 說明
命令行 Command + Worker 負責解析命令行參數,控制隊列的啓動,重啓
驅動 Queue + Connector 負責隊列的建立,以及消息的入隊,出隊等操做
任務 Job 用於將消息轉化爲一個任務對象,供消費者使用
生產者 業務代碼 負責消息的建立與發佈
消費者 業務代碼 負責任務的接收與執行

各個類之間的關係圖以下:

thinkphp-queue類關係圖

3.3 Deamon模式的執行流程

Daemon模式與非daemon模式狀態圖

3.4 Database模式下消息處理的詳細流程

下圖中,展現了database 模式下消息處理的詳細流程,redis 驅動下大致相似

Database模式下消息獲取和執行的具體流程

3.5 redis 驅動下的任務重發細節

在redis驅動下,爲了實現任務的延遲執行和過時重發,任務將在這三個key中來回轉移。

在3.4 Database模式下消息處理的消息流程中,咱們知道,若是配置的expire 不是null ,那麼 thinkphp-queue的work進程每次在獲取下一個可執行任務以前,會先嚐試重發全部過時的任務。而在redis驅動下,這個步驟則作了更多的事情,詳情以下:

  1. 從 queue:xxx:delayed 的key中查詢出有哪些任務在當前時刻已經能夠開始執行,而後將這些任務轉移到 queue:xxx 的key的尾部。
  2. 從 queue:xxx:reserved 的key中查詢出有哪些任務在當前時刻已通過期,而後將這些任務轉移到 queue:xxx的key的尾部。
  3. 嘗試從 queue:xxx 的key的頭部取出一個任務,若是取出成功,那麼,將這個任務轉移到 queue:xxx:reserved 的key 的頭部,同時將這個任務實例化成任務對象,交給消費者去執行。

用圖來表示這個步驟的具體過程以下:

redis隊列中的過時任務重發步驟--執行前:

redis隊列中的過時任務重發步驟-執行前

redis隊列中的過時任務重發步驟--執行後:

redis隊列中的過時任務重發步驟--執行後

3.6 thinkphp-queue的性能

  • 測試環境 :

    虛擬機 Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 雙核 I5 6400,3G 內存

  • 測試方式 :

    使用 Redis 驅動,在一個控制器中循環推送 40000 條消息到消息隊列;

    使用php think queue:work --daemon去消費這些消息,計算推送和消費各自所耗的時間。

  • 測試結果 :

    在最簡單的邏輯下,平均每秒中可推送8000個消息,平均每秒可消費200個消息。

注意:因爲在測試時,Host 機自己的cpu和內存長期100%,而且虛擬機中的各項服務並未專門調優,所以該測試結果並不具有參考性。

3.7 thinkphp-queue 的N種錯誤使用姿式

  • 3.7.1 在 消費者類的 fire() 方法中,忘記使用 $job->delete() 去刪除消息,這種狀況下,會產生一系列的bug:

    • 配置的 expire 爲 null , 則該任務被執行一次後會永遠留在消息隊列中,佔用消息隊列的空間 , 除非開發者另行處理。

    • 配置的 expire 不爲 null ,該任務在 expire 秒後被認爲是過時任務,並被消息隊列還原爲待執行狀態,在消息隊列的後面的循環中繼續被獲取,這時,若是

      • 命令行中的 --tries 參數爲0 或者未設置,那麼每隔 一段時間該任務就會被執行一次。
      • 命令行中的 --tries 參數 n 大於0 , 那麼當這個任務被誤執行的次數超過n 時,會由消息隊列嘗試去觸發失敗回調事件:
        • 若是開發者沒有編寫失敗處理的回調事件:那麼該任務仍然不會被刪除,每隔一段時間就會被執行一次。[這個可能屬於框架的Bug] ,
        • 若是編寫了失敗回調事件
          • 回調事件中刪除了任務,則這個任務被誤執行了 n 次。
          • 回調事件中未刪除任務,這時,若是:
            • 回調事件返回值是 false,那麼該任務仍然不會被刪除,每隔一段時間就會被執行一次
            • 回調事件返回值是 true, 那麼該任務會先被刪除,而後觸發消費者類的 failed() 方法,若是在 failed() 方法中設置了告警,那麼這個告警就是一次誤報。

    所以,在 使用 thinkphp-queue 時,請記得:

    • 任務完成後, 使用 $job->delete() 刪除任務
    • 在消費者類的 fire() 方法中,使用 $job->attempt() 檢查任務已執行次數,對於次數異常的,做相應的處理。
    • 在消費者類的 fire() 方法中,根據業務數據來判斷該任務是否已經執行過,以免該任務被重複執行。
    • 編寫失敗回調事件,將事件中失敗的任務及時通知給開發人員。
  • 3.7.2 使用了 queue:work --daemon ,但更新代碼後沒有使用 queue:restart 重啓 work 進程, 使得 work 進程中的代碼與最新的代碼不一樣,出現各類問題。

  • 3.7.3 使用了 queue:work --daemon ,可是消費者類的 fire() 方法中存在死循環,或 sleep(n) 等邏輯,致使消息隊列被堵塞;或者使用了 exit() , die() 這樣的邏輯,致使work進程直接終止 。

  • 3.7.4 配置的 expire 爲 null ,這時若是採用的是 Redis 驅動且使用了延遲功能,如 later(n) , release(n) 方法或者 --delay 參數不爲0 , 那麼將致使被延遲的任務永遠沒法處理。(這個可能屬於框架的Bug)

  • 3.7.5 配置的 expire 爲null ,但並無自行處理過時的任務,致使過時的任務得不處處理,且一直佔用消息隊列的空間。

  • 3.7.6 配置的 expire 不爲null ,但配置的 expire 時間過短,以致於 expire 時間 < 消費者的 fire() 方法所需時間 + 刪除該任務所需的時間 ,那麼任務將被誤認爲執行超時,從而被消息隊列還原爲待執行狀態。

  • 3.7.7 使用 Queue::push($jobHandlerClassName , $jobData, $jobQueueName ); 推送任務時,$jobData 中包含未序列化的對象。這時,在消費者端拿到的 $jobData 中拿到的是該對象的public 屬性的鍵值對數組。所以,須要在推送前手動序列化對象,在消費者端再手動反序列化還原爲對象。

四 拓展

4.1 隊列的穩定性和拓展性

  • 穩定性:無論是 listen 模式仍是 work 模式,都建議使用 supervisor 或者 自定義的cron 腳本,去定時檢查 work 進程是否正常
  • 拓展性: 當某個隊列的消費者不足時,再給這個隊列添加 work進程便可。

4.2 消息隊列的可視化管理工具

  • 隊列管理,隊列的列表,隊列的 work 進程數量控制,隊列的任務數量變化趨勢 //TBD
  • 任務管理,任務的列表,添加/撤回/查詢任務,修改任務的 執行者/執行時間/優先級/數據 等 //TBD

4.2 編寫自定義的 thinkphp-queue 驅動

//TBD

4.3 編寫消息隊列的單元測試

//TBD

4.4 與其餘PHP消息隊列庫的對比

TP5的消息隊列與Laravel的消息隊列比較類似,下面是與laravel 中的消息隊列的一些對比:

  thinkphp-queue (v1.1.2) laravel-queue (v5.3)
內置的驅動 Database,Redis,Sync,TopThink Database,Redis, Sync(在laravel中稱爲 null)。
Redis驅動要求 安裝redis的C擴展 安裝 predis 包 + LUA腳本
推送任務 容許推送 消費者類名,消費者對象 容許推送消費者類名,消費者對象,閉包
失敗任務處理 觸發失敗回調事件 (有Bug) 觸發失敗回調事件 + 移動任務到 failed_jobs表?
消息訂閱 subscribe 命令+ Topthink驅動(注:未實現/未提供) subscribe 命令 + 安裝IronMQ 驅動
刪除任務 消費者類中手動刪除 任務完成後自動刪除
推送到多個隊列 需本身實現 原生支持
延遲執行 支持 (有Bug) 支持
消息重發 支持 支持
檢查已執行次數 原生支持 需在消費者類中顯式 use 相關的 trait
執行方式 work 模式 + listen 模式 work 模式 + listen 模式
進程命令 開啓,中止,重啓 開啓,中止,重啓
任務命令 展現失敗任務列表,重試某個失敗任務,刪除某個失敗任務
支持的事件 失敗回調事件 失敗回調事件,支持消費前事件,消
相關文章
相關標籤/搜索