隊列應用場景,本身實現隊列

詳見:http://www.ucai.cn/blogdetail/7024?mid=1&f=9

能夠在線運行查看效果哦php

 

1、隊列使用場景:爲何須要隊列

在web開發中,咱們常常會遇到須要處理批量任務的時候,這些批量任務多是用戶提交的,也多是當系統被某個事件觸發時須要進行批量處理的,面對這樣的任務,若是是用戶提交的批量任務,初級程序員只能讓用戶觸發提交動做後,等待服務器處理完畢,而且將結果返回到瀏覽器,期間用戶不能關掉瀏覽器窗口,若是數據比較大,或者處理速度比較慢,那用戶體驗將會所以受到直接影響。可是當咱們使用某訊或者某浪的郵箱時,點擊羣發郵件以後,只需等待很短的時間,瀏覽器提示提交成功,正在發送之類的信息時,用戶就能夠關掉瀏覽器,稍後,收件地址欄裏的郵箱將陸續收到該羣發郵件,再好比羣發定時郵件,以及當商城系統中有客戶下單,客戶,客服,倉庫等相關人員收到訂單郵件信息。諸如此類,隊列的應用範圍是如此之廣。mysql

 

二 :普通工程師的解決方案和架構師的解決方案linux

方案1:建表存郵件,消息等,用定時程序取出發送。程序員

方案2:抽象到更高一層,開發一套通用異步處理隊列適用於任何複雜的業務邏輯web

那麼,做爲架構師,使用隊列的作法,將抽象層和業務層分離,可具備良好的擴展性和可維護性。相比較而言就高明瞭許多,下面就咱們介紹一下自定義隊列的實現思路和方法。sql

 

三 :隊列整體設計shell

1:須要隊列程序,提供加入隊列接口和取隊列接口等數據庫

2:須要存儲隊列,文件或者數據庫json

3:須要定時程序取出隊列並執行數組

4:其它擴展功能:優先級,日誌,定時等

 

代碼的目錄結構以下,每一個文件的做用用//註釋來標明

|--addTask.php              //添加任務到隊列的例子

|--cronMission.php        //定時任務調度程序,例如linux中受crontab直接調用的文件,業務邏輯工程師能夠在這個文件中靈活定義本身的隊列任務,從而不用每一個隊列任務都須要上服務器修改crontab,從而在安全性,便捷性方面有很大提升

|--db.php                      //數據庫操做

|--db.sql                       //創建隊列須要用到的基本表結構

|--doQueue.php             //執行隊列任務

|--Queue.class.php         //隊列核心業務在這裏定義,包括將任務加入隊列,讀隊列,更改隊列任務狀態

|--sendMsg.php             //隊列要實現具體任務的業務接口,好比現有系統的發送消息的接口,這裏例子中由於將此隊列程序和現有系統系統集成,用寫入日誌來演示

 

四 :隊列具體實現一:建任務存儲表

一、

先來個最基本的:

 

