轉載:http://netstu.5iunix.net/archives/201305-835/php
最近的作一個短信羣發的項目,須要用到消息隊列。所以開始了我對消息隊列選型的漫長路.git
爲何選型會糾結呢,直接使用ActiveMQ,RabittMQ,Gearman等流行的消息隊列不就能夠了嗎? 在這個項目中,只有單臺服務器,並且我採用了redis來作系統緩存,同時開啓了php apc來緩存phalcon model Metadata.若是再開啓其餘進程,須要很合理的分配各個應用的資源,若是分配很差,極有可能產生浪費,同時還有可能形成應用性能低下.github
所以,爲了實現項目結構的簡單化,同時簡化維護,我就打算用Redis來實現,但短信羣發過程當中,必需要有優先級別的,直接使用Redis的 List顯然不是太合適,本着這一打算,我還專門google了一下,發現有人使用zset來模擬FIFO,實現LIST中的lpush,rpop操做。web
經過Google發現有一個基於Redis的比較不錯的消息隊列 Resque https://github.com/resque/resque 而此消息隊列則是使用ruby語言實現的.我更不想一個項目中使用多種語言了,幸虧,立刻又發現了基於resque的PHP實現, php-resque https://github.com/chrisboulton/php-resqueredis
Resque還包括不少其餘插件以及其餘語言的實現,詳細列表請看官方列表 https://github.com/resque/resque/wiki/Alternate-Implementationsjson
php-resque自己並無包括查看消息隊列status的用戶界面,但咱們能夠經過命令行查看,同時因爲php-resque的消息隊列實現和resque中命名一致,能夠安裝ruby的web界面來進行查看.bootstrap
我一直使用的是phpredis擴展. php-resque使用Redis Client是 Credis, https://github.com/colinmollenhour/credis ,而Credis的API徹底是基於phpredis來封裝的,即API同樣,前者是C擴展,後者是PHP實現.數組
促使我最終決定使用Redis及PHP-Resque的一個最要緣由是PHP-Resque內置可同時開啓多個進程,對於海量數據下發來講很是必要,同時不須要本身再寫代碼實現多進程或多線程了.緩存
下面具體介紹一下PHP-Resque的安裝與使用.ruby
PHP-Resque安裝
1.環境需求
2.安裝Composer
$ curl -sS https://getcomposer.org/installer | php $ mv composer.phar /usr/local/bin/composer
3.安裝PHP-Resque
git clone git://github.com/chrisboulton/php-resque.git cd php-resque composer install
php-resque的隊列實現
全部的消息隊列都差很少,有生產者,消費者,還有Task Server.只不過名稱不一樣而已.
生產者:接受用戶提交信息,組成消息隊列中的一個任務,並將其加入消息隊列服務中.
消費者:通常爲一個守護進程,從消息隊列服務器中取出任務,對任務作出相應的處理.PHP-Resque中的消息者爲守護進程+Job Class .即由守護進程從消息隊列中取出任務,具體的任務中在任務的生產過程當中已經指定了由哪一個Job Class來作具體的處理.
Task Server: 消息隊列服務器,本處指redis服務.
具體實現
下載安裝PHP-Resque後,在php-resque/demo目錄中,有相似以下幾個文件(我修改測試過,和原有目錄不太同樣了):
[root@myfc demo]# ll total 52 -rw-r--r-- 1 root root 113 Apr 17 17:48 bad_job.php -rw-r--r-- 1 root root 494 Apr 17 17:48 check_status.php -rw-r--r-- 1 root root 1629 Apr 28 15:43 cli-bootstrap.php -rw-r--r-- 1 root root 396 Mar 25 10:27 config.ini -rw-r--r-- 1 root root 6 Apr 18 11:26 index.php -rw-r--r-- 1 root root 680 Apr 17 17:48 init.php -rw-r--r-- 1 root root 199 Apr 18 13:13 job.php -rw-r--r-- 1 root root 78 Apr 17 17:48 long_job.php -rw-r--r-- 1 root root 325 Apr 28 15:51 MtSend_Job.php -rw-r--r-- 1 root root 97 Apr 18 12:05 php_error_jobFF.php -rw-r--r-- 1 root root 382 Apr 18 11:43 queue.php -rw-r--r-- 1 root root 160 Apr 28 18:09 resque.php -rw-r--r-- 1 root root 80 Apr 18 13:22 Test_Job.php
生成任務(建立一個Job)
// Required if redis is located elsewhere Resque::setBackend('localhost:6379'); $args = array( 'name' => 'Chris' ); Resque::enqueue('default', 'My_Job', $args);
其中第一行爲鏈接redis服務
第二行的數組是任務的內容,可隨意編寫.經過Resque::enqueue 序列化後加入到消息隊列中.
最後一行的第一個參數表示 消息隊列的名稱(可隨意標記,好比 email,log等),第二個參數表示取出任務後,由My_Job這個類來處理此條任務.
同時須要說明的是: 在PHP-Resque中也沒有數字標識的優化級,但能夠經過消息隊列名稱來實現優化級,具體下面會講到.
定義Job Class
class My_Job { public function setUp() { // ... Set up environment for this job } public function perform() { // .. Run job } public function tearDown() { // ... Remove environment for this job } }
從上面的代碼中能夠看出,這是一個很是簡單的PHP類文件, 方法perform是必須的,主要用來處理任務.其餘兩個方法無關緊要,根據你本身的應用環境看是否須要.
Demo目錄中的job.php (修改過)
<?php class PHP_Job { public function perform() { sleep(5); // fwrite(STDOUT, 'Hello!'); $p = $this->args['name']; echo 'ttime '.$p; // print_r(json_decode($p)); echo "string"; } } ?>
從上面的例子能夠看出,若是想在Job Class中讀取任務的具體內容,可直接使用 $this->args['name']來進行讀取.按上面的例子來講,它將輸出 Chris
測試消息隊列
(1)建立任務
[root@myfc demo]# cd php-resque/demo
[root@myfc demo]# php queue.php PHP_Job
Queued job bcd1cb6f39d72b9b786716f81b757370
queue.php代碼以下:
<?php if(empty($argv[1])) { die('Specify the name of a job to add. e.g, php queue.php PHP_Job'); } require __DIR__ . '/init.php'; date_default_timezone_set('GMT'); Resque::setBackend('127.0.0.1:6379'); $args = array( 'time' => time(), 'array' => array( 'test' => 'test', ), ); $jobId = Resque::enqueue('default', $argv[1], $args, true); echo "Queued job ".$jobId."\n\n";
此時查看Redis,會發現多了三條數據
resque:queues, default resque:queue:default, {"class":"PHP_Job","args":[{"time":1368167779,"array":{"test":"test"}}],"id":"875234d18af92212a7d60056daeae910"} resque:job:875234d18af92212a7d60056daeae910:status, {"status":1,"updated":1368167779,"started":1368167779}
注意: 「,」逗號前是鍵名,後面是鍵值
在開啓守護進程之前,請確保PHP_Job存在,PHP_Job爲PHP類名,而非PHP文件名. 若是PHP_Job不存在,則此條任務會處理失敗.(由於根本不存在處理它的文件)
(2)處理任務
在demo目錄中,守護進程是由目錄下的resque.php來進行,代碼以下:
<?php date_default_timezone_set('GMT'); require 'bad_job.php'; require 'job.php'; require 'long_job.php'; require 'MtSend_Job.php'; require '../bin/resque';
Job Class須要放到此文件頭部引入,固然也可使用其餘方式引入. 上面代碼中job.php便是類PHP_Job文件
開啓守護進程參數說明:
在上面的代碼中,最後一行引入的../bin/resque 其實也是一個php cli程序,代碼以下:
#!/usr/bin/env php <?php // Find and initialize Composer $files = array( __DIR__ . '/../../vendor/autoload.php', __DIR__ . '/../../../autoload.php', __DIR__ . '/../../../../autoload.php', __DIR__ . '/../vendor/autoload.php', ); $found = false; foreach ($files as $file) { if (file_exists($file)) { require_once $file; break; } } if (!class_exists('Composer\Autoload\ClassLoader', false)) { die( 'You need to set up the project dependencies using the following commands:' . PHP_EOL . 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . 'php composer.phar install' . PHP_EOL ); } $QUEUE = getenv('QUEUE'); if(empty($QUEUE)) { die("Set QUEUE env var containing the list of queues to work.\n"); } $REDIS_BACKEND = getenv('REDIS_BACKEND'); $REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB'); if(!empty($REDIS_BACKEND)) { if (empty($REDIS_BACKEND_DB)) Resque::setBackend($REDIS_BACKEND); else Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); } $logLevel = 0; $LOGGING = getenv('LOGGING'); $VERBOSE = getenv('VERBOSE'); $VVERBOSE = getenv('VVERBOSE'); if(!empty($LOGGING) || !empty($VERBOSE)) { $logLevel = Resque_Worker::LOG_NORMAL; } else if(!empty($VVERBOSE)) { $logLevel = Resque_Worker::LOG_VERBOSE; } $APP_INCLUDE = getenv('APP_INCLUDE'); if($APP_INCLUDE) { if(!file_exists($APP_INCLUDE)) { die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n"); } require_once $APP_INCLUDE; } $interval = 5; $INTERVAL = getenv('INTERVAL'); if(!empty($INTERVAL)) { $interval = $INTERVAL; } $count = 1; $COUNT = getenv('COUNT'); if(!empty($COUNT) && $COUNT > 1) { $count = $COUNT; } $PREFIX = getenv('PREFIX'); if(!empty($PREFIX)) { fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n"); Resque_Redis::prefix($PREFIX); } if($count > 1) { for($i = 0; $i < $count; ++$i) { $pid = Resque::fork(); if($pid == -1) { die("Could not fork worker ".$i."\n"); } // Child, start the worker else if(!$pid) { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); break; } } } // Start a single worker else { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { file_put_contents($PIDFILE, getmypid()) or die('Could not write PID information to ' . $PIDFILE); } fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); }
從上面的代碼能夠看出,在命令行啓動守護進程時,能夠傳遞一些環境變量.而 ../bin/resque經過getenv函數來獲取環境變量.
支持的環境變量有:
========================================================================================================
QUEUE – 這個是必要的,會決定 worker 要執行什麼任務,重要的在前,例如 QUEUE=default,notify,mail,log 。也能夠設定為 QUEUE=* 表示執行全部任務。
APP_INCLUDE – 這也能夠說是必要的,因為 Resque 的 Job 都是寫成PHP類,那 worker 執行的時候當然要把物件的檔案引入進來。能夠設成 APP_INCLUDE=require.php 再在 require.php 中引入全部 Job 的 Class 文件便可。
COUNT – 設定 worker 數量,預設是1 COUNT=5 。
REDIS_BACKEND – 設定 Redis 的 ip, port。若是沒設定,預設是連 localhost:6379 。
LOGGING, VERBOSE – 設定 log, VERBOSE=1 便可。
VVERBOSE – 比較詳細的 log, VVERBOSE=1 debug 的時候能夠開出來看。
INTERVAL – worker 檢查 queue 的間隔,預設是五秒 INTERVAL=5 。
PIDFILE – 若是你是開單 worker,能夠指定 PIDFILE 把 pid 寫入,例如 PIDFILE=/var/run/resque.pid 。
=============================================================================
啓動守護進程:
QUEUE=* APP_INCLUDE=require.php COUNT=5 VVERBOSE=1 php resque.php
QUEUE=* 代表處理全部的消息隊列,此時則沒有優化級別.若是須要優化級別,能夠按前後順序列出,如 QUEUE=mail1,mail2,mail3
APP_INCLUDE=require.php 文件中,能夠引入其餘的一些Job Class或其餘輔助類,固然也能夠直接在 demo/resque.php中引入
啓動守護進程後:
[root@myfc demo]# QUEUE=* COUNT=5 VVERBOSE=1 php resque.php #!/usr/bin/env php *** Starting worker myfc:22143:* *** Starting worker myfc:22144:* ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22145:* ** [07:25:09 2013-05-10] Registered signals ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22146:* ** [07:25:09 2013-05-10] Registered signals *** Starting worker myfc:22148:* ** [07:25:09 2013-05-10] Registered signals [root@myfc demo]# ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Found job on default ** [07:25:09 2013-05-10] got (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}]) ** [07:25:09 2013-05-10] Forked 22163 at 2013-05-10 07:25:09 ** [07:25:09 2013-05-10] Processing default since 2013-05-10 07:25:09 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ** [07:25:09 2013-05-10] Checking default ** [07:25:09 2013-05-10] Sleeping for 5 ttime 1368167779string** [07:25:14 2013-05-10] done (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}]) ** [07:26:29 2013-05-10] Checking default ** [07:26:29 2013-05-10] Checking default ** [07:26:29 2013-05-10] Sleeping for 5 ** [07:26:29 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Checking default ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:34 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Checking default ** [07:26:39 2013-05-10] Sleeping for 5 ** [07:26:39 2013-05-10] Sleeping for 5
能夠看出,已經成功取出任務,並使用PHP_Job處理.而後守護進程一直在運行,等待處理新的任務
此時看一下進程文件:
[root@myfc sop]# ps -aux|grep php root 22143 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22144 0.0 0.0 310700 11580 pts/2 S 15:25 0:00 php resque.php root 22145 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22146 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php root 22148 0.0 0.0 310700 11496 pts/2 S 15:25 0:00 php resque.php
查看任務的狀態
在生成任務的時候,使用 $token = Resque::enqueue('default', 'My_Job', $args, true); 生成一個任務,第四個參數設置爲true,會返回任務的JobID,經過JobID能夠查看任務的狀態.
$status = new Resque_Job_Status($token); echo $status->get(); // Outputs the status
在類文件Resque_Job_Status中定義了全部類型
任務狀態
Resque_Job_Status::STATUS_WAITING - 等待被取出處理 Resque_Job_Status::STATUS_RUNNING - 任務正在被處理 Resque_Job_Status::STATUS_FAILED - 任務處理失敗 Resque_Job_Status::STATUS_COMPLETE - 任務處理完成
事件觸發接口 暫時沒用到,可查看官方文檔
原本想把選型,使用,以及整合Phalcon寫到一篇文章中,看來太長,仍是分開吧.抽空把集成到Phalcon以及消息隊列實際應用寫一下.
參考文檔:
http://avnpc.com/pages/run-background-task-by-php-resque
http://blog.hsatac.net/2012/01/php-resque-introduction/
http://kamisama.me/2012/10/12/background-jobs-with-php-and-resque-part-4-managing-worker/
https://github.com/kamisama/Fresque
http://stackoverflow.com/questions/11814445/what-is-the-proper-way-to-setup-and-use-php-resque