PHP 命令行方式實現異步多進程模式的任務處理

用PHP來實現異步任務一直是個難題,現有的解決方案中:PHP知名的異步框架有 swooleWorkerman,但都是沒法在 web 環境中直接使用的,即使強行搭建 web 環境,異步調用也是使用 多進程模式實現的。但有時真的不須要用啓動服務的方式,讓服務端一直等待客戶端消息,況且中間還不能改動服務端代碼。本文就介紹一下不使用任何框架和第三方庫的狀況下,在 CLI 環境中如何實現 多進程以及在 web環境中的異步調用。

web 環境的異步調用

經常使用的方式有兩種php

1. 使用 socket 鏈接

這種方式就是典型的C/S架構,須要有服務端支持。web

// 1. 建立socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 進行socket鏈接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式運行,因爲在客戶端不實用,因此這裏不考慮
// 3. 向服務端發送請求
socket_write($socket, $request, strlen($request));
// 4. 接受服務端的迴應消息(忽略非阻塞的狀況,若是服務端不是提供異步服務,那這一步能夠省略)
$recv = socket_read($socket, 2048);
// 5. 關閉socket鏈接
socket_close($socket);

2. 使用 popen 打開進程管道

這種方式是使用操做系統命令,由操做系統直接執行。
本文討論的異步調用就是使用這種方式。redis

$sf = '/path/to/cli_async_task.php'; //要執行的腳本文件
$op = 'call'; //腳本文件接收的參數1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //腳本文件接收的參數2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打開以後接着就關閉進程管道,讓該進程以守護模式運行
echo PHP_EOL.'異步任務已執行。'.PHP_EOL;

這種方式的優勢就是:一步解決,當前進程不須要任何開銷。
缺點也很明顯:沒法跟蹤任務腳本的運行狀態。
因此重頭戲會是在執行任務的腳本文件上,下面就介紹任務處理和多進程的實現方式。json

CLI 環境的多進程任務處理

注意:多進程模式僅支持Linux,不支持Windows!!數組

這裏會從0開始(未使用任何框架和類庫)介紹每個步驟,最後會附帶一份完整的代碼緩存

1. 建立腳本

  • 任何腳本不可忽視的地方就是錯誤處理。因此寫一個任務處理腳本首先就是寫錯誤處理方式。

在PHP中就是調用 set_exception_handler set_error_handler register_shutdown_function 這三個函數,而後寫上自定義的處理方法。swoole

  • 接着是定義自動加載函數 spl_autoload_register 免去每使用一個新類都要 require / include 的煩惱。
  • 定義日誌操做方法。
  • 定義任務處理方法。
  • 讀取來自命令行的參數,開始執行任務。

2. 多進程處理

PHP 建立多進程是使用 pcntl_fork 函數,該函數會 fork 一份當前進程(影分身術),因而就有了兩個進程,當前進程是主進程(本體),fork 出的進程是子進程(影分身)。須要注意的是兩個進程代碼環境是同樣的,兩個進程都是執行到了 pcntl_fork 函數位置。區別就是 getmypid 得到的進程號不同,最重要的區分是當調用 pcntl_fork函數時,子進程得到的返回值是 0,而主進程得到的是子進程的進程號 pid架構

好了,當咱們知道誰是子進程後,就可讓該子進程執行任務了。app

那麼主進程是如何得知子進程的狀態呢?
使用 pcntl_wait。該函數有兩個參數 $status$options$status 是引用類型,用來存儲子進程的狀態,$options 有兩個可選常量WNOHANG| WUNTRACED,分別表示不等待子進程結束當即返回和等待子進程結束。很明顯使用WUNTRACED會阻塞主進程。(也可使用 pcntl_waitpid 函數獲取特定 pid 子進程狀態)框架

在多進程中,主進程要作的就是管理每一個子進程的狀態,不然子進程極可能沒法退出而變成殭屍進程。

關於多進程間的消息通訊
這一塊須要涉及具體的業務邏輯,因此只能簡單的提一下。不考慮使用第三方好比 redis 等服務的狀況下,PHP原生能夠實現就是管道通訊共享內存等方式。實現起來都比較簡單,缺點就是可以使用的數據容量有限,只能用簡單文本協議交換數據。

如何手動結束全部進程任務
若是多進程處理不當,極可能致使進程任務卡死,甚至佔用過多系統資源,此時只能手動結束進程。
除了一個個的根據進程號來結束,還有一個快速的方法是首先在任務腳本里自定義進程名稱,就是調用cli_set_process_title函數,而後在命令行輸入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (裏面的 cli_async_worker 就是自定義的進程名稱),這樣就能夠快速結束多進程任務了。


未完待續...


如下是完整的任務執行腳本代碼:
可能沒法直接使用,須要修改的地方有:

  1. 腳本目錄和日誌目錄常量
  2. 自動加載任務類的方法(默認是加載腳本目錄中以Task結尾的文件)
  3. 其餘的如:錯誤和日誌處理方式和文本格式就隨意吧...
  4. 若是命名管道文件設置有錯誤,可能致使進程假死,你可能須要手動刪除進程管道通訊的代碼。
  5. 多進程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。執行狀況能夠在日誌文件中查看。execAsyncTask函數參考【__使用popen打開進程管道__】。
