PHP使用QPM實現多進程並行任務處理程序

考慮用PHP實現如下場景: 有一個抓站的URL列表保存在隊列裏,後臺程序讀取這個隊列,而後轉交給子進程去抓取HTML存放到文件裏。 爲了提升效率,容許多任務並行執行,但爲了不機器負載太高,限制了最大的並行任務數(爲了測試方便,咱們把這個數設爲3),當隊列中取到 END標記時,程序結束運行。php

這個場景用QPM的Supervisor::taskFactoryMode()實現,很是簡單。html

QPM全名是 Quick Process Management Module for PHP. PHP 是強大的web開發語言,以致於你們經常忘記PHP 能夠用來開發健壯的命令行(CLI)程序以致於daemon程序。 而編寫daemon程序免不了與各類進程管理打交道。QPM正式爲簡化進程管理而開發的類庫。QPM的項目地址是:https://github.com/Comos/qpmgit

爲了,簡化測試環境,咱們能夠用一個文本文件來模擬隊列的數據。完整的例子文件看這裏:spider_task_factory_data.txtgithub

http://ent.ifeng.com/
http://news.sina.com.cn/
http://news.ifeng.com/
http://news.163.com/
http://news.sohu.com/
http://ent.sina.com.cn/
http://ent.ifeng.com/
...
END

使用QPM的taskFactoryMode以前,咱們須要準備一個TaskFactory類。 咱們將其命名爲 SpiderTaskFactory,SpdierTaskFactory 的工廠方法fetchTask 正常返回 Runnable的子類的實例。當碰到END或文件結束,則throw StopSignal,這樣程序就會終止。web

如下是組裝 Supervisor 並執行的代碼片斷。完整的例子見:spider_task_factory.php併發

//若是沒有從參數指定輸入,把spider_task_factory_data.txt做爲數據源

$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';
$spiderTaskFactory = new SpiderTaskFactory($input);$config = [
    //指定taskFactory對象和工廠方法
    'factoryMethod'=>[$spiderTaskFactory, 'fetchTask'],
    //指定最大併發數量爲3
    'quantity' => 3,
];
//啓動Supervisor
qpm\supervisor\Supervisor::taskFactoryMode($config)->start();

SpiderTaskFactory 的實現以下:框架

/**
 * 任務工廠,必須實現 fetchTask方法。
 * 該方法正常返回
 * 
*/class SpiderTaskFactory {
    private $_fh;
    public function __construct($input) {
        $this->_input = $input;
        $this->_fh = fopen($input, 'r');
        if ($this->_fh === false) {
            throw new Exception('fopen failed:'.$input);
        }
    }
    public function fetchTask() {
      while (true) {
        if (feof($this->_fh)) {
                throw new qpm\supervisor\StopSignal();
        }
        $line = trim(fgets($this->_fh));
        if ($line == 'END') {
            throw new qpm\supervisor\StopSignal();
        }
        if (empty($line)) {
            continue;
        }
        break;
      }
      return new SpiderTask($line);
    }
}

SpiderTask 的實現以下:ide

/**
 * 在子進程中執行任務的類
 * 必須實現 qpm\process\Runnable 接口
 */
 class SpiderTask implements qpm\process\Runnable {
    private $_target;
    public function __construct($target) {
      $this->_target = $target;
    }
    
    //在子進程中執行的部分
    public function run() {
      $r = @file_get_contents($this->_target);    
      if ($r===false) {        
        throw new Exception('fail to crawl url:'.$this->_target);
      }
      file_put_contents($this->getLocalFilename(), $r);   
    }
    private function getLocalFilename() {
      $filename = str_replace('/', '~', $this->_target);    
      $filename = str_replace(':', '_', $filename);    
      $filename = $filename.'-'.date('YmdHis');    
      return __DIR__.'/_spider/'.$filename.'.html';
    }
}

 真實的生產環境,用隊列替換文件輸入,便可實現持久運行的生產者/消費者模型的程序。測試

關於 QPM的使用,能夠參考:fetch

QPM介紹

安裝和使用PHP進程管理框架 QPM
使用qpm建立daemon程序
使用QPM編寫PHP 多進程程序
 PHP使用QPM實現多進程並行任務處理程序
相關文章
相關標籤/搜索