TP5 中的redis 隊列

首先咱們看一下本身的TP5的框架中的  TP5\vendor\topthink ,這個文件中有沒有think-queue這個文件夾,若是沒有請安裝,php

安裝這個是要用到Composer的若是沒有安裝composer,請安裝Composermysql

1.$ curl -sS https://getcomposer.org/installer | php

2.$ mv composer.phar /usr/local/bin/composer
Linux上安裝 think-queue ,請先進入到框架的根目錄再運行redis

composer require topthink/think-queue
這個時候再去看就會有 think-queue 這個文件夾了,肯定一下看看是否安裝成功,運行sql

php think queue:work -h
能出現如下 結果 就表示think-queue的 安裝好了thinkphp

 

 

配置
配置文件位於 application/extra/queue.phpshell

公共配置
 數據庫

<?php
return [
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
'default' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 0, // 使用哪個 db,默認爲 db0
'timeout' => 0, // redis鏈接的超時時間
'persistent' => false, // 是不是長鏈接

// 'connector' => 'Database', // 數據庫驅動
// 'expire' => 60, // 任務的過時時間,默認爲60秒; 若要禁用,則設置爲 null
// 'default' => 'default', // 默認的隊列名稱
// 'table' => 'jobs', // 存儲消息的表名,不帶前綴
// 'dsn' => [],

// 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不做介紹
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default',

// 'connector' => 'Sync', // Sync 驅動,該驅動的實際做用是取消消息隊列,還原爲同步執行
];
 json

1.4 消息的建立與推送api

咱們在業務控制器中建立一個新的消息,並推送到 helloJobQueue 隊列瀏覽器

新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法

<?php
/**
* 文件路徑: \application\index\controller\JobTest.php
* 該控制器的業務代碼中藉助了thinkphp-queue 庫,將一個消息推送到消息隊列
*/
namespace app\index\controller;
use think\Exception;

use think\Queue;

class JobTest {
/**
* 一個使用了隊列的 action
*/
public function actionWithHelloJob(){

// 1.當前任務將由哪一個類來負責處理。
// 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
$jobHandlerClassName = 'application\index\job\Hello';
// 2.當前任務歸屬的隊列名稱,若是爲新隊列,會自動建立
$jobQueueName = "helloJobQueue";
// 3.當前任務所需的業務數據 . 不能爲 resource 類型,其餘類型最終將轉化爲json形式的字符串
// ( jobData 爲對象時,須要在先在此處手動序列化,不然只存儲其public屬性的鍵值對)
$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
// 4.將該任務推送到消息隊列,等待對應的消費者去執行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅動時,返回值爲 1|false ; redis 驅動時,返回值爲 隨機字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
注意: 在這個例子當中,咱們是手動指定的 $jobHandlerClassName ,更合理的作法是先定義好消息名稱與消費者類名的映射關係,而後由某個能夠獲取該映射關係的類來推送這個消息。這樣,生產者只須要知道消息的名稱,而無需指定哪一個消費者類來處理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式以外,還能夠直接傳入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 這時,須要在 $jobHandlerObject 中定義一個 handle() 方法,消息隊列在執行到該任務時會自動反序列化該對象,並調用其 handle()方法。 該方式的缺點是沒法傳入自定義數據。

1.5 消息的消費與刪除

編寫 Hello 消費者類,用於處理 helloJobQueue 隊列中的任務

新增 \application\index\job\Hello.php 消費者類,並編寫其 fire() 方法

<?php
/**
* 文件路徑: \application\index\job\Hello.php
* 這是一個消費者類,用於處理 helloJobQueue 隊列中的任務
*/
namespace app\index\job;

use think\queue\Job;

class Hello {

/**
* fire方法是消息隊列默認調用的方法
* @param Job $job 當前的任務對象
* @param array|mixed $data 發佈任務時自定義的數據
*/
public function fire(Job $job,$data){
         // 若有必要,能夠根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
         $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
         if(!isJobStillNeedToBeDone){
$job->delete();
return;
}
       
$isJobDone = $this->doHelloJob($data);

if ($isJobDone) {
//若是任務執行成功, 記得刪除任務
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//經過這個方法能夠檢查這個任務已經重試了幾回了
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也能夠從新發布這個任務
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay爲延遲時間,表示該任務延遲2秒後再執行
}
}
}

/**
      * 有些消息在到達消費者時,可能已經再也不須要執行了
* @param array|mixed $data 發佈任務時自定義的數據
* @return boolean 任務執行的結果
      */
     private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}

/**
* 根據消息中的數據進行實際的業務處理
* @param array|mixed $data 發佈任務時自定義的數據
* @return boolean 任務執行的結果
*/
private function doHelloJob($data) {
// 根據消息中的數據進行實際的業務處理...

print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
print("<info>Hello Job is Done!"."</info> \n");

return true;
}
}

