因爲PHP不支持多線程,可是做爲一個完善的系統,有不少操做都是須要異步完成的。爲了完成這些異步操做,咱們作了一個基於Redis隊列任務系統。redis
你們知道,一個消息隊列處理系統主要分爲兩大部分:消費者和生產者。json
在咱們的系統中,主系統做爲生產者,任務系統做爲消費者。服務器
具體的工做流程以下:swoole
一、主系統將須要須要處理的任務名稱+任務參數push到隊列中。多線程
二、任務系統實時的對任務隊列進行pop,pop出來一個任務就fork一個子進程,由子進程完成具體的任務邏輯。併發
具體代碼以下:異步
1 /** 2 * 啓動守護進程 3 */ 4 public function runAction() { 5 Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); 6 while (true) { 7 $this->fork_process(); 8 } 9 exit; 10 } 11 12 /** 13 * 建立子進程 14 */ 15 private function fork_process() { 16 $ppid = getmypid(); 17 $pid = pcntl_fork(); 18 if ($pid == 0) {//子進程 19 $pid = posix_getpid(); 20 //echo "* Process {$pid} was created \n\n"; 21 $this->mq_process(); 22 exit; 23 } else {//主進程 24 $pid = pcntl_wait($status, WUNTRACED); //取得子進程結束狀態 25 if (pcntl_wifexited($status)) { 26 //echo "\n\n* Sub process: {$pid} exited with {$status}"; 27 //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); 28 } else { 29 Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); 30 } 31 } 32 } 33 34 /** 35 * 業務任務隊列處理 36 */ 37 private function mq_process() { 38 $data_pop = $this->masterRedis->rPop($this->redis_list_key); 39 $data = json_decode($data_pop, 1); 40 if (!$data) { 41 return FALSE; 42 } 43 $worker = '_task_' . $data['worker']; 44 $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; 45 $params = $data['params']; 46 $class = new $class_name(); 47 $class->$worker($params); 48 return TRUE; 49 }
這是一個簡單的任務處理系統。socket
經過這個任務系統幫助咱們實現了異步,到目前爲止已經穩定運行了將近一年。測試
但很惋惜,它是一個單進程的系統。它是一直在不斷的fork,若是有任務就處理,沒有任務就跳過。this
這樣很穩定。
但問題有兩個:一是不斷地fork、pop會浪費服務器資源,二是不支持併發!
第一個問題還好,但第二個問題就很嚴重。
當主系統 同時 拋過來大量的任務時,任務的處理時間就會無限的拉長。
爲了解決併發的問題,咱們計劃作一個更加高效強壯的隊裏處理系統。
由於在PHP7以前不支持多線程,因此咱們採用多進程。
從網上找了很多資料,大多所謂的多進程都是N個進程同時在後臺運行。
顯然這是不合適的。
個人預想是:每pop出一個任務就fork一個任務,任務執行完成後子進程結束。
這個問題很簡單,那就是每fork一個子進程就自增一次。而當子進程執行完成就自減一次。
自增沒有問題,咱們就在主進程中操做就完了。那麼該如何自減呢?
可能你會說,固然是在子進程中啊。但這裏你須要注意:當fork的時候是從主進程複製了一份資源給子進程,這就意味着你沒法在子進程中操做主進程中的計數器!
因此,這裏就須要瞭解一個知識點:信號。
具體的能夠自行Google,這裏直接看代碼。
1 // install signal handler for dead kids 2 pcntl_signal(SIGCHLD, array($this, "sig_handler"));
這就安裝了一個信號處理器。固然還缺乏一點。
declare(ticks = 1);
declare是一個控制結構語句,具體的用法也請去Google。
這句代碼的意思就是每執行一條低級語句就調用一次信號處理器。
這樣,每當子進程結束的時候就會調用信號處理器,咱們就能夠在信號處理器中進行自減。
在多進程開發中,若是處理不當就會致使進程殘留。
爲了解決進程殘留,必須得將子進程回收。
那麼如何對子進程進行回收就是一個技術點了。
在pcntl的demo中,包括不少博文中都是說在主進程中回收子進程。
但咱們是基於Redis的brpop的,而brpop是阻塞的。
這就致使一個問題:當執行N個任務以後,任務系統空閒的時候主進程是阻塞的,而在發生阻塞的時候子進程還在執行,因此就沒法完成最後幾個子進程的進程回收。。。
這裏原本一直很糾結,但當我將信號處理器搞定以後就也很簡單了。
進程回收也放到信號處理器中去。
pcntl是一個進程處理的擴展,但很惋惜它對多進程的支持很是乏力。
因此這裏採用Swoole擴展中的Process。
具體代碼以下:
1 declare(ticks = 1); 2 class JobDaemonController extends Yaf_Controller_Abstract{ 3 4 use Trait_Redis; 5 6 private $maxProcesses = 800; 7 private $child; 8 private $masterRedis; 9 private $redis_task_wing = 'task:wing'; //待處理隊列 10 11 public function init(){ 12 // install signal handler for dead kids 13 pcntl_signal(SIGCHLD, array($this, "sig_handler")); 14 set_time_limit(0); 15 ini_set('default_socket_timeout', -1); //隊列處理不超時,解決redis報錯:read error on connection 16 } 17 18 private function redis_client(){ 19 $rds = new Redis(); 20 $rds->connect('redis.master.host',6379); 21 return $rds; 22 } 23 24 public function process(swoole_process $worker){// 第一個處理 25 $GLOBALS['worker'] = $worker; 26 swoole_event_add($worker->pipe, function($pipe) { 27 $worker = $GLOBALS['worker']; 28 $recv = $worker->read(); //send data to master 29 30 sleep(rand(1, 3)); 31 echo "From Master: $recv\n"; 32 $worker->exit(0); 33 }); 34 exit; 35 } 36 37 public function testAction(){ 38 for ($i = 0; $i < 10000; $i++){ 39 $data = [ 40 'abc' => $i, 41 'timestamp' => time().rand(100,999) 42 ]; 43 $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); 44 } 45 exit; 46 } 47 48 public function runAction(){ 49 while (1){ 50 // echo "\t now we de have $this->child child processes\n"; 51 if ($this->child < $this->maxProcesses){ 52 $rds = $this->redis_client(); 53 $data_pop = $rds->brpop($this->redis_task_wing, 3);//無任務時,阻塞等待 54 if (!$data_pop){ 55 continue; 56 } 57 echo "\t Starting new child | now we de have $this->child child processes\n"; 58 $this->child++; 59 $process = new swoole_process([$this, 'process']); 60 $process->write(json_encode($data_pop)); 61 $pid = $process->start(); 62 } 63 } 64 } 65 66 private function sig_handler($signo) { 67 // echo "Recive: $signo \r\n"; 68 switch ($signo) { 69 case SIGCHLD: 70 while($ret = swoole_process::wait(false)) { 71 // echo "PID={$ret['pid']}\n"; 72 $this->child--; 73 } 74 } 75 } 76 }
最終,通過測試,單核1G的服務器執行1到3秒的任務能夠作到800的併發。