本身實現異步執行任務的隊列(二)

6、隊列具體實現三:寫執行隊列的程序
根據設計,執行隊列的程序文件是 do_queue.php , 它的主要功能是把任務從隊列表裏取出來,而且在後臺執行。
do_queue.php部分代碼:php

$phpcmd = exec("which php");    //查找到php安裝位置
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask(200);
foreach ($tasks as $t)
{
    $taskphp = $t['taskphp'];
    $param = $t['param'];
    $job = $phpcmd . " " . escapeshellarg($taskphp) . " " . escapeshellarg($param);
    system($job);
}

7、具體任務的業務實現
仍是拿羣發消息來作例子,咱們須要寫好一個羣發消息的程序,這個程序接收事先定義好的參數,而後根據參數調用發消息的接口把消息發送出去。
這個通常由作業務功能的工程師實現。可是架構師事先得寫文檔例子,教會別人使用。
send_msg.php:linux

$para = $argv[1];
$arr = unserialize($para);
$cmessage = new Message();
foreach($arr['uids'] as $touid)
{
    $cmessage->send($arr['uid'], $touid, $arr['content']);
}

8、服務器部署一:配置crontab
我們執行隊列的程序都寫好了, 這個程序怎麼觸發呢,固然就要用到linux的定時任務,每隔必定的時間,執行do_queue.php一次。可是呢,這裏不是直接調用do_queue.php,我們再提升一層,加個調度程序cron_mission.php, 在cron_mission.php裏面調用do_queue.php
配置定時任務 crontab:
l crontab –e
l * * * * * cd /ucai/schedule;php cron_mission.php >> cron_mission.logsql

能夠先使用crontab -l查看本機已經使用的定時任務

9、服務器部署二:寫定時任務調度程序
思路:將定時任務寫入到任務調度程序cron_mission.php中,這樣能夠在cron_mission.php中靈活控制隊列任務。相比較直接經過crontab控制doQueue.php而言,避免了頻繁修改服務器上的crontab,從安全,便於維護等角度來講,都是上策。shell

cron_mission.php 示例:數據庫

if ($minute % 5 == 0)
{
    if(chdir($site_dir."app/")) {
        $cmd = "$phpcmd do_queue.php > do_queue.log &";
        echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
        system($cmd);
    }
}

10、開啓多進程併發執行隊列
思路:對任務序列進行編號,數據庫中執行的時候
where條件加上id%每一個隊列要執行任務總數=隊列編號
這樣能夠避免重複處理
例如:每一個進程執行10條任務,修改以下
1:定時任務的修改
修改前:json

if ($minute % 5 == 0)
{
    if(chdir($site_dir."app/")) {
        $cmd = "$phpcmd do_queue.php > do_queue.log &";
        echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
        system($cmd);
    }
}

修改後:數組

if ($minute % 5 == 0)
{
    for ($i=0; $i < 10; $i++) { 
        $cmd = "$phpcmd doQueue.php 10 $i>> doQueueMission".date('Y-m-d').".log  ";
        echo  date("Y-m-d H:i:s") . "t : " .$cmd."n";
        system($cmd);
    }
}

//每次進行10個進程,$i來區分是當前的進程標示
2:隊列執行程序的修改
修改前:安全

$phpcmd = 'D:workwampbinphpphp5.3.10php ';
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask(200);

修改後:服務器

$phpcmd = 'D:workwampbinphpphp5.3.10php ';
$total=$argv[1];
$i=$argb[2];
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask($total,$i,200);

3:取隊列接口的修改架構

public function getQueueTask($limit = 1000)
     {
        $limit = (int)$limit;
        $sql = "select id, taskphp, param from queue  where status = 0 order by id asc";
        $re = query($sql);
        return $re;
     }

修改後:

public function getQueueTask($total,$i,$limit = 1000)
     {
        $limit = (int)$limit;
        $sql = "select id, taskphp, param from queue  where status = 0 and id%$total=$i order by id asc";
        $re = query($sql);
        return $re;
     }

4:須要關注服務器壓力
進程數定爲多少,取決於服務器壓力

11、實現任務優先級
1:任務存儲表加優先級字段
在數據表裏,加一個優先級字段,按字段值的數值大小來區分優先級
2:修改取隊列任務接口,按優先級取
一樣是在sql語句中增長order by

12、記錄隊列日誌 1:關鍵地方加echo 2:shell腳本的>>和>的各自做用 總結: 咱們這裏的隊列實現藉助了服務器的計劃任務來實現,例如linux中的crontab,這自己是linux系統中的一個程序,平時咱們還可使用他來進行定時執行.sh腳本,例如將數據庫備份打包並ftp傳送到指定服務器上,這個功能不須要藉助php腳本,直接用.sh腳本就能夠實現。在這裏咱們巧妙的將crontab和php腳本結合,而且使用crontab來不斷調用一個隊列調度接口cronMission.php,再經過cronMission.php直接來控制具體何時或者是知足什麼條件來執行什麼隊列任務。 這裏面幾個須要注意的地方 1:往數據庫中存取數據時,不要直接使用json_encode或者json_decode,容易形成一些意外問題,在代碼中,咱們定義了a2s和s2a兩個方法,分別是處理數組轉爲字符串,和從數據庫中讀取字符串後轉爲數組。 2:當任務量比較大,同時服務器負載又沒有充分利用的時候,可使用多進程併發處理,在併發處理的時候須要考慮一個問題,就是如何避免重複,在這裏咱們使用了,對隊列任務進行標記,每次從數據庫中讀取一個進程須要處理的一批任務,使用數據庫中id與批次標示取餘等於0的方法來區分,避免不一樣批次的隊列,重複處理相同任務。(上面步驟10中有具體實現)

相關文章
相關標籤/搜索