至此,全部的代碼都已準備完畢,在運行消息隊列以前,咱們先看一下如今的目錄結構:

 

1.6 發佈任務

在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,能夠看到消息推送成功。

 

這個時候去Linux中連接redis,查看redis的隊列任務,就能夠看到有數據在裏面

鏈接redis:/usr/local/redis/bin/redis-cli -h 12.131.12.12 -p 6379

redis對列的命令,顯示helloJobQueue對列中的數據:LRANGE queues:helloJobQueue 0 -1  

顯示對列中有幾條數據:Llen queues:helloJobQueue

 

看到這樣就說明已經加入到隊列了,截圖是加入了好多條

 

 

1.7 處理任務

切換當前終端窗口的目錄到項目根目錄下,執行

php think queue:work --queue helloJobQueue
能夠看到執行的結果相似以下:

 

​因爲php think queue:work --queue helloJobQueue這個命令只能在TP5框架的根目錄才能運行成功,因此,shell腳本要先cd到框架的根目錄,具體見下面的shell腳本截圖

至此,咱們成功地經歷了一個消息的 建立 -> 推送 -> 消費 -> 刪除 的基本流程

可是!!! 可是!!! 可是!!!,重要的事說3遍

雖然如今是能夠將消息的建立 -> 推送 -> 消費 -> 刪除 的基本流程所有跑通,可是每次執行 php think queue:work --queue helloJobQueue 這個命令都是進行了一次,也就是說,在對列 helloJobQueue 中有5條要處理的數據,每次執行 php think queue:work --queue helloJobQueue 都是隻執行了一條數據,還有4條數據沒有處理,咱們要的是執行一次能夠直接將對列中的數據所有處理掉,因而,咱們想到定時任務去處理

首先咱們寫兩個shell腳本

1.monitorHandleQueue.sh,做用是檢查隊列的進程是否在運行


pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')

if [ -z $pid ]
then
sh /home/wwwroot/default/www/thinkphp5/handleQueue.sh &>/dev/null 2>&1
fi
mysqld是進程名稱

檢查進程是否存在,若是不存在啓動handleQueue.sh腳本,注意:monitorHandleQueue.sh腳本中的啓動handleQueue.sh的路徑寫本身的,NR==1表示只取第一個進程,|grep -v grep 過濾掉本身的進程

2.handleQueue.sh 腳本

cd /home/wwwroot/default/www/thinkphp5
while [ 2 > 0 ]
do
len=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`
if [ $((len + 0 )) -gt 0 ];then
php think queue:work --queue helloJobQueue
else
sleep 3
php think queue:work --queue helloJobQueue
fi
done
此腳本中的cd /home/wwwroot/default/www/thinkphp5,必定要切換到框架的根目錄,解釋一下handleQueue.sh腳本的邏輯:先切換到框架的根目錄,while判斷2大於0爲真,因此會一直執行,鏈接到redis,獲取隊列的長度,if判斷,若是隊列的長度大於0直接執行隊列,不然就停3秒再執行隊列,很簡單,寫了很長時間,還有一點要注意,shell腳本最好不要在編輯器編輯,直接在Linux上編輯,由於若是在編輯器上編輯上傳到Linux上會產生意想不到的問題(我在這裏耽誤了很長時間),找不到問題所在就直接在Linux上編寫好了,省的麻煩

上面的shell腳本是我第一次寫,碰到了不少問題

1.[ 2 > 0 ]格式爲[空格判斷表達式空格]

2.len=`/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue`,等於號的左右兩邊不能有空格,反引號 `` 不知道怎麼輸出,在1是我左邊的那個鍵

3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 變量大於0,大於號用 -gt 表示

建立定時任務

*/1 * * * *  /home/wwwroot/default/www/thinkphp5/monitorHandleQueue.sh &>/dev/null 2>&1

好了,至此一個循環請求隊列就寫好了

 

 

 

參考:https://blog.csdn.net/will5451/article/details/80434174

參考:https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

原文:https://blog.csdn.net/dabao87/article/details/82414839 

相關文章
相關標籤/搜索