<?php
// Demo: the master process starts a few children and hands each of them one
// message through a swoole_process message queue, then reaps them.

$workers = [];
$worker_num = 2;

/**
 * Child entry point: take one message from the queue, print it, pause, exit.
 *
 * @param swoole_process $process Process handle passed in by swoole when the child starts.
 */
function doProcess(swoole_process $process)
{
    $recv = $process->pop();
    echo "get data from master process is $recv \n";
    sleep(5);
    $process->exit(0);
}

for ($i = 1; $i <= $worker_num; $i++) {
    $process = new swoole_process('doProcess', false, false);
    // Enable the message queue (the original note likens it to a global function).
    $process->useQueue();
    $pid = $process->start();
    if ($pid === false) {
        // Do not register a child that never started; it could not be pushed to or reaped.
        fwrite(STDERR, "failed to start son process\n");
        continue;
    }
    // The original stored children under the misspelled name $worders, which
    // left $workers empty and made the unset() in the reaping loop a no-op.
    $workers[$pid] = $process;
}

foreach ($workers as $pid => $process) {
    $process->push("hello son process $pid");
}

// Reap every child that was actually started.
while (count($workers) > 0) {
    $ret = swoole_process::wait();
    if ($ret === false) {
        // No child left to wait for; stop instead of reading 'pid' from false.
        break;
    }
    $pid = $ret['pid'];
    unset($workers[$pid]);
    echo "son process $pid is out \n";
}
task 使用案例(PHP)
<?php
// Demo: a swoole server that forwards every received payload to a task
// worker and prints a line at each stage (dispatch, execution, completion).

$serv = new swoole_server("192.168.50.66", 9501);

$serv->set([
    'task_worker_num' => 4,
]);

// Data arrived on a connection: hand it to a task worker and print the task id.
$onReceive = function ($server, $fd, $fromId, $payload) {
    $taskId = $server->task($payload);
    echo "ID:$taskId\n";
};

// Runs for each dispatched task: print the id and report the result via finish().
$onTask = function ($server, $taskId, $fromId, $payload) {
    echo "do ID:$taskId\n";
    $server->finish("$payload -> ok");
};

// Called with the value that the task handler passed to finish().
$onFinish = function ($server, $taskId, $result) {
    echo "do ok\n";
};

$serv->on('receive', $onReceive);
$serv->on('task', $onTask);
$serv->on('finish', $onFinish);

$serv->start();
process 以及 task:利用定時器動態擴充進程池(稍難)(swoole)
<?php

/**
 * Demo of a worker pool that grows on demand.
 *
 * The constructor forks a manager process. The manager starts
 * $worker_min_num workers, then once per second sends a "<n>:hello" job to an
 * idle worker. When every worker is busy it starts one more, up to
 * $worker_max_num. On the 10th tick it tells all workers to exit and stops.
 */
class BaseProcess
{
    /** Manager process created in the constructor. */
    private $process;

    /** Worker process objects, keyed by pid. */
    private $process_list = [];

    /** Busy flags keyed by pid: 0 = idle, 1 = working on a job. */
    private $process_use = [];

    private $worker_min_num = 3;
    private $worker_max_num = 6;

    /** Number of workers started so far. */
    private $current_num;

    /**
     * Start the manager process and wait for a child process to exit.
     */
    public function __construct()
    {
        $this->process = new swoole_process([$this, 'run'], false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    /**
     * Manager entry point: build the initial pool and schedule the dispatcher.
     */
    public function run()
    {
        $this->current_num = $this->worker_min_num;

        // Start the whole initial pool first, then attach the pipe listeners,
        // in the same order as the original code.
        for ($i = 0; $i < $this->current_num; $i++) {
            $this->spawnWorker(0);
        }
        foreach (array_keys($this->process_list) as $pid) {
            $this->watchWorker($pid);
        }

        swoole_timer_tick(1000, function ($time_id) {
            static $index = 0;
            $index++;
            $message = $index . ':hello';

            // Give the job to the first idle worker, if there is one.
            $dispatched = false;
            foreach ($this->process_use as $pid => $used) {
                if ($used === 0) {
                    $this->process_use[$pid] = 1;
                    $this->process_list[$pid]->write($message);
                    $dispatched = true;
                    break;
                }
            }

            // Grow the pool only when nobody was idle. The original wrote
            // `$flag = true && ...`, an assignment that ignored whether the
            // job had already been dispatched.
            // When the pool is full and all workers are busy, the job is dropped.
            if (!$dispatched && $this->current_num < $this->worker_max_num) {
                $pid = $this->spawnWorker(1);
                // Without this listener the new worker's replies were never
                // read, so it stayed marked busy forever.
                $this->watchWorker($pid);
                $this->process_list[$pid]->write($message);
                $this->current_num++;
            }

            // Shut down on the 10th tick. The original `$index = 10` was an
            // assignment, always true, and stopped everything on the first tick.
            if ($index === 10) {
                foreach ($this->process_list as $process) {
                    $process->write('exit');
                }
                swoole_timer_clear($time_id);
                $this->process->exit();
            }
        });
    }

    /**
     * Worker entry point: handle each message that arrives on the pipe.
     *
     * 'exit' ends the worker. Any other message is treated as a job: the
     * worker sleeps 5 seconds, then writes its own pid back to report that it
     * is idle again.
     *
     * @param swoole_process $worker Process handle passed in by swoole.
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
            $data = $worker->read();
            var_dump($worker->pid . ":" . $data);
            if ($data === 'exit') {
                $worker->exit();
                exit;
            }
            sleep(5);
            $worker->write('' . $worker->pid);
        });
    }

    /**
     * Start one worker process and record it in the pool.
     *
     * @param int $busy Initial busy flag (0 = idle, 1 = about to receive a job).
     * @return int Pid of the started worker.
     */
    private function spawnWorker($busy)
    {
        $process = new swoole_process([$this, 'task_run'], false, 2);
        $pid = $process->start();
        $this->process_list[$pid] = $process;
        $this->process_use[$pid] = $busy;

        return $pid;
    }

    /**
     * Listen on a worker's pipe and mark the worker idle when it reports back.
     *
     * @param int $pid Pid of a worker already present in $process_list.
     */
    private function watchWorker($pid)
    {
        $process = $this->process_list[$pid];
        swoole_event_add($process->pipe, function ($pipe) use ($process) {
            $data = $process->read();
            var_dump($data);
            // The worker replies with its pid. The original assigned to a
            // local $process_use, so the shared busy flag was never cleared.
            $donePid = (int) $data;
            if (isset($this->process_use[$donePid])) {
                $this->process_use[$donePid] = 0;
            }
        });
    }
}

new BaseProcess();