使用 swoole_process 實現 PHP 進程池

swoole_process 主要是用來代替 PHP 的 pcntl 擴展。咱們知道 pcntl 是用來進行多進程編程的,而 pcntl 只提供了 fork 這樣原始的接口,容易使用錯誤,而且沒有提供進程間通訊以及重定向標準輸入輸出的功能。php

而 swoole_process 則提供了比 pcntl 更強大的功能,更易用的API,使PHP在多進程編程方面更加輕鬆。html

本文使用 swoole_process 與 EventLoop 完成一個 php 的進程池,而且支持動態建立新進程。

EventLoop

swoole 有一個 Reactor 線程,這個線程能夠說是對 epoll 模型的封裝,能夠設置 read 事件和 write 事件的監聽回調函數。編程

下面會用到一個函數:數組

bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
  • 參數1爲一個文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其餘 fd(socket_create 建立的資源 , stream_socket_client/fsockopen建立的資源)
  • 參數2爲可讀事件回調函數
  • 參數3爲可寫事件回調函數

多進程編程少不了進程之間的通信,swoole 的進程之間有兩種通訊方式,一種是消息隊列(queue),另外一種是管道(pipe)。那麼本文使用的是 pipe 的方式。swoole

下面是一個定時向進程池投遞任務的例子。

代碼:socket

<?php
class ProcessPool{

    private $process;

    /**
     * Worker 進程數組
     * @var array
     */
    private $process_list = [];

    /**
     * 正在被使用的進程
     * @var array
     */
    private $process_use = [];

    /**
     * 最少進程數量
     * @var int
     */
    private $min_worker_num = 3;

    /**
     * 最多進程數量
     * @var int
     */
    private $max_worker_num = 6;

    /**
     * 當前進程數量
     * @var int
     */
    private $current_num;


    public function __construct()
    {
        $this->process = new swoole_process(array($this, 'run'), false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    public function run()
    {
        $this->current_num = $this->min_worker_num;
        //建立全部的worker進程
        for($i = 0; $i < $this->current_num; $i++){
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }

        foreach($this->process_list as $process){
            swoole_event_add($process->pipe, function ($pipe) use ($process){
                $data = $process->read();
                var_dump($data . '空閒');
                //接收子進程處理完成的信息,而且重置爲空閒
                $this->process_use[$data] = 0;
            });
        }

        //每秒定時向worker管道投遞任務
        swoole_timer_tick(1000 ,function ($timer_id){
            static $index = 0;
            $index = $index + 1;
            $flag = true; //是否新建worker
            foreach ($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //標記爲正在使用
                    $this->process_use[$pid] = 1;
                    // 在父進程內調用write,子進程能夠調用read接收此數據
                    $this->process_list[$pid]->write($index. "hello");
                    break;
                }
            }

            if($flag && $this->current_num < $this->max_worker_num){
                //沒有閒置worker,新建worker來處理
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1;
                $this->process_list[$pid]->write($index. "hello");
                $this->current_num++;
            }
            var_dump('第' .$index. '個任務');
            if($index == 10){
                foreach($this->process_list as $process){
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }

        });
    }

    /**
     * 子進程處理
     * @param $worker
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function($pipe)use($worker){
            $data = $worker->read();
            var_dump($worker->pid . ':' . $data);
            if($data == 'exit'){
                $worker->exit();
                exit;
            }
            //模擬耗時任務
            sleep(5);
            //告訴主進程處理完成
            //在子進程內調用write,父進程能夠調用read接收此數據
            $worker->write($worker->pid);
        });
    }

}

new ProcessPool();

首先定義幾個重要的屬性:函數

  • $process_list :Worker 進程數組
  • $process_use:正在被使用的進程
  • $min_worker_num :最少進程數量
  • $max_worker_num :最多進程數量
  • $current_num :當前進程數量
  • $process : 主進程

在實例化的時候建立主進程,而且運行 run 方法,在 run 方法裏面先建立全部的 worker 進程,而且設置爲空閒狀態。oop

接着遍歷全部的 worker 進程,而且加入 EventLoop 中,設置可讀事件,用於接收子進程的空閒信號。this

最後每隔一秒向 worker 進程投遞任務。動態擴充進程池則在這裏實現,若是沒有閒置的進程,而此時又有新的任務,則須要動態建立一個新的進程而且置爲繁忙狀態。因爲只模擬了十次任務,則第十個任務完成以後在父進程中發送 exit 使全部子進程退出。spa

運行效果與圖解:

圖片描述

參考連接:

https://wiki.swoole.com/wiki/...
https://opso.coding.me/2018/0...

相關文章
相關標籤/搜索