實現異步執行任務的隊列

1、隊列使用場景:爲何須要隊列
在web開發中,咱們常常會遇到須要處理批量任務的時候,這些批量任務多是用戶提交的,也多是當系統被某個事件觸發時須要進行批量處理的,面對這樣的 任務,若是是用戶提交的批量任務,初級程序員只能讓用戶觸發提交動做後,等待服務器處理完畢,而且將結果返回到瀏覽器,期間用戶不能關掉瀏覽器窗口,若是 數據比較大,或者處理速度比較慢,那用戶體驗將會所以受到直接影響。可是當咱們使用某訊或者某浪的郵箱時,點擊羣發郵件以後,只需等待很短的時間,瀏覽器 提示提交成功,正在發送之類的信息時,用戶就能夠關掉瀏覽器,稍後,收件地址欄裏的郵箱將陸續收到該羣發郵件,再好比羣發定時郵件,以及當商城系統中有客 戶下單,客戶,客服,倉庫等相關人員收到訂單郵件信息。諸如此類,隊列的應用範圍是如此之廣。php

二 :普通工程師的解決方案和架構師的解決方案
方案1:建表存郵件,消息等,用定時程序取出發送。mysql

方案2:抽象到更高一層,開發一套通用異步處理隊列適用於任何複雜的業務邏輯
那麼,做爲架構師,使用隊列的作法,將抽象層和業務層分離,可具備良好的擴展性和可維護性。相比較而言就高明瞭許多,下面就咱們介紹一下自定義隊列的實現思路和方法。linux

三 :隊列整體設計程序員

1:須要隊列程序,提供加入隊列接口和取隊列接口等
2:須要存儲隊列,文件或者數據庫
3:須要定時程序取出隊列並執行
4:其它擴展功能:優先級,日誌,定時等web

代碼的目錄結構以下,每一個文件的做用用//註釋來標明
|--addTask.php //添加任務到隊列的例子
|--cronMission.php //定時任務調度程序,例如linux中受crontab直接調用的文件,業務邏輯工程師能夠在這個文件中靈活定義本身的隊列任務,從而不用每一個隊列任務 都須要上服務器修改crontab,從而在安全性,便捷性方面有很大提升
|--db.php //數據庫操做
|--db.sql //創建隊列須要用到的基本表結構
|--doQueue.php //執行隊列任務
|--Queue.class.php //隊列核心業務在這裏定義,包括將任務加入隊列,讀隊列,更改隊列任務狀態
|--sendMsg.php //隊列要實現具體任務的業務接口,好比現有系統的發送消息的接口,這裏例子中由於將此隊列程序和現有系統系統集成,用寫入日誌來演示sql

四 :隊列具體實現一:建任務存儲表
1:
先來個最基本的:數據庫

CREATE TABLE`queue` ( id int(11) NOT NULL auto_increment primarykey, taskphp varchar(128) NOT NULL default '', param text not null default '', status tinyint not null default 0, ctime timestamp NOT NULL default CURRENT_TIMESTAMP, KEY (ctime) ) ENGINE=InnoDBDEFAULT CHARSET=utf8; 

字段解釋:
taskphp:處理業務的接口文件
param:處理業務的接口文件須要接收的參數
status:任務處理狀態,0爲未處理,處理完畢更改成1json

五 、隊列具體實現二:定義調用接口數組

寫核心類Queue.class.php, 實現基本接口:
1:加入隊列接口
l //$param1 爲執行任務的程序,$param2 爲程序參數,能夠爲序列化的數據
l $cqueue->add($param1,$param2);
2: 讀取隊列接口
l $tasks = $cqueue->getQueueTask($limit = 1000);
3:更新任務狀態
l $cqueue->updateTaskStatus($id);
4:a2s是自定義的一個數組轉換字符串方法,這裏不要使用json_encode,容易出現問題,一樣,從數據庫中取出轉換爲數組的時候,使用s2a方法
l $re = $cqueue->add("sendMsg.php", Queue::a2s($arr));瀏覽器

 

  1 <?php
  2 /**
  3  * 優才網公開課示例代碼
  4  *
  5  * 任務隊列實現
  6  *
  7  * @author 優才網全棧工程師教研組  張友林
  8  * @see http://www.ucai.cn
  9  */
 10 
 11 include_once('db.php');
 12 class Queue
 13 {
 14 
 15     /**
 16      * 把任務扔到隊列
 17      *
 18      * @param string $taskphp   執行任務的程序
 19      * @param string $param     執行任務程序所用的參數
 20 
 21      * 例如,羣發消息加入隊列:
 22      * $arr = array(
 23      *      "uid" => 4,//發信息的人的UID
 24      *      "uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID
 25      *      "content" => 'xxxxx',//信息內容
 26      *  );
 27      * $cqueue = new Queue();
 28      * $cqueue->add("/app/send_msg.php", serialize($arr));
 29      *
 30      */
 31     public function add($taskphp,$param)
 32     {
 33         $taskphp = mysql_real_escape_string($taskphp);
 34         //$param = mysql_real_escape_string($param);
 35         $param = $param;
 36         $sql = "insert into queue (taskphp, param) values('".$taskphp."', '".$param."')";
 37         $re = execute($sql);
 38         if ($re)
 39         {
 40             $pid = mysql_insert_id();
 41             return $pid;
 42         }
 43         else
 44         {
 45             return false;
 46         }
 47     }
 48 
 49 
 50     /**
 51      * 讀取任務隊列
 52      * 
 53      * @param string $limit 一次取多少條
 54      */
 55      public function getQueueTask($limit = 1000)
 56      {
 57         $limit = (int)$limit;
 58         $sql = "select id, taskphp, param from queue  where status = 0 order by id asc";
 59         $re = query($sql);
 60         return $re;
 61      }
 62 
 63     /**
 64      * 更新任務狀態
 65      * 
 66      * @param string $limit 一次取多少條
 67      */
 68      public function updateTaskByID($id)
 69      {
 70         $id = (int)$id;
 71         $mtime = time();
 72         $sql = "update queue  set status =1, mtime = ".$mtime." where id = ".$id;
 73         $re = execute($sql);
 74         return $re;
 75      }
 76 
 77 
 78      public static function a2s($arr)
 79     {
 80         $str = "";
 81         foreach ($arr as $key => $value)
 82         {
 83             if (is_array($value))
 84             {
 85                 foreach ($value as $value2)
 86                 {
 87                     $str .= urlencode($key) . "[]=" . urlencode($value2) . "&";
 88                 }
 89             }
 90             else
 91             {
 92                 $str .= urlencode($key) . "=" . urlencode($value) . "&";
 93             }
 94         }
 95         return $str;
 96     }
 97 
 98     public static function s2a($str)
 99     {
100         $arr = array();
101         parse_str($str, $arr);
102         return $arr;
103     }
104 
105 }
106 
107 ?>

接下來就是部署調度了, 這個呢也不是就配置crontab那麼簡單,要可維護性,可擴展性好,能多進程併發執行,能監控隊列運行狀態等等,太長了,這個下一篇講。

相關文章
相關標籤/搜索