<?php
/**
 * Producer/consumer scheduler built on swoole_process and its message queue.
 *
 * run() forks `_consumerNum` consumer processes that pop messages from a shared
 * queue and hand each one to doConsume(). The parent process then pushes every
 * item returned by doProduce() into the queue, waits until the queue is drained,
 * pushes one finish flag per consumer, waits again and frees the queue.
 *
 * When `_msgqkey` is set, the processes attach to that existing queue and the
 * parent does not produce anything ("consume only" mode).
 *
 * Subclasses must implement doProduce() and doConsume().
 */
abstract class Schedule
{
    /** Consumer process handles started by run(), keyed by the value start() returned. */
    protected $_consumerList = array();

    /** Key of an existing message queue to attach to; null lets swoole pick the queue. */
    protected $_msgqkey = null;

    /** Number of consumer processes forked by run(). */
    protected $_consumerNum = 2;

    /** Message that tells a consumer to stop; one copy is pushed per consumer. */
    protected $_finishFlag = 'ALLDONE';

    /** Pause, in microseconds, between two polls of the queue state. */
    protected $_pollInterval = 10000;

    /**
     * @param int $cNum Number of consumers; 0 (or any non-positive value) keeps the default.
     */
    public function __construct($cNum = 0)
    {
        $cNum = (int) $cNum;
        if ($cNum > 0) {
            $this->_consumerNum = $cNum;
        }
    }

    /**
     * Change the number of consumer processes.
     *
     * @param int $num Desired number of consumers, must be positive.
     * @return bool true when the value was applied, false when it was rejected.
     */
    public function setConsumerNum($num = 0)
    {
        $num = (int) $num;
        // A non-positive count would leave nobody to drain the queue, so reject it.
        if ($num > 0) {
            $this->_consumerNum = $num;
            return true;
        }
        return false;
    }

    /**
     * Change the message used to tell consumers to stop.
     *
     * @param string|null $flag New finish flag; empty values are rejected.
     * @return bool true when the flag was applied, false otherwise.
     */
    public function setFinishFlag($flag = null)
    {
        if ($flag) {
            $this->_finishFlag = $flag;
            return true;
        }
        return false;
    }

    /**
     * Fork the consumers, open the queue handle used for producing and run the producer.
     */
    public function run()
    {
        $this->_consumerList = array();
        for ($i = 0; $i < $this->_consumerNum; $i++) {
            $consumer = new swoole_process(function ($worker) {
                $this->_consumerFunc($worker);
            });
            $this->_attachQueue($consumer);
            $pid = $consumer->start();
            $this->_consumerList[$pid] = $consumer;
        }

        // This child exits right away; the parent keeps its handle to push to
        // and inspect the queue.
        $producer = new swoole_process(function ($worker) {
            exit(0);
        });
        $this->_attachQueue($producer);
        $producer->start();

        echo "begin:\n";
        echo sprintf("msgqkey:%s\n", $producer->msgQueueKey);
        $this->_producerFunc($producer);
    }

    /**
     * Push all produced items, then shut the consumers down and free the queue.
     * Does nothing in "consume only" mode.
     *
     * @param object $worker Process handle used to access the queue.
     */
    protected function _producerFunc($worker)
    {
        if ($this->_onlyConsume()) {
            return;
        }

        foreach ($this->doProduce($worker) as $data) {
            $worker->push($data);
        }

        // Wait until every task message has been taken from the queue.
        $this->_waitQueueEmpty($worker);

        // Push one finish flag per consumer process.
        foreach ($this->_consumerList as $pid => $w) {
            $w->push($this->_finishFlag);
        }

        // Wait until every finish flag has been taken as well.
        $this->_waitQueueEmpty($worker);

        $worker->freeQueue();
    }

