工做須要,瞭解學習swoole相關,這是一個swoole process的實例。封裝了一個簡單的類庫,實現了一個能夠動態擴容的進程池,該進程池默認會建立min_worker_num個進程來處理任務,當發現進程不夠用的時候,會自動建立子進程執行任務。任務結束後,子進程均可被回收。php
<?php /** * process. * User: xiongzai * Date: 2016/6/27 * Time: 14:00 */ class Process { private $process_list = []; // 進程池對象數組 private $process_use = []; // 進程佔用標記數組 private $min_worker_num = 0; //進程池初始化進程數 private $max_worker_num = 10; // 進程池進程數最大值 private $current_num; // 當前進程數 public $redis; //測試 public無妨 public function __construct() { $this->initRedis(); //實例化redis // 初始化進程池 for($i = 0 ; $i < $this->min_worker_num ; $i++){ $this->createProcess(); } //註冊信號 $this->signal(); //測試跑的太快,10秒後關閉進程而已。這裏僅測試用來關閉進程。 swoole_timer_tick(10000, function($timer_id) { foreach ($this->process_list as $process) { $process->write("exit"); } swoole_timer_clear($timer_id); }); } //這裏須要redis擴展 private function initRedis(){ $redis = new Redis(); $redis->connect('127.0.0.1', 6379 ); $redis->auth(666666); $redis->select(0); $this->redis = $redis; } /** * 建立進程並返回PID * @return mixed */ private function createProcess(){ $process = new swoole_process(array($this, 'task_run') , false , 2); $pid = $process->start(); $this->process_list[$pid] = $process; $this->process_use[$pid] = 0; //綁定子進程管道的讀事件,接收子進程任務結束的通知 swoole_event_add($process->pipe, function ($pipe) use($process) { $pid = $process->read(); echo "PID:".$pid." end".PHP_EOL; //重置爲等待任務狀態。那麼這個進程能夠本身去接收新任務處理了,具體本身實現。 $this->process_use[$pid] = 0; }); $this->current_num += 1; return $pid; } /** * 添加任務並分配空閒進程執行 * @param string $params * @return bool 返回是否有進程執行任務 */ public function task($params){ $result = false; foreach ($this->process_use as $pid => $used) { // 找到了閒置的進程 if($used == 0) { $result = true; $this->process_use[$pid] = 1; // 派發任務 $this->process_list[$pid]->write($params); } } //沒有可用進程執行任務 新建立進程執行 if($result == false && $this->current_num < $this->max_worker_num){ $pid = $this->createProcess(); $this->process_use[$pid] = 1; $this->process_list[$pid]->write($params); $result = true; } return $result; } public function task_run($worker) { // 註冊監聽管道的事件,接收任務 swoole_event_add($worker->pipe, function ($pipe) use ($worker){ $data = $worker->read(); if($data == 'exit'){ $worker->exit(); // 收到退出指令,關閉子進程 exit; } $this->task_do($data); // 執行完成,通知父進程 $worker->write($worker->pid); }); } private function task_do($data){ echo 'i:'.$data.PHP_EOL; } //註冊各類信號 private function signal(){ // 註冊信號,回收退出的子進程 swoole_process::signal(SIGCHLD, function($sig) { while($ret = swoole_process::wait(false)) { echo "PID={$ret['pid']} out\n"; } }); } } $process = new Process(); while (true){ if($process->redis->exists('tastList')){ for ($i = 1 ; $i <= 100 ; $i ++){ $task_id = $process->redis->lPop('tastList'); if(!$process->task($task_id)){ $process->redis->lPush('tastList',$task_id); } } sleep(1); //每秒鐘100個 感受差很少了 } }
代碼如上,運行效果圖以下:(注意:tastList裏面只有1到10)redis
運行效果看起來還能夠的樣子。可是若是把$min_worker_num初始化線程池的數量改成5,再看以下效果圖:數組
爲何出現了那麼多task_id:1呢?這個問題得好好想一想,本文最後解答。swoole
以上只是簡單的例子,實際運用中還會有各類狀況。進程池的功能可能遠比此複雜,這裏只是拋磚引玉。 若是不須要動態擴容,能夠建立足夠多的子進程,開啓消息隊列模式,設置搶佔,空閒進程自動處理任務。用定時任務派發任務,也能夠經過定時器來主動退出一段時間內沒有處理任務的子進程等等。本身擼代碼折騰去。學習
如今回答爲何那麼多task_id:1的問題:測試
以上已經貼了代碼以及運行效果圖,甚至還寫了那麼多文字,其實那些都是浮雲。其實我也只是想來問問爲何出現那麼多的task_id:1,跪求大神幫忙看下而已、而已、而已……………………,答案:我不知道啊。this