PHP多進程系列筆記(五)

前面幾節都是講解pcntl擴展實現的多進程程序。本節給你們介紹swoole擴展的swoole_process模塊。php

swoole多進程

swoole_process 是swoole提供的進程管理模塊,用來替代PHP的pcntl擴展。html

首先,確保安裝的swoole版本大於1.7.2:redis

$ php --ri swoole

swoole

swoole support => enabled
Version => 1.10.1

注意:swoole_process在最新的1.8.0版本已經禁止在Web環境中使用了,因此也只能支持命令行。bash

swoole提供的多進程擴展基本功能和pcntl提供的同樣,但swoole更易簡單上手,而且提供了:swoole

  • 默認基於unixsock的進程間通訊;
  • 支持消息隊列做爲進程間通訊;
  • 基於signalfd和eventloop處理信號,幾乎沒有任何額外消耗;
  • 高精度微秒定時器;
  • 配合swoole_event模塊,建立的PHP子進程能夠異步的事件驅動模式

swoole_process 模塊提供的方法(Method)主要分爲四部分:異步

  • 基礎方法
swoole_process::__construct
swoole_process->start
swoole_process->name
swoole_process->exec
swoole_process->close
swoole_process->exit
swoole_process::kill
swoole_process::wait
swoole_process::daemon
swoole_process::setAffinity
  • 管道通訊
swoole_process->write
swoole_process->read
swoole_process->setTimeout
swoole_process->setBlocking
  • 消息隊列通訊
swoole_process->useQueue
swoole_process->statQueue
swoole_process->freeQueue
swoole_process->push
swoole_process->pop
  • 信號與定時器
swoole_process::signal
swoole_process::alarm

基礎應用

本例實現的是tcp server,特性:socket

  • 多進程處理客戶端鏈接
  • 子進程退出,Master進程會從新建立一個
  • 支持事件回調
  • 主進程退出,子進程在幹完手頭活後退出
<?php 

class TcpServer{
    const MAX_PROCESS = 3;//最大進程數
    private $pids = []; //存儲子進程pid
    private $socket;
    private $mpid;

    public function run(){
        $process = new swoole_process(function(){
            $this->mpid = $id = getmypid();   
            echo time()." Master process, pid {$id}\n"; 

            //建立tcp server
            $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
            if(!$this->socket) exit("start server err: $errstr --- $errno");

            for($i=0; $i<self::MAX_PROCESS;$i++){
                $this->start_worker_process();
            }
    
            echo "waiting client...\n";
    
            //Master進程等待子進程退出,必須是死循環
            while(1){
                foreach($this->pids as $k=>$pid){
                    if($pid){
                        $res = swoole_process::wait(false);
                        if ( $res ){
                            echo time()." Worker process $pid exit, will start new... \n";
                            $this->start_worker_process();
                            unset($this->pids[$k]);
                        }
                    }
                }
                sleep(1);//讓出1s時間給CPU
            }
        }, false, false); //不啓用管道通訊
        swoole_process::daemon(); //守護進程
        $process->start();//注意:start以後的變量子進程裏面是獲取不到的
    }

    /**
     * 建立worker進程,接受客戶端鏈接
     */
    private function start_worker_process(){
        $process = new swoole_process(function(swoole_process $worker){
            $this->acceptClient($worker);
        }, false, false);
        $pid = $process->start();
        $this->pids[] = $pid;
    }

    private function acceptClient(&$worker)
    {
        //子進程一直等待客戶端鏈接,不能退出
        while(1){
            
            $conn = stream_socket_accept($this->socket, -1);
            if($this->onConnect) call_user_func($this->onConnect, $conn); //回調鏈接事件

            //開始循環讀取消息
            $recv = ''; //實際收到消息
            $buffer = ''; //緩衝消息
            while(1){
                $this->checkMpid($worker);
                
                $buffer = fread($conn, 20);

                //沒有收到正常消息
                if($buffer === false || $buffer === ''){
                    if($this->onClose) call_user_func($this->onClose, $conn); //回調斷開鏈接事件
                    break;//結束讀取消息,等待下一個客戶端鏈接
                }

                $pos = strpos($buffer, "\n"); //消息結束符
                if($pos === false){
                    $recv .= $buffer;                            
                }else{
                    $recv .= trim(substr($buffer, 0, $pos+1));

                    if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回調收到消息事件

                    //客戶端強制關閉鏈接
                    if($recv == "quit"){
                        echo "client close conn\n";
                        fclose($conn);
                        break;
                    }

                    $recv = ''; //清空消息,準備下一次接收
                }
            }
        }
    }

    //檢查主進程是否存在,若不存在子進程在幹完手頭活後退出
    public function checkMpid(&$worker){
        if(!swoole_process::kill($this->mpid,0)){
            $worker->exit();
            // 這句提示,實際是看不到的.須要寫到日誌中
            echo "Master process exited, I [{$worker['pid']}] also quit\n";
        }
    }

    function __destruct() {
        @fclose($this->socket);
    }
}

$server =  new TcpServer();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"onClose "."\n");
};

$server->run();

運行後可使用telnet鏈接:tcp

telnet 127.0.0.1 9201

因爲設置了最大三個子進程,最多隻能接受3個客戶端鏈接。oop

進程間通訊

前面講解的例子裏,主進程和子進程直接是沒有直接的數據交互的。若是主進程須要獲得的來自子進程的反饋,或者子進程接受來自主進程的數據,那麼就須要進程間通訊了。ui

swoole內置了管道通訊和消息隊列通訊。

管道通訊

管道通訊主要是數據傳輸:一個進程須要將數據發送給另一個進程。

這個swoole封裝後,使用很是簡單:

<?php 

$workers = [];

