php多進程通訊,有名管道(pcntl學習 五)

對於多進程間的通訊,研究了一天的管道,封裝了一個管道類,以及處理多進程的類。管道的理論,後期再補上,先上代碼。php

<?php
/**
 * 管道封裝類
 *
 * @author  mingazi@163.com
 * @link    www.helloworldcoding.com
 * @since   2017-04-18
 */
class fifoPipeClass
{
    /**
     * 管道資源
     */
    protected $handler;
    /**
     * 管道路徑
     */
    protected $path;
    /**
     * 是否阻塞,false爲非阻塞,true爲阻塞
     */
    protected $block = false;
    /**
     * 建立管道
     */
    public function __construct($path = './weicool.pipe', $cover = false, $mode = 0666)
    {
        if (file_exists($path)) {
            if ($cover) {
                unlink($path);
            } else {
                $this->path = $path;
                return $this;
            }
        }
        if (posix_mkfifo($path,$mode)) {
            $this->path = $path;
            return $this;
        } else {
            $this->throwException('create pipe failed');
        }
    }
    /**
     * 拋異常方法
     */
    public function throwException($msg = 'failed')
    {
        throw new \Exception($msg);
    }
    /**
     * 設置阻塞方式
     * 
     * @param   bool   $block false爲非阻塞,true爲阻塞
     */
    public function setBlock($block = false)
    {
        $this->block = $block;
    }
    /**
     * 指定pipe文件路徑
     */
    public function setPath($path)
    {
        if (!file_exists($path)) {
            $msg = $path.' pipe  does not exists';
            $this->throwException($msg);
        }
        $this->path = $path;
    }
    /**
     * 獲取pipe文件路徑
     */
    public function getPath()
    {
        return $this->path;
    }
    /**
     * 打開一個管道
     *
     * @param   string   $mode  打開類型
     */
    public function pipeOpen($mode = 'r')
    {
        $handler = fopen($this->path, $mode);
        if (!is_resource($handler)) {
            $msg = 'open pipe '.$this->path.' falied';
            $this->throwException($msg);
        }
        // 設置阻塞類型
        stream_set_blocking($handler, $this->block);
        $this->handler = $handler;
        return $this;
    }
    /**
     * 已讀的方式打開管道
     *
     * @return resource
     */
    public function readOpen()
    {
        return $this->pipeOpen('r');
    }
    /**
     * 已寫的方式打開管道
     *
     * @return resource
     */
    public function writeOpen()
    {
        return $this->pipeOpen('w');
    }
    /**
     * 讀取一行,或給定的長度
     */
    public function readOne($byte = 1024)
    {
        $data = fread($this->handler,$byte);
        return $data;
    }
    /**
     * 讀取全部的內容
     */
    public function readAll()
    {
        $hd = $this->handler;
        $data = '';
        while (!feof($hd)) {
            $data .= fread($hd,1024);
        }
        return $data;
    }
    /**
     * 寫入數據
     */
    public function write($data)
    {
        $hd = $this->handler;
        try {
            fwrite($hd,$data);
        } catch(\Exception $e) {
            $this->throwException($e->getMessage());
        }
        return $this;
    }
    /**
     * 關閉管道
     */
    public function close()
    {
        return fclose($this->handler);
    }
    /**
     * 刪除管道
     */
    public function remove()
    {
        return unlink($this->path);
    }
    
}

多進程處理類,利用管道保存各個進程的返回結果,主進程處理最後的結果數組

<?php
require_once './fifoPipeClass.php';
class pipeMultiProcess
{
    protected $process = []; // 子進程
    protected $child   = []; // 子進程pid數組
    protected $result  = []; // 計算的結果
    public function __construct($process = [])
    {
        $this->process  = $process;
    }
    /**
     * 設置子進程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }
    /**
     * fork 子進程
     */
    public function forkProcess()
    {
        $process  = $this->process;
        foreach($process as $k => $item) {
            $pid = pcntl_fork();
            if ($pid ==  0) {
                $pipe = new fifoPipeClass();
                $id = getmypid();
                $pipe->writeOpen();
                $pipe->write($k.' pid:'.$id.PHP_EOL);
                $pipe->close();
                exit(0);
            } else if ($pid > 0) {
                $this->child[] = $pid;
            }
        }
        return $this;
    }
    /**
     * 等待子進程結束
     */
    public function waiteProcess()
    {
        $child = $this->child;
        $pipe  = new fifoPipeClass();
        $pipe->readOpen();
        echo 'get all begin'.PHP_EOL;
        while(count($child)) {
            foreach($child as $k => $pid){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                if ( -1 == $res || $res > 0 ) {
                    unset($child[$k]);
                }
            }
            $data = $pipe->readOne();
            if ($data) {
                $this->result[] = $data;
            }
        }
        $pipe->close();
        echo 'get all end'.PHP_EOL;
        $pipe->remove();
        return $this;
    }
    /**
     * 獲取返回結果
     */
    public function getResult()
    {
        return $this->result;
    }
}
$obj = new pipeMultiProcess();
$obj->setProcess(['name'=>1,'age'=>2,'sex'=>3]);
$res = $obj->forkProcess()->waiteProcess()->getResult();
print_r($res);

運行結果以下:ui

Array
(
    [0] => age pid:7436

    [1] => sex pid:7437
name pid:7435

)
相關文章
相關標籤/搜索