對於多進程間的通訊,研究了一天的管道,封裝了一個管道類,以及處理多進程的類。管道的理論,後期再補上,先上代碼。php
<?php /** * 管道封裝類 * * @author mingazi@163.com * @link www.helloworldcoding.com * @since 2017-04-18 */ class fifoPipeClass { /** * 管道資源 */ protected $handler; /** * 管道路徑 */ protected $path; /** * 是否阻塞,false爲非阻塞,true爲阻塞 */ protected $block = false; /** * 建立管道 */ public function __construct($path = './weicool.pipe', $cover = false, $mode = 0666) { if (file_exists($path)) { if ($cover) { unlink($path); } else { $this->path = $path; return $this; } } if (posix_mkfifo($path,$mode)) { $this->path = $path; return $this; } else { $this->throwException('create pipe failed'); } } /** * 拋異常方法 */ public function throwException($msg = 'failed') { throw new \Exception($msg); } /** * 設置阻塞方式 * * @param bool $block false爲非阻塞,true爲阻塞 */ public function setBlock($block = false) { $this->block = $block; } /** * 指定pipe文件路徑 */ public function setPath($path) { if (!file_exists($path)) { $msg = $path.' pipe does not exists'; $this->throwException($msg); } $this->path = $path; } /** * 獲取pipe文件路徑 */ public function getPath() { return $this->path; } /** * 打開一個管道 * * @param string $mode 打開類型 */ public function pipeOpen($mode = 'r') { $handler = fopen($this->path, $mode); if (!is_resource($handler)) { $msg = 'open pipe '.$this->path.' falied'; $this->throwException($msg); } // 設置阻塞類型 stream_set_blocking($handler, $this->block); $this->handler = $handler; return $this; } /** * 已讀的方式打開管道 * * @return resource */ public function readOpen() { return $this->pipeOpen('r'); } /** * 已寫的方式打開管道 * * @return resource */ public function writeOpen() { return $this->pipeOpen('w'); } /** * 讀取一行,或給定的長度 */ public function readOne($byte = 1024) { $data = fread($this->handler,$byte); return $data; } /** * 讀取全部的內容 */ public function readAll() { $hd = $this->handler; $data = ''; while (!feof($hd)) { $data .= fread($hd,1024); } return $data; } /** * 寫入數據 */ public function write($data) { $hd = $this->handler; try { fwrite($hd,$data); } catch(\Exception $e) { $this->throwException($e->getMessage()); } return $this; } /** * 關閉管道 */ public function close() { return fclose($this->handler); } /** * 刪除管道 */ public function remove() { return unlink($this->path); } }
多進程處理類,利用管道保存各個進程的返回結果,主進程處理最後的結果數組
<?php require_once './fifoPipeClass.php'; class pipeMultiProcess { protected $process = []; // 子進程 protected $child = []; // 子進程pid數組 protected $result = []; // 計算的結果 public function __construct($process = []) { $this->process = $process; } /** * 設置子進程 */ public function setProcess($process) { $this->process = $process; } /** * fork 子進程 */ public function forkProcess() { $process = $this->process; foreach($process as $k => $item) { $pid = pcntl_fork(); if ($pid == 0) { $pipe = new fifoPipeClass(); $id = getmypid(); $pipe->writeOpen(); $pipe->write($k.' pid:'.$id.PHP_EOL); $pipe->close(); exit(0); } else if ($pid > 0) { $this->child[] = $pid; } } return $this; } /** * 等待子進程結束 */ public function waiteProcess() { $child = $this->child; $pipe = new fifoPipeClass(); $pipe->readOpen(); echo 'get all begin'.PHP_EOL; while(count($child)) { foreach($child as $k => $pid){ $res = pcntl_waitpid($pid,$status,WNOHANG); if ( -1 == $res || $res > 0 ) { unset($child[$k]); } } $data = $pipe->readOne(); if ($data) { $this->result[] = $data; } } $pipe->close(); echo 'get all end'.PHP_EOL; $pipe->remove(); return $this; } /** * 獲取返回結果 */ public function getResult() { return $this->result; } } $obj = new pipeMultiProcess(); $obj->setProcess(['name'=>1,'age'=>2,'sex'=>3]); $res = $obj->forkProcess()->waiteProcess()->getResult(); print_r($res);
運行結果以下:ui
Array ( [0] => age pid:7436 [1] => sex pid:7437 name pid:7435 )