for ($i=0; $i<3; $i++) {
    $process = new swoole_process(function(swoole_process $worker){
        //子進程邏輯
        $cmd = $worker->read();

        ob_start();
        passthru($cmd);//執行外部程序而且顯示未經處理的、原始輸出,會直接打印輸出。
        $return = ob_get_clean() ? : ' ';
        $return = trim($return).". worker pid:".$worker->pid."\n";
        
        // $worker->write($return);//寫入數據到管道
        echo $return;//寫入數據到管道。注意:子進程裏echo也是寫入到管道
    }, true); //第二個參數爲true,啓用管道通訊
    $pid = $process->start();
    $workers[$pid] = $process;  
}

foreach($workers as $pid=>$worker){
    $worker->write('whoami'); //經過管道發數據到子進程。管道是單向的:發出的數據必須由另外一端讀取。不能讀取本身發出去的
    $recv = $worker->read();//同步阻塞讀取管道數據
    echo "recv result: $recv";
}

//回收子進程
while(count($workers)){
    // echo time(). "\n";
    foreach($workers as $pid=>$worker){
        $ret = swoole_process::wait(false);
        if($ret){
            echo "worker exit: $pid\n";
            unset($workers[$pid]);
        }
    }
}

運行:

$ php swoole_process_pipe.php 
recv result: Linux
recv result: 2018年 06月 24日 星期日 16:18:01 CST
recv result: yjc
worker exit: 14519
worker exit: 14522
worker exit: 14525

注意點:
一、管道數據讀取是同步阻塞的;上面的例子裏若是子進程裏再加一句$worker->read(),會一直阻塞。可使用swoole_event_add將管道加入到事件循環中,變爲異步模式。
二、子進程裏的輸出(例如echo)與write效果相同。
三、經過管道發數據到子進程。管道是單向的:發出的數據必須由另外一端讀取。不能讀取本身發出去的。

這裏額外講解一下swoole_process::wait()
一、swoole_process::wait()默認是阻塞的, swoole_process::wait(false)則是非阻塞的;
二、swoole_process::wait()阻塞模式調用一次僅能回收一個子進程,非阻塞模式調用一次不必定能當前就能回收子進程;
三、若是不加swoole_process::wait(),主進程又是死循環,主進程退出後會變成殭屍進程。

ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'能夠查詢殭屍進程。


防盜版聲明:本文系原創文章,發佈於公衆號飛鴻影的博客(fhyblog)及博客園,轉載需做者贊成。


消息隊列通訊

消息隊列與管道有些不同:消息隊列是全局的,全部進程均可以發送、讀取。你能夠把它看作redis list結構。

消息隊列更常見的用途是主進程分配任務,子進程消費執行。

<?php 

$workers = [];

for ($i=0; $i<3; $i++) {
    $process = new swoole_process(function(swoole_process $worker){
        //子進程邏輯
        sleep(1); //防止父進程還未往消息隊列中加入內容直接退出
        while($cmd = $worker->pop()){
            // echo "recv from master: $cmd\n";

            ob_start();
            passthru($cmd);//執行外部程序而且顯示未經處理的、原始輸出,會直接打印輸出。
            $return = ob_get_clean() ? : ' ';
            $return = "res: ".trim($return).". worker pid: ".$worker->pid."\n";
            
            echo $return;
            // sleep(1);
        }

        $worker->exit(0);
    }, false, false); //不建立管道

    $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息隊列
    $pid = $process->start();
    $workers[$pid] = $process;
}

//因爲全部進程是共享使用一個消息隊列,因此只需向一個子進程發送消息便可
$worker = current($workers);
for ($i=0; $i<3; $i++) {
    $worker->push('whoami'); //發送消息
}


//回收子進程
while(count($workers)){
    foreach($workers as $pid=>$worker){
        $ret = swoole_process::wait();
        if($ret){
            echo "worker exit: $pid\n";
            unset($workers[$pid]);
        }
    }
}

運行結果:

$ php swoole_process_quene.php 
res: yjc. worker pid: 15885
res: yjc. worker pid: 15886
res: yjc. worker pid: 15887
worker exit: 15885
worker exit: 15886
worker exit: 15887

注意點:
一、全部進程共享使用一個消息隊列;
二、消息隊列的讀取操做是阻塞的,能夠在useQueue的時候第2個參數mode改成2 | swoole_process::IPC_NOWAIT,則是異步的。mode僅僅設置爲2是阻塞的,示例裏去掉swoole_process::IPC_NOWAIT後讀取消息的while會死循環。
三、子進程前面加了個sleep(1);,這是爲了防止父進程還未往消息隊列中加入內容直接退出。
四、子進程末尾也加了sleep,這是爲了防止一個進程把全部消息都消費完了,實際應用須要去掉。

信號與定時器

swoole_process::alarm支持微秒定時器:

<?php 

function ev_timer(){
    static $i = 0;
    echo "#{$i}\talarm\n";
    $i++;
    if ($i > 5) {
        //清除定時器
        swoole_process::alarm(-1);

        //退出進程
        swoole_process::kill(getmypid());
        
    }
}

//安裝信號
swoole_process::signal(SIGALRM, 'ev_timer');

//觸發定時器信號:單位爲微秒。若是爲負數表示清除定時器
swoole_process::alarm(100 * 1000);//100ms

echo getmypid()."\n"; //該句會順序執行,後續無需使用while循環防止進程直接退出

運行:

$ php swoole_process_alarm.php 
13660
#0  alarm
#1  alarm
#2  alarm
#3  alarm
#4  alarm
#5  alarm

注:alarm不能和Swoole\Timer同時使用。

參考

一、Process-Swoole-Swoole文檔中心 https://wiki.swoole.com/wiki/page/p-process.html

相關文章
相關標籤/搜索