<?php

error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);
@ini_set('display_errors', 0);
@ini_set('date.timezone', 'PRC');

chdir(__DIR__);

/* 任務腳本目錄 */
defined('TASK_PATH') or define('TASK_PATH', realpath(__DIR__ .'/tasks'));
/* 任務日誌目錄 */
defined('TASK_LOGS_PATH') or define('TASK_LOGS_PATH', __DIR__ .'/tasks/logs');

if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);

set_exception_handler(function($e) {
    $time = date('H:i:s', time());
    $msg = sprintf(''. '<h3>[%s] %s (%s)</h3>'. "\n". '<pre>%s</pre>',
        $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()
    );
    file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
set_error_handler(function($errno, $errmsg, $filename, $line) {
    if (!(error_reporting() & $errno)) return;
    ob_start();
    debug_print_backtrace();
    $backtrace = ob_get_contents(); ob_end_clean();
    $datetime = date('Y-m-d H:i:s', time());
    $msg = <<<EOF
[{$errno}]
時間:{$datetime}
信息:{$errmsg}
文件:{$filename}
行號:{$line}
追蹤:
{$backtrace}

EOF;
    file_put_contents(TASK_LOGS_PATH .'/error-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
register_shutdown_function(function() {
    $last_error = error_get_last();
    if (in_array($last_error['type'], array(E_ERROR, E_WARNING, E_USER_ERROR))) {
        //
    }
    debug_log('End.', true);
});

function debug_log($log, $close=false) {
    static $fp;
    if (!$fp) {
        $fp = fopen(TASK_LOGS_PATH .'/debug-'.date('Ym').'.log', 'a+');
    }
    $log = '['. date('Y-m-d H:i:s') .'] [Task@'. getmypid() . '] ' . trim($log) . PHP_EOL;
    if (flock($fp, LOCK_EX)) {
        fwrite($fp, $log);
        fflush($fp);
        flock($fp, LOCK_UN);
    } else {
        //
    }
    if ($close) fclose($fp);
}

function call($job) {
    if (is_callable($job)) {
        $ret = call_user_func($job);
    } elseif (is_array($job) and is_callable(@$job[0])) {
        $ret = call_user_func_array($job[0], array_slice($job, 1));
    } else throw new \Exception('不是可執行的任務!');
    return $ret;
}

function grab(array $job) {
    /* 消息數據爲json,格式
    {
        "url":"fetch_url", //拉取的連接地址
        "method":"request_method", //請求方法
        "data":"post_data", //POST請求數據
        "args":[], //請求附加參數 headers|user_agent|proxy|timeout
        "callback":"callback_url", //回調地址(統一POST帶回應數據)
        "msg_id": "message_id" //消息ID
    }*/
    $url = $job['url'];
    $headers = @$job['args']['headers'] ?: [];
    $_headers = '';
    if (is_array($headers)) {
        foreach ($headers as $_k => $header) {
            if (!is_numeric($_k)) 
                $header = sprintf('%s: %s', $_k, $header);
            $_headers .= $header . "\r\n";
        }
    }
    $headers = "Connection: close\r\n" . $_headers;
    $opts = array(
        'http' => array(
            'method' => strtoupper(@$job['method'] ?: 'get'),
            'content' => @$job['data'] ?: null,
            'header' => $headers,
            'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',
            'proxy' => @$job['args']['proxy'] ?: null,
            'timeout' => intval(@$job['args']['timeout'] ?: 120),
            'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',
            'max_redirects' => 3,
            'ignore_errors' => true
        )
    );
    $ret = @file_get_contents($url, false, stream_context_create($opts));
    //debug_log($url.' -->'.strlen($ret));
    if ($ret and isset($job['callback'])) {
        $postdata = http_build_query(array(
                'msg_id' => @$job['msg_id'] ?: 0,
                'url' => @$job['url'],
                'result' => $ret
            ));
        $opts = array(
            'http' => array(
                'method' => 'POST',
                'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",
                'content' => $postdata,
                'timeout' => 30
            )
        );
        file_get_contents($job['callback'], false, stream_context_create($opts));
        //debug_log(json_encode(@$http_response_header));
        //debug_log($job['callback'].' -->'.$ret2);
    }
    
    return $ret;
}

function clean($tmpdirs, $expires=3600*24*7) {
    $ret = [];
    foreach ((array)$tmpdirs as $tmpdir) {
        $ret[$tmpdir] = 0;
        foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {
            if (fileatime($_file) < (time()-$expires)) {
                if (@unlink($_file)) $ret[$tmpdir]++;
            }
        }
    }
    return $ret;
}

function backup($file, $dest) {
    $zip = new \ZipArchive();
    if (!$zip->open($file, \ZipArchive::CREATE)) {
        return false;
    }
    _backup_dir($zip, $dest);
    
    $zip->close();
    return $file;
}
function _backup_dir($zip, $dest, $sub='') {
    $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    $dir = opendir($dest);
    if (!$dir) return false;
    while (false !== ($file = readdir($dir))) {
        if (is_file($dest . $file)) {
            $zip->addFile($dest . $file, $sub . $file);
        } else {
            if ($file != '.' and $file != '..' and is_dir($dest . $file)) {
                //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);
                _backup_dir($zip, $dest . $file, $file);
            }
        }
    }
    closedir($dir);
    return true;
}


function execute_task($op, $data) {
    debug_log('Start...');
    $t1 = microtime(true);
    switch($op) {
    case 'call': //執行任務腳本類
        $cmd = $data;
        if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;
        elseif (is_array($cmd)) {
            if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];
        }
        $ret = call($cmd);
        break;
    case 'grab': //抓取網頁
        if (is_string($data)) $data = ['url' => $data];
        if (is_array($data)) $ret = grab($data);
        else throw new \Exception('無效的命令參數!');
        break;
    case 'clean': //清理緩存文件夾:dirs 須要清理的文件夾列表,expires 過時時間(秒,默認7天)
        if (isset($data['dirs'])) {
            $ret = clean($data['dirs'], @$data['expires']);
        } else {
            $ret = clean($data);
        }
        break;
    case 'backup': //備份文件:zip 備份到哪一個zip文件,dest 須要備份的文件夾
        if (isset($data['zip']) and is_dir($data['dest']))
            $ret = backup($data['zip'], $data['dest']);
        else
            throw new \Exception('沒有指定須要備份的文件!');
        break;
    case 'require': //加載腳本文件
        if (is_file($data)) $ret = require($data);
        else throw new \Exception('不是可請求的文件!');
        break;
    case 'test':
        sleep(rand(1, 5));
        $ret = ucfirst(strval($data)). '.PID:'. getmypid();
        break;
    case 'multi': //多進程處理模式
        $results = $childs = [];
        $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();
        if (!file_exists($fifo)) {
            if (!posix_mkfifo($fifo, 0666)) { //開啓進程數據通訊管道
                throw new Exception('make pipe failed!');
            }
        }
        //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享內存
        //shmop_write($shmid, serialize([]), 0);
        //$data = unserialize(shmop_read($shmid, 0, 4096));
        //shmop_delete($shmid);
        //shmop_close($shmid);
        foreach($data as $_op => $_datas) {
            $_datas = (array)$_datas; //data 格式爲數組表示一個 op 有多個執行數據
            foreach($_datas as $_data) {
                $pid = pcntl_fork();
                if ($pid == 0) { //子進程中執行任務
                    $_ret = execute_task($_op, $_data);
                    $_pid = getmypid();
                    $pipe = fopen($fifo, 'w'); //寫
                    //stream_set_blocking($pipe, false);
                    $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);
                    if (strlen($_ret) > 4096) //寫入管道的數據最大4K
                        $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);
                    //debug_log('write pipe: '.$_ret);
                    fwrite($pipe, $_ret.PHP_EOL);
                    fflush($pipe);
                    fclose($pipe);
                    exit(0); //退出子進程
                } elseif ($pid > 0) { //主進程中記錄任務
                    $childs[] = $pid;
                    $results[$pid] = 0;
                    debug_log('fork by child: '.$pid);
                    //pcntl_wait($status, WNOHANG);
                } elseif ($pid == -1) {
                    throw new Exception('could not fork at '. getmygid());
                }
            }
        }
        $pipe = fopen($fifo, 'r+'); //讀
        stream_set_blocking($pipe, true); //阻塞模式,PID與讀取的管道數據可能會不一致。
        $n = 0;
        while(count($childs) > 0) {
            foreach($childs as $i => $pid) {
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                if (-1 == $res || $res > 0) {
                    $_ret = @unserialize(fgets($pipe)); //讀取管道數據
                    $results[$pid] = $_ret;
                    unset($childs[$i]);
                    debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));
                }
                if ($n > 1000) posix_kill($pid, SIGTERM); //超時(10分鐘)結束子進程
            }
            usleep(200000); $n++;
        }
        debug_log('child process completed.');
        @fclose($pipe);
        @unlink($fifo);
        $ret = json_encode($results, 64|256);
        break;
    default:
        throw new \Exception('沒有可執行的任務!');
        break;
    }
    $t2 = microtime(true);
    $times = round(($t2 - $t1) * 1000, 2);
    $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), 
        @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times);
    debug_log($log);
    return $ret;
}


// 讀取 CLI 命令行參數
$params = getopt('', array('op:', 'data:'));
$op = $params['op'];
$data = unserialize(base64_decode($params['data']));
// 開始執行任務
execute_task($op, $data);



function __autoload($classname) {
    $parts = explode('\\', ltrim($classname, '\\'));
    if (false !== strpos(end($parts), '_')) {
        array_splice($parts, -1, 1, explode('_', current($parts)));
    }
    $filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php';
    if ($filename = stream_resolve_include_path($filename)) {
        include $filename;
    } else if (preg_match('/.*Task$/', $classname)) { //查找以Task結尾的任務腳本類
        include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php';
    } else {
        return false;
    }
}
相關文章
相關標籤/搜索