CREATE TABLE `queue` (

id int(11) NOT NULL auto_increment primary key,

taskphp varchar(128) NOT NULL default '',

param text not null default '',

status tinyint not null default 0,

ctime timestamp NOT NULL default CURRENT_TIMESTAMP,

KEY (ctime)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

字段解釋:

taskphp:處理業務的接口文件

param:處理業務的接口文件須要接收的參數

status:任務處理狀態,0爲未處理,處理完畢更改成1

 

五 :隊列具體實現二:定義調用接口

<?php

/**

優才網公開課示例代碼

*

任務隊列實現

*

@author 優才網全棧工程師教研組  張友林

@see http://www.ucai.cn

*/

 

include_once('db.php');

class Queue

{

 

/**

把任務扔到隊列

*

* @param string $taskphp     執行任務的程序

* @param string $param              執行任務程序所用的參數

 

例如,羣發消息加入隊列:

$arr = array(

"uid" => 4,//發信息的人的UID

"uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID

"content" => 'xxxxx',//信息內容

);

$cqueue = new Queue();

$cqueue->add("/app/send_msg.php", serialize($arr));

*

*/

public function add($taskphp,$param)

{

$taskphp = mysql_real_escape_string($taskphp);

//$param = mysql_real_escape_string($param);

$param = $param;

$sql = "insert into queue (taskphp, param) values('".$taskphp."', '".$param."')";

$re = execute($sql);

if ($re)

{

$pid = mysql_insert_id();

return $pid;

}

else

{

return false;

}

}

 

 

/**

讀取任務隊列

*

* @param string $limit  一次取多少條

*/

public function getQueueTask($limit = 1000)

{

$limit = (int)$limit;

$sql = "select id, taskphp, param from queue  where status = 0 order by id asc";

$re = query($sql);

return $re;

}

 

/**

更新任務狀態

*

* @param string $limit  一次取多少條

*/

public function updateTaskByID($id)

{

$id = (int)$id;

$mtime = time();

$sql = "update queue  set status =1, mtime = ".$mtime." where id = ".$id;

$re = execute($sql);

return $re;

}

 

 

public static function a2s($arr)

{

$str = "";

foreach ($arr as $key => $value)

{

if (is_array($value))

{

foreach ($value as $value2)

{

$str .= urlencode($key) . "[]=" . urlencode($value2) . "&";

}

}

else

{

$str .= urlencode($key) . "=" . urlencode($value) . "&";

}

}

return $str;

}

 

public static function s2a($str)

{

$arr = array();

parse_str($str, $arr);

return $arr;

}

 

}

 

?>

 

$cqueue = new Queue();

 

 

1: 加入隊列接口

//$param1 爲執行任務的程序,$param2 爲程序參數,能夠爲序列化的數據

$cqueue->add($param1,$param2);

 

2:  讀取隊列接口

$tasks = $cqueue->getQueueTask($limit = 1000);

 

3:更新任務狀態

$cqueue->updateTaskStatus($id);

 

4:a2s是自定義的一個數組轉換字符串方法,這裏不要使用json_encode,容易出現問題,一樣,從數據庫中取出轉換爲數組的時候,使用s2a方法

$re = $cqueue->add("sendMsg.php", Queue::a2s($arr));

六 :隊列具體實現三:寫執行隊列的程序

根據設計,執行隊列的程序文件是 do_queue.php , 它的主要功能是把任務從隊列表裏取出來,而且在後臺執行。

 

do_queue.php部分代碼:

$phpcmd = exec("which php");     //查找到php安裝位置

$cqueue = new Queue();

$tasks = $cqueue->getQueueTask(200);

foreach ($tasks as $t)

{

$taskphp = $t['taskphp'];

$param = $t['param'];

$job = $phpcmd . " " . escapeshellarg($taskphp) . " " . escapeshellarg($param);

system($job);

}

 

七 :具體任務的業務實現

仍是拿羣發消息來作例子,咱們須要寫好一個羣發消息的程序,這個程序接收事先定義好的參數,而後根據參數調用發消息的接口把消息發送出去。

這個通常由作業務功能的工程師實現。可是架構師事先得寫文檔例子,教會別人使用

 

send_msg.php:

 

$para = $argv[1];

$arr = unserialize($para);

$cmessage = new Message();

foreach($arr['uids'] as $touid)

{

$cmessage->send($arr['uid'], $touid, $arr['content']);

}

 

八 : 服務器部署一:配置crontab

我們執行隊列的程序都寫好了, 這個程序怎麼觸發呢,固然就要用到linux的定時任務,每隔必定的時間,執行do_queue.php一次。可是呢,這裏不是直接調用do_queue.php,我們再提升一層,加個調度程序cron_mission.php, 在cron_mission.php裏面調用do_queue.php

配置定時任務 crontab:

crontab –e

* * * * *  cd /ucai/schedule; php cron_mission.php >> cron_mission.log

 

#能夠先使用crontab -l查看本機已經使用的定時任務

 

九 :服務器部署二:寫定時任務調度程序

思路:將定時任務寫入到任務調度程序cron_mission.php中,這樣能夠在cron_mission.php中靈活控制隊列任務。相比較直接經過crontab控制doQueue.php而言,避免了頻繁修改服務器上的crontab,從安全,便於維護等角度來講,都是上策。

 

cron_mission.php 示例:

 

if ($minute % 5 == 0)

{

if(chdir($site_dir."app/")) {

$cmd = "$phpcmd do_queue.php > do_queue.log &";

echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "\n";

system($cmd);

}

}

 

十 :開啓多進程併發執行隊列

思路:對任務序列進行編號,數據庫中執行的時候

where條件加上id%每一個隊列要執行任務總數=隊列編號

這樣能夠避免重複處理

 

例如:每一個進程執行10條任務,修改以下

 

1:定時任務的修改

修改前:

if ($minute % 5 == 0)

{

if(chdir($site_dir."app/")) {

$cmd = "$phpcmd do_queue.php > do_queue.log &";

echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "\n";

system($cmd);

}

}

 

修改後:

if ($minute % 5 == 0)

{

for ($i=0; $i < 10; $i++) {

$cmd = "$phpcmd doQueue.php 10 $i>> doQueueMission".date('Y-m-d').".log  ";

echo  date("Y-m-d H:i:s") . "\t : " .$cmd."\n";

system($cmd);

}

}

 

//每次進行10個進程,$i來區分是當前的進程標示

 

2:隊列執行程序的修改

修改前:

$phpcmd = 'D:\work\wamp\bin\php\php5.3.10\php ';

$cqueue = new Queue();

$tasks = $cqueue->getQueueTask(200);

修改後:

$phpcmd = 'D:\work\wamp\bin\php\php5.3.10\php ';

$total=$argv[1];

$i=$argb[2];

$cqueue = new Queue();

$tasks = $cqueue->getQueueTask($total,$i,200);

 

3:取隊列接口的修改

修改前:

public function getQueueTask($limit = 1000)

{

$limit = (int)$limit;

$sql = "select id, taskphp, param from queue  where status = 0 order by id asc";

$re = query($sql);

return $re;

}

 

修改後:

public function getQueueTask($total,$i,$limit = 1000)

{

$limit = (int)$limit;

$sql = "select id, taskphp, param from queue  where status = 0 and id%$total=$i order by id asc";

$re = query($sql);

return $re;

}

 

4:須要關注服務器壓力

進程數定爲多少,取決於服務器壓力

 

十一 :實現任務優先級 

1:任務存儲表加優先級字段

在數據表裏,加一個優先級字段,按字段值的數值大小來區分優先級

2:修改取隊列任務接口,按優先級取

一樣是在sql語句中增長order by

 

十二 :記錄隊列日誌

1:關鍵地方加echo

 2:shell腳本的>>和>的各自做用

 

總結:

      咱們這裏的隊列實現藉助了服務器的計劃任務來實現,例如linux中的crontab,這自己是linux系統中的一個程序,平時咱們還可使用他來進行定時執行.sh腳本,例如將數據庫備份打包並ftp傳送到指定服務器上,這個功能不須要藉助php腳本,直接用.sh腳本就能夠實現。在這裏咱們巧妙的將crontab和php腳本結合,而且使用crontab來不斷調用一個隊列調度接口cronMission.php,再經過cronMission.php直接來控制具體何時或者是知足什麼條件來執行什麼隊列任務。

這裏面幾個須要注意的地方

1:往數據庫中存取數據時,不要直接使用json_encode或者json_decode,容易形成一些意外問題,在代碼中,咱們定義了a2s和s2a兩個方法,分別是處理數組轉爲字符串,和從數據庫中讀取字符串後轉爲數組。

2:當任務量比較大,同時服務器負載又沒有充分利用的時候,可使用多進程併發處理,在併發處理的時候須要考慮一個問題,就是如何避免重複,在這裏咱們使用了,對隊列任務進行標記,每次從數據庫中讀取一個進程須要處理的一批任務,使用數據庫中id與批次標示取餘等於0的方法來區分,避免不一樣批次的隊列,重複處理相同任務。(上面步驟10中有具體實現)

相關文章
相關標籤/搜索