    /**
     * Consumer loop: pop messages and pass them to doConsume() until the finish
     * flag arrives or the queue can no longer be read.
     *
     * @param object $worker Process handle of this consumer.
     */
    protected function _consumerFunc($worker)
    {
        $pid = $worker->pid;
        while (true) {
            $data = $worker->pop();

            if ($data === false) {
                // pop() failed. If the queue cannot even be inspected any more,
                // stop instead of feeding `false` to doConsume() forever.
                if ($worker->statQueue() === false) {
                    echo "consumer $pid exit: queue unavailable\n";
                    $worker->exit(1);
                    return;
                }
                // Otherwise treat it as a transient failure and retry shortly.
                usleep($this->_pollInterval);
                continue;
            }

            // Compare as strings so numeric-looking payloads ('1e3' vs '1000')
            // are never mistaken for the finish flag.
            if ((string) $data === (string) $this->_finishFlag) {
                echo "consumer $pid exit\n";
                $worker->exit(0);
                return;
            }

            $this->doConsume($data, $worker);
        }
    }

    /**
     * @return bool true when an explicit queue key is configured ("consume only" mode).
     */
    protected function _onlyConsume()
    {
        return !!$this->_msgqkey;
    }

    /**
     * Attach a process to the configured queue key, or to swoole's default queue.
     *
     * @param object $process Process handle that has not been started yet.
     */
    private function _attachQueue($process)
    {
        if ($this->_msgqkey) {
            $process->useQueue($this->_msgqkey);
        } else {
            $process->useQueue();
        }
    }

    /**
     * Block until the queue reports no pending messages.
     *
     * Returns as well when the queue state cannot be read, because waiting on a
     * queue that cannot be inspected would never end. Sleeps between polls so the
     * wait does not occupy a full CPU core.
     *
     * @param object $worker Process handle used to query the queue.
     */
    private function _waitQueueEmpty($worker)
    {
        while (true) {
            $stat = $worker->statQueue();
            if (!is_array($stat) || empty($stat['queue_num'])) {
                return;
            }
            usleep($this->_pollInterval);
        }
    }

    /**
     * Produce the task data.
     *
     * @param object $worker Process handle used for producing.
     * @return array|Traversable One element per task message.
     */
    abstract protected function doProduce($worker);

    /**
     * Handle a single task message.
     *
     * @param string $data   Message popped from the queue.
     * @param object $worker Process handle of the consumer.
     */
    abstract protected function doConsume($data, $worker);
}
<?php
/**
 * Demo task: produces 25 small JSON messages and consumes them with five
 * consumer processes, each message taking one second to "process".
 */
class Taskdemo extends Schedule
{
    /** Number of consumer processes used by this demo. */
    protected $_consumerNum = 5;

    /**
     * Lazily yield one JSON document per multiple of four below 100.
     *
     * @param object $worker Process handle (unused here).
     * @return Generator Yields strings such as {"data":0}, {"data":4}, ...
     */
    protected function doProduce($worker)
    {
        foreach (range(0, 99, 4) as $value) {
            yield json_encode(array('data' => $value));
        }
    }

    /**
     * Simulate one second of work, then report which consumer got the message.
     *
     * @param string $data   Message taken from the queue.
     * @param object $worker Process handle of the consumer; its pid is printed.
     */
    protected function doConsume($data, $worker)
    {
        // Stand-in for the real processing of a message.
        sleep(1);

        $report = "consumer:{$worker->pid} redv {$data}\n";
        echo $report;
    }
}
說明
1. 要繼承Schedule
2. _consumerNum爲消費者個數,不設置,默認2個。
3. doProduce($worker)用於產生任務數據的函數,要求返回值必須是數組或迭代器,每一項爲一條任務數據。$worker爲swoole進程句柄。
4. doConsume($data,$worker)用於消費者處理數據的函數。$data爲單條消息,$worker爲swoole進程句柄。
5. 通常情況下進程句柄$worker都不會用到,可以忽略。
步驟:
1.確認當前隊列的key
程序運行時,會打印出:
msgqkey:1078263
也可以使用命令行查看:
ipcs -q
------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x001073f7 2359298    ballqiu    666        165          15
key 值(0x001073f7,即十進制 1078263)就是所需要的,將它設置到子類的 $_msgqkey 屬性:
protected $_msgqkey = 0x001073f7;
ipcrm -Q $msgqkey   # 按 key 刪除消息隊列;-q 接受的是 msqid 而不是 key
protected $_finishFlag = 'youflag';