基於Redis的消息隊列php-resque

轉載:http://netstu.5iunix.net/archives/201305-835/php

最近的作一個短信羣發的項目,須要用到消息隊列。所以開始了我對消息隊列選型的漫長路.git

爲何選型會糾結呢,直接使用ActiveMQ,RabittMQ,Gearman等流行的消息隊列不就能夠了嗎? 在這個項目中,只有單臺服務器,並且我採用了redis來作系統緩存,同時開啓了php apc來緩存phalcon model Metadata.若是再開啓其餘進程,須要很合理的分配各個應用的資源,若是分配很差,極有可能產生浪費,同時還有可能形成應用性能低下.github

所以,爲了實現項目結構的簡單化,同時簡化維護,我就打算用Redis來實現,但短信羣發過程當中,必需要有優先級別的,直接使用Redis的 List顯然不是太合適,本着這一打算,我還專門google了一下,發現有人使用zset來模擬FIFO,實現LIST中的lpush,rpop操做。web

經過Google發現有一個基於Redis的比較不錯的消息隊列 Resque https://github.com/resque/resque 而此消息隊列則是使用ruby語言實現的.我更不想一個項目中使用多種語言了,幸虧,立刻又發現了基於resque的PHP實現, php-resque https://github.com/chrisboulton/php-resqueredis

Resque還包括不少其餘插件以及其餘語言的實現,詳細列表請看官方列表 https://github.com/resque/resque/wiki/Alternate-Implementationsjson

php-resque自己並無包括查看消息隊列status的用戶界面,但咱們能夠經過命令行查看,同時因爲php-resque的消息隊列實現和resque中命名一致,能夠安裝ruby的web界面來進行查看.bootstrap

我一直使用的是phpredis擴展. php-resque使用Redis Client是 Credis, https://github.com/colinmollenhour/credis ,而Credis的API徹底是基於phpredis來封裝的,即API同樣,前者是C擴展,後者是PHP實現.數組

促使我最終決定使用Redis及PHP-Resque的一個最要緣由是PHP-Resque內置可同時開啓多個進程,對於海量數據下發來講很是必要,同時不須要本身再寫代碼實現多進程或多線程了.緩存

下面具體介紹一下PHP-Resque的安裝與使用.ruby

PHP-Resque安裝

1.環境需求

  • PHP 5.3+
  • Redis 2.2+
  • Optional but Recommended: Composer

2.安裝Composer

$ curl -sS https://getcomposer.org/installer | php
$ mv composer.phar /usr/local/bin/composer

3.安裝PHP-Resque

git clone git://github.com/chrisboulton/php-resque.git
cd php-resque
composer install

php-resque的隊列實現

全部的消息隊列都差很少,有生產者,消費者,還有Task Server.只不過名稱不一樣而已.

生產者:接受用戶提交信息,組成消息隊列中的一個任務,並將其加入消息隊列服務中.

消費者:通常爲一個守護進程,從消息隊列服務器中取出任務,對任務作出相應的處理.PHP-Resque中的消息者爲守護進程+Job Class .即由守護進程從消息隊列中取出任務,具體的任務中在任務的生產過程當中已經指定了由哪一個Job Class來作具體的處理.

Task Server: 消息隊列服務器,本處指redis服務.

具體實現

下載安裝PHP-Resque後,在php-resque/demo目錄中,有相似以下幾個文件(我修改測試過,和原有目錄不太同樣了):

