http://www.imooc.com/wenda/detail/252185php
1、隊列使用場景:爲何須要隊列
在web開發中,咱們常常會遇到須要處理批量任務的時候,這些批量任務多是用戶提交的,也多是當系統被某個事件觸發時須要進行批量處理的,面對這樣的 任務,若是是用戶提交的批量任務,初級程序員只能讓用戶觸發提交動做後,等待服務器處理完畢,而且將結果返回到瀏覽器,期間用戶不能關掉瀏覽器窗口,若是 數據比較大,或者處理速度比較慢,那用戶體驗將會所以受到直接影響。可是當咱們使用某訊或者某浪的郵箱時,點擊羣發郵件以後,只需等待很短的時間,瀏覽器 提示提交成功,正在發送之類的信息時,用戶就能夠關掉瀏覽器,稍後,收件地址欄裏的郵箱將陸續收到該羣發郵件,再好比羣發定時郵件,以及當商城系統中有客 戶下單,客戶,客服,倉庫等相關人員收到訂單郵件信息。諸如此類,隊列的應用範圍是如此之廣。mysql
二 :普通工程師的解決方案和架構師的解決方案
方案1:建表存郵件,消息等,用定時程序取出發送。linux
方案2:抽象到更高一層,開發一套通用異步處理隊列適用於任何複雜的業務邏輯
那麼,做爲架構師,使用隊列的作法,將抽象層和業務層分離,可具備良好的擴展性和可維護性。相比較而言就高明瞭許多,下面就咱們介紹一下自定義隊列的實現思路和方法。程序員
三 :隊列整體設計web
1:須要隊列程序,提供加入隊列接口和取隊列接口等
2:須要存儲隊列,文件或者數據庫
3:須要定時程序取出隊列並執行
4:其它擴展功能:優先級,日誌,定時等sql
代碼的目錄結構以下,每一個文件的做用用//註釋來標明
|–addTask.php //添加任務到隊列的例子
|–cronMission.php //定時任務調度程序,例如linux中受crontab直接調用的文件,業務邏輯工程師能夠在這個文件中靈活定義本身的隊列任務,從而不用每一個隊列任務 都須要上服務器修改crontab,從而在安全性,便捷性方面有很大提升
|–db.php //數據庫操做
|–db.sql //創建隊列須要用到的基本表結構
|–doQueue.php //執行隊列任務
|–Queue.class.php //隊列核心業務在這裏定義,包括將任務加入隊列,讀隊列,更改隊列任務狀態
|–sendMsg.php //隊列要實現具體任務的業務接口,好比現有系統的發送消息的接口,這裏例子中由於將此隊列程序和現有系統系統集成,用寫入日誌來演示shell
四 :隊列具體實現一:建任務存儲表
1:
先來個最基本的:數據庫
1
2
3
4
5
6
7
8
|
CREATE TABLE`queue` (
id int(11) NOT NULL auto_increment primarykey,
taskphp varchar(128) NOT NULL
default
''
,
param text not null
default
''
,
status tinyint not null
default
0,
ctime timestamp NOT NULL
default
CURRENT_TIMESTAMP,
KEY (ctime)
) ENGINE=InnoDBDEFAULT CHARSET=utf8;
|
字段解釋:
taskphp:處理業務的接口文件
param:處理業務的接口文件須要接收的參數
status:任務處理狀態,0爲未處理,處理完畢更改成1json
五 、隊列具體實現二:定義調用接口
數組
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
<?php
/**
*
* 任務隊列實現
*
*/
include_once
(
'db.php'
);
class
Queue
{
/**
* 把任務扔到隊列
*
* @param string $taskphp 執行任務的程序
* @param string $param 執行任務程序所用的參數
* 例如,羣發消息加入隊列:
* $arr = array(
* "uid" => 4,//發信息的人的UID
* "uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID
* "content" => 'xxxxx',//信息內容
* );
* $cqueue = new Queue();
* $cqueue->add("/app/send_msg.php", serialize($arr));
*
*/
public
function
add(
$taskphp
,
$param
)
{
$taskphp
= mysql_real_escape_string(
$taskphp
);
//$param = mysql_real_escape_string($param);
$param
=
$param
;
$sql
=
"insert into queue (taskphp, param) values('"
.
$taskphp
.
"', '"
.
$param
.
"')"
;
$re
= execute(
$sql
);
if
(
$re
)
{
$pid
= mysql_insert_id();
return
$pid
;
}
else
{
return
false;
}
}
/**
* 讀取任務隊列
*
* @param string $limit 一次取多少條
*/
public
function
getQueueTask(
$limit
= 1000)
{
$limit
= (int)
$limit
;
$sql
=
"select id, taskphp, param from queue where status = 0 order by id asc"
;
$re
= query(
$sql
);
return
$re
;
}
/**
* 更新任務狀態
*
* @param string $limit 一次取多少條
*/
public
function
updateTaskByID(
$id
)
{
$id
= (int)
$id
;
$mtime
= time();
$sql
=
"update queue set status =1, mtime = "
.
$mtime
.
" where id = "
.
$id
;
$re
= execute(
$sql
);
return
$re
;
}
public
static
function
a2s(
$arr
)
{
$str
=
""
;
foreach
(
$arr
as
$key
=>
$value
)
{
if
(
is_array
(
$value
))
{
foreach
(
$value
as
$value2
)
{
$str
.= urlencode(
$key
) .
"[]="
. urlencode(
$value2
) .
"&"
;
}
}
else
{
$str
.= urlencode(
$key
) .
"="
. urlencode(
$value
) .
"&"
;
}
}
return
$str
;
}
public
static
function
s2a(
$str
)
{
$arr
=
array
();
parse_str
(
$str
,
$arr
);
return
$arr
;
}
}
?>
|
1:加入隊列接口
l //$param1 爲執行任務的程序,$param2 爲程序參數,能夠爲序列化的數據
l $cqueue->add($param1,$param2);
2: 讀取隊列接口
l $tasks = $cqueue->getQueueTask($limit = 1000);
3:更新任務狀態
l $cqueue->updateTaskStatus($id);
4:a2s是自定義的一個數組轉換字符串方法,這裏不要使用json_encode,容易出現問題,一樣,從數據庫中取出轉換爲數組的時候,使用s2a方法
l $re = $cqueue->add("sendMsg.php", Queue::a2s($arr));
6、隊列具體實現三:寫執行隊列的程序
根據設計,執行隊列的程序文件是 do_queue.php , 它的主要功能是把任務從隊列表裏取出來,而且在後臺執行。
1
2
3
4
5
6
7
8
9
10
11
|
do_queue.php部分代碼:
$phpcmd
=
exec
(
"which php"
);
//查找到php安裝位置
$cqueue
=
new
Queue();
$tasks
=
$cqueue
->getQueueTask(200);
foreach
(
$tasks
as
$t
)
{
$taskphp
=
$t
[
'taskphp'
];
$param
=
$t
[
'param'
];
$job
=
$phpcmd
.
" "
.
escapeshellarg
(
$taskphp
) .
" "
.
escapeshellarg
(
$param
);
system(
$job
);
}
|
7、具體任務的業務實現
仍是拿羣發消息來作例子,咱們須要寫好一個羣發消息的程序,這個程序接收事先定義好的參數,而後根據參數調用發消息的接口把消息發送出去。
這個通常由作業務功能的工程師實現。可是架構師事先得寫文檔例子,教會別人使用。
1
2
3
4
5
6
7
8
|
send_msg.php:
$para
=
$argv
[1];
$arr
= unserialize(
$para
);
$cmessage
=
new
Message();
foreach
(
$arr
[
'uids'
]
as
$touid
)
{
$cmessage
->send(
$arr
[
'uid'
],
$touid
,
$arr
[
'content'
]);
}
|
8、服務器部署一:配置crontab
我們執行隊列的程序都寫好了, 這個程序怎麼觸發呢,固然就要用到linux的定時任務,每隔必定的時間,執行do_queue.php一次。可是呢,這裏不是直接調用 do_queue.php,我們再提升一層,加個調度程序cron_mission.php, 在cron_mission.php裏面調用do_queue.php
配置定時任務 crontab:
l crontab –e
l * * * * * cd /ucai/schedule;php cron_mission.php >> cron_mission.log
#能夠先使用crontab -l查看本機已經使用的定時任務
9、服務器部署二:寫定時任務調度程序
思路:將定時任務寫入到任務調度程序cron_mission.php中,這樣能夠在cron_mission.php中靈活控制隊列任務。相比較直接通 過crontab控制doQueue.php而言,避免了頻繁修改服務器上的crontab,從安全,便於維護等角度來講,都是上策。
cron_mission.php 示例:
1
2
3
4
5
6
7
8
|
if
(
$minute
% 5 == 0)
{
if
(
chdir
(
$site_dir
.
"app/"
)) {
$cmd
=
"$phpcmd do_queue.php > do_queue.log &"
;
echo
'['
,
$ymd
,
' '
,
$hour
,
':'
,
$minute
,
'] '
,
$cmd
,
"n"
;
system(
$cmd
);
}
}
|
10、開啓多進程併發執行隊列
思路:對任務序列進行編號,數據庫中執行的時候
where條件加上id%每一個隊列要執行任務總數=隊列編號
這樣能夠避免重複處理
例如:每一個進程執行10條任務,修改以下
1:定時任務的修改
修改前:
1
2
3
4
5
6
7
8
|
if
(
$minute
% 5 == 0)
{
if
(
chdir
(
$site_dir
.
"app/"
)) {
$cmd
=
"$phpcmd do_queue.php > do_queue.log &"
;
echo
'['
,
$ymd
,
' '
,
$hour
,
':'
,
$minute
,
'] '
,
$cmd
,
"n"
;
system(
$cmd
);
}
}
|
修改後:
1
2
3
4
5
6
7
8
|
if
(
$minute
% 5 == 0)
{
for
(
$i
=0;
$i
< 10;
$i
++) {
$cmd
=
"$phpcmd doQueue.php 10 $i>> doQueueMission"
.
date
(
'Y-m-d'
).
".log "
;
echo
date
(
"Y-m-d H:i:s"
) .
"t : "
.
$cmd
.
"n"
;
system(
$cmd
);
}
}
|
//每次進行10個進程,$i來區分是當前的進程標示
2:隊列執行程序的修改
1
2
3
4
5
6
7
8
9
10
|
修改前:
$phpcmd
=
'D:workwampbinphpphp5.3.10php '
;
$cqueue
=
new
Queue();
$tasks
=
$cqueue
->getQueueTask(200);
修改後:
$phpcmd
=
'D:workwampbinphpphp5.3.10php '
;
$total
=
$argv
[1];
$i
=
$argb
[2];
$cqueue
=
new
Queue();
$tasks
=
$cqueue
->getQueueTask(
$total
,
$i
,200);
|
3:取隊列接口的修改
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public
function
getQueueTask(
$limit
= 1000)
{
$limit
= (int)
$limit
;
$sql
=
"select id, taskphp, param from queue where status = 0 order by id asc"
;
$re
= query(
$sql
);
return
$re
;
}
修改後:
public
function
getQueueTask(
$total
,
$i
,
$limit
= 1000)
{
$limit
= (int)
$limit
;
$sql
=
"select id, taskphp, param from queue where status = 0 and id%$total=$i order by id asc"
;
$re
= query(
$sql
);
return
$re
;
}
|
4:須要關注服務器壓力
進程數定爲多少,取決於服務器壓力
11、實現任務優先級
1:任務存儲表加優先級字段
在數據表裏,加一個優先級字段,按字段值的數值大小來區分優先級
2:修改取隊列任務接口,按優先級取
一樣是在sql語句中增長order by
12、記錄隊列日誌
1:關鍵地方加echo
2:shell腳本的>>和>的各自做用
總結:
咱們這裏的隊列實現藉助了服務器的計劃任務來實現,例如linux中的crontab,這自己是linux系統中的一個程序,平時咱們還可使用他來進行 定時執行.sh腳本,例如將數據庫備份打包並ftp傳送到指定服務器上,這個功能不須要藉助php腳本,直接用.sh腳本就能夠實現。在這裏咱們巧妙的將 crontab和php腳本結合,而且使用crontab來不斷調用一個隊列調度接口cronMission.php,再經過 cronMission.php直接來控制具體何時或者是知足什麼條件來執行什麼隊列任務。
這裏面幾個須要注意的地方
1:往數據庫中存取數據時,不要直接使用json_encode或者json_decode,容易形成一些意外問題,在代碼中,咱們定義了a2s和s2a兩個方法,分別是處理數組轉爲字符串,和從數據庫中讀取字符串後轉爲數組。
2:當任務量比較大,同時服務器負載又沒有充分利用的時候,可使用多進程併發處理,在併發處理的時候須要考慮一個問題,就是如何避免重複,在這裏咱們使 用了,對隊列任務進行標記,每次從數據庫中讀取一個進程須要處理的一批任務,使用數據庫中id與批次標示取餘等於0的方法來區分,避免不一樣批次的隊列,重 復處理相同任務。(上面步驟10中有具體實現)
本文出自 技術鵬飛,原文永久連接: http://www.itlipeng.cn/?p=576