[root@myfc demo]# ll
total 52
-rw-r--r-- 1 root root  113 Apr 17 17:48 bad_job.php
-rw-r--r-- 1 root root  494 Apr 17 17:48 check_status.php
-rw-r--r-- 1 root root 1629 Apr 28 15:43 cli-bootstrap.php
-rw-r--r-- 1 root root  396 Mar 25 10:27 config.ini
-rw-r--r-- 1 root root    6 Apr 18 11:26 index.php
-rw-r--r-- 1 root root  680 Apr 17 17:48 init.php
-rw-r--r-- 1 root root  199 Apr 18 13:13 job.php
-rw-r--r-- 1 root root   78 Apr 17 17:48 long_job.php
-rw-r--r-- 1 root root  325 Apr 28 15:51 MtSend_Job.php
-rw-r--r-- 1 root root   97 Apr 18 12:05 php_error_jobFF.php
-rw-r--r-- 1 root root  382 Apr 18 11:43 queue.php
-rw-r--r-- 1 root root  160 Apr 28 18:09 resque.php
-rw-r--r-- 1 root root   80 Apr 18 13:22 Test_Job.php

生成任務(建立一個Job)

// Required if redis is located elsewhere
Resque::setBackend('localhost:6379');
 
$args = array(
    'name' => 'Chris'
);
Resque::enqueue('default', 'My_Job', $args);

其中第一行爲鏈接redis服務

第二行的數組是任務的內容,可隨意編寫.經過Resque::enqueue 序列化後加入到消息隊列中.

最後一行的第一個參數表示 消息隊列的名稱(可隨意標記,好比 email,log等),第二個參數表示取出任務後,由My_Job這個類來處理此條任務.

同時須要說明的是: 在PHP-Resque中也沒有數字標識的優化級,但能夠經過消息隊列名稱來實現優化級,具體下面會講到.

定義Job Class

class My_Job
{
    public function setUp()
    {
        // ... Set up environment for this job
    }
 
    public function perform()
    {
        // .. Run job
    }
 
    public function tearDown()
    {
        // ... Remove environment for this job
    }
}

從上面的代碼中能夠看出,這是一個很是簡單的PHP類文件, 方法perform是必須的,主要用來處理任務.其餘兩個方法無關緊要,根據你本身的應用環境看是否須要.

Demo目錄中的job.php (修改過)

<?php
class PHP_Job
{
        public function perform()
        {
                sleep(5);
                // fwrite(STDOUT, 'Hello!');
                $p = $this->args['name'];
                echo 'ttime '.$p;
                // print_r(json_decode($p));
                echo "string";
        }
}
?>

從上面的例子能夠看出,若是想在Job Class中讀取任務的具體內容,可直接使用 $this->args['name']來進行讀取.按上面的例子來講,它將輸出 Chris

測試消息隊列

(1)建立任務

[root@myfc demo]# cd php-resque/demo
[root@myfc demo]# php queue.php PHP_Job
Queued job bcd1cb6f39d72b9b786716f81b757370

queue.php代碼以下:

<?php
if(empty($argv[1])) {
        die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
}
 
require __DIR__ . '/init.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
 
$args = array(
        'time' => time(),
        'array' => array(
                'test' => 'test',
        ),
);
 
$jobId = Resque::enqueue('default', $argv[1], $args, true);
echo "Queued job ".$jobId."\n\n";

此時查看Redis,會發現多了三條數據

resque:queues,  default
resque:queue:default, {"class":"PHP_Job","args":[{"time":1368167779,"array":{"test":"test"}}],"id":"875234d18af92212a7d60056daeae910"}
resque:job:875234d18af92212a7d60056daeae910:status, {"status":1,"updated":1368167779,"started":1368167779}

注意: 「,」逗號前是鍵名,後面是鍵值

在開啓守護進程之前,請確保PHP_Job存在,PHP_Job爲PHP類名,而非PHP文件名. 若是PHP_Job不存在,則此條任務會處理失敗.(由於根本不存在處理它的文件)

(2)處理任務

在demo目錄中,守護進程是由目錄下的resque.php來進行,代碼以下:

<?php
date_default_timezone_set('GMT');
require 'bad_job.php';
require 'job.php';
require 'long_job.php';
require 'MtSend_Job.php';
 
require '../bin/resque';

Job Class須要放到此文件頭部引入,固然也可使用其餘方式引入. 上面代碼中job.php便是類PHP_Job文件

開啓守護進程參數說明:

在上面的代碼中,最後一行引入的../bin/resque 其實也是一個php cli程序,代碼以下:

#!/usr/bin/env php
<?php
 
// Find and initialize Composer
$files = array(
  __DIR__ . '/../../vendor/autoload.php',
  __DIR__ . '/../../../autoload.php',
  __DIR__ . '/../../../../autoload.php',
  __DIR__ . '/../vendor/autoload.php',
);
 
$found = false;
foreach ($files as $file) {
        if (file_exists($file)) {
                require_once $file;
                break;
        }
}
 
if (!class_exists('Composer\Autoload\ClassLoader', false)) {
        die(
                'You need to set up the project dependencies using the following commands:' . PHP_EOL .
                'curl -s http://getcomposer.org/installer | php' . PHP_EOL .
                'php composer.phar install' . PHP_EOL
        );
}
 
$QUEUE = getenv('QUEUE');
if(empty($QUEUE)) {
        die("Set QUEUE env var containing the list of queues to work.\n");
}
 
$REDIS_BACKEND = getenv('REDIS_BACKEND');
$REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB');
if(!empty($REDIS_BACKEND)) {
        if (empty($REDIS_BACKEND_DB))
                Resque::setBackend($REDIS_BACKEND);
        else
                Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB);
}
 
$logLevel = 0;
$LOGGING = getenv('LOGGING');
$VERBOSE = getenv('VERBOSE');
$VVERBOSE = getenv('VVERBOSE');
if(!empty($LOGGING) || !empty($VERBOSE)) {
        $logLevel = Resque_Worker::LOG_NORMAL;
}
else if(!empty($VVERBOSE)) {
        $logLevel = Resque_Worker::LOG_VERBOSE;
}
 
$APP_INCLUDE = getenv('APP_INCLUDE');
if($APP_INCLUDE) {
        if(!file_exists($APP_INCLUDE)) {
                die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n");
        }
 
        require_once $APP_INCLUDE;
}
 
$interval = 5;
$INTERVAL = getenv('INTERVAL');
if(!empty($INTERVAL)) {
        $interval = $INTERVAL;
}
 
$count = 1;
$COUNT = getenv('COUNT');
if(!empty($COUNT) && $COUNT > 1) {
        $count = $COUNT;
}
 
$PREFIX = getenv('PREFIX');
if(!empty($PREFIX)) {
    fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n");
    Resque_Redis::prefix($PREFIX);
}
 
if($count > 1) {
        for($i = 0; $i < $count; ++$i) {
                $pid = Resque::fork();
                if($pid == -1) {
                        die("Could not fork worker ".$i."\n");
                }
                // Child, start the worker
                else if(!$pid) {
                        $queues = explode(',', $QUEUE);
                        $worker = new Resque_Worker($queues);
                        $worker->logLevel = $logLevel;
                        fwrite(STDOUT, '*** Starting worker '.$worker."\n");
                        $worker->work($interval);
                        break;
                }
        }
}
// Start a single worker
else {
        $queues = explode(',', $QUEUE);
        $worker = new Resque_Worker($queues);
        $worker->logLevel = $logLevel;
 
        $PIDFILE = getenv('PIDFILE');
        if ($PIDFILE) {
                file_put_contents($PIDFILE, getmypid()) or
                        die('Could not write PID information to ' . $PIDFILE);
        }
 
        fwrite(STDOUT, '*** Starting worker '.$worker."\n");
        $worker->work($interval);
}

從上面的代碼能夠看出,在命令行啓動守護進程時,能夠傳遞一些環境變量.而 ../bin/resque經過getenv函數來獲取環境變量.

支持的環境變量有:

========================================================================================================

QUEUE – 這個是必要的,會決定 worker 要執行什麼任務,重要的在前,例如 QUEUE=default,notify,mail,log 。也能夠設定為 QUEUE=* 表示執行全部任務。

APP_INCLUDE – 這也能夠說是必要的,因為 Resque 的 Job 都是寫成PHP類,那 worker 執行的時候當然要把物件的檔案引入進來。能夠設成 APP_INCLUDE=require.php 再在 require.php 中引入全部 Job 的 Class 文件便可。

COUNT – 設定 worker 數量,預設是1 COUNT=5 。

REDIS_BACKEND – 設定 Redis 的 ip, port。若是沒設定,預設是連 localhost:6379 。

LOGGING, VERBOSE – 設定 log, VERBOSE=1 便可。

VVERBOSE – 比較詳細的 log, VVERBOSE=1 debug 的時候能夠開出來看。

INTERVAL – worker 檢查 queue 的間隔,預設是五秒 INTERVAL=5 。

PIDFILE – 若是你是開單 worker,能夠指定 PIDFILE 把 pid 寫入,例如 PIDFILE=/var/run/resque.pid 。

=============================================================================

啓動守護進程:

QUEUE=* APP_INCLUDE=require.php COUNT=5 VVERBOSE=1 php resque.php

QUEUE=* 代表處理全部的消息隊列,此時則沒有優化級別.若是須要優化級別,能夠按前後順序列出,如 QUEUE=mail1,mail2,mail3

APP_INCLUDE=require.php 文件中,能夠引入其餘的一些Job Class或其餘輔助類,固然也能夠直接在 demo/resque.php中引入

啓動守護進程後:

[root@myfc demo]# QUEUE=*  COUNT=5 VVERBOSE=1 php resque.php
#!/usr/bin/env php
*** Starting worker myfc:22143:*
*** Starting worker myfc:22144:*
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22145:*
** [07:25:09 2013-05-10] Registered signals
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22146:*
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22148:*
** [07:25:09 2013-05-10] Registered signals
[root@myfc demo]# ** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Found job on default
** [07:25:09 2013-05-10] got (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}])
** [07:25:09 2013-05-10] Forked 22163 at 2013-05-10 07:25:09
** [07:25:09 2013-05-10] Processing default since 2013-05-10 07:25:09
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
ttime 1368167779string** [07:25:14 2013-05-10] done (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}])
** [07:26:29 2013-05-10] Checking default
** [07:26:29 2013-05-10] Checking default
** [07:26:29 2013-05-10] Sleeping for 5
** [07:26:29 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Sleeping for 5

能夠看出,已經成功取出任務,並使用PHP_Job處理.而後守護進程一直在運行,等待處理新的任務

此時看一下進程文件:

[root@myfc sop]# ps -aux|grep php
root     22143  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22144  0.0  0.0 310700 11580 pts/2    S    15:25   0:00 php resque.php
root     22145  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22146  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22148  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php

查看任務的狀態

在生成任務的時候,使用 $token = Resque::enqueue('default', 'My_Job', $args, true); 生成一個任務,第四個參數設置爲true,會返回任務的JobID,經過JobID能夠查看任務的狀態.

$status = new Resque_Job_Status($token);
echo $status->get(); // Outputs the status

類文件Resque_Job_Status中定義了全部類型任務狀態

Resque_Job_Status::STATUS_WAITING - 等待被取出處理
Resque_Job_Status::STATUS_RUNNING - 任務正在被處理
Resque_Job_Status::STATUS_FAILED - 任務處理失敗
Resque_Job_Status::STATUS_COMPLETE - 任務處理完成

事件觸發接口 暫時沒用到,可查看官方文檔

原本想把選型,使用,以及整合Phalcon寫到一篇文章中,看來太長,仍是分開吧.抽空把集成到Phalcon以及消息隊列實際應用寫一下.

 

參考文檔:

http://avnpc.com/pages/run-background-task-by-php-resque

http://blog.hsatac.net/2012/01/php-resque-introduction/

http://kamisama.me/2012/10/12/background-jobs-with-php-and-resque-part-4-managing-worker/

https://github.com/kamisama/Fresque

http://stackoverflow.com/questions/11814445/what-is-the-proper-way-to-setup-and-use-php-resque

相關文章
相關標籤/搜索