場景:平常任務中,有時須要經過php腳本執行一些日誌分析,隊列處理等任務,當數據量比較大時,可使用多進程來處理。php
準備:php多進程須要pcntl,posix擴展支持,能夠經過 php - m 查看,沒安裝的話須要從新編譯php,加上參數--enable-pcntl,posix通常默認會有。
html
注意:node
多進程實現只能在cli模式下,在web服務器環境下,會出現沒法預期的結果,我測試報錯:
linuxCall to undefined function: pcntl_fork()
一個錯誤 pcntl_fork causing 「errno=32 Broken pipe」 #474 ,看https://github.com/phpredis/phpredis/issues/474git
注意兩點:若是是在循環中建立子進程,那麼子進程中最後要exit,防止子進程進入循環。
子進程中的打開鏈接不能拷貝,使用的仍是主進程的,須要用多例模式。github
pcntl_fork:web
一次調用兩次返回,在父進程中返回子進程pid,在子進程中返回0,出錯返回-1。redis
pcntl_wait ( int &$status [, int $options ] ):shell
阻塞當前進程,直到任意一個子進程退出或收到一個結束當前進程的信號,注意是結束當前進程的信號,子進程結束髮送的SIGCHLD不算。使用$status返回子進程的狀態碼,並能夠指定第二個參數來講明是否以阻塞狀態調用api
阻塞方式調用的,函數返回值爲子進程的pid,若是沒有子進程返回值爲-1;
非阻塞方式調用,函數還能夠在有子進程在運行但沒有結束的子進程時返回0。
/** 確保這個函數只能運行在SHELL中 */ if (substr(php_sapi_name(), 0, 3) !== 'cli') { die("cli mode only"); }
#!/bin/bash for((i=1;i<=8;i++)) do /usr/local/bin/php multiprocessTest.php & done wait
上面的shell程序,列了一個很簡單的多進程程序,用一個for循環,實現了8進程併發來跑multiprocessTest.php這個程序。最後的wait語句,也可使主進程,再等待全部進程都執行完後再往下執行的需求。
這個程序是沒有問題的,不少現有的代碼也都這樣實現,可是這個程序的併發數是不可控的,即咱們沒法根據機器的核數去調度每個進程的開關。
若咱們的機器有8核或者更多,上面的程序是沒有問題的,全部核都能充分利用,而且互相之間,沒有爭搶資源的狀況出現。
但咱們的機器要沒有8核的話會是什麼狀況,同一時間運行的進程數多於核數,那麼系統就會出現進程分配調度的問題,爭搶資源也跟着相應而來,一個進程不能保證獨立連續的執行,全部的進程運行會遵從系統的調度,這樣就會有更多的不肯定因素出現。
<?php //最大的子進程數量 $maxChildPro = 8; //當前的子進程數量 $curChildPro = 0; //當子進程退出時,會觸發該函數,當前子進程數-1 function sig_handler($sig) { global $curChildPro; switch ($sig) { case SIGCHLD: echo 'SIGCHLD', PHP_EOL; $curChildPro--; break; } } //配合pcntl_signal使用,簡單的說,是爲了讓系統產生時間雲,讓信號捕捉函數可以捕捉到信號量 declare(ticks = 1); //註冊子進程退出時調用的函數。SIGCHLD:在一個進程終止或者中止時,將SIGCHLD信號發送給其父進程。 pcntl_signal(SIGCHLD, "sig_handler"); while (true) { $curChildPro++; $pid = pcntl_fork(); if ($pid) { //父進程運行代碼,達到上限時父進程阻塞等待任一子進程退出後while循環繼續 if ($curChildPro >= $maxChildPro) { pcntl_wait($status); } } else { //子進程運行代碼 $s = rand(2, 6); sleep($s); echo "child sleep $s second quit", PHP_EOL; exit; } }
<?php $childs = array(); // Fork10個子進程 for ($i = 0; $i < 10; $i++) { $pid = pcntl_fork(); if ($pid == -1) die('Could not fork'); if ($pid) { echo "parent \n"; $childs[] = $pid; } else { // Sleep $i+1 (s). 子進程能夠獲得$i參數 sleep($i + 1); // 子進程須要exit,防止子進程也進入for循環 exit(); } } while (count($childs) > 0) { foreach ($childs as $key => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); //-1表明error, 大於0表明子進程已退出,返回的是子進程的pid,非阻塞時0表明沒取到退出子進程 if ($res == -1 || $res > 0) unset($childs[$key]); } sleep(1); }
<?php function _fetchLog() { $password = $this->_getPassword(); $online_log_path = NginxConf::getArchiveDir($this->_stat_day); $task_log_path = QFrameConfig::getConfig('LOG_PATH'); $children = array(); $success = true; foreach($this->_server_list as $host => $value) { $local_dir = $this->_prepareLocalDir($host); $task_log = "$task_log_path/fetch_log.$host"; $cmd = "sshpass -p $password rsync -av -e 'ssh -o StrictHostKeyChecking=no' $host:$online_log_path/* $local_dir >> $task_log 2>&1"; $pid = pcntl_fork(); if(-1 === $pid) { LogSvc::log('stat_pv_by_citycode_error', 'could not fork'); exit('could not fork'); } else if(0 === $pid) { system($cmd, $return_value); if(0 !== $return_value) { LogSvc::log('stat_pv_by_citycode_error', "rsync $host error"); } exit($return_value); } else { $children[$pid] = 1; } } while(!empty($children)) { $pid = pcntl_waitpid(-1, $status, WNOHANG); if(0 === $pid) { sleep(1); } else { if(0 !== pcntl_wexitstatus($status)) { $success = false; } unset($children[$pid]); } } return $success; }
posix_kill(posix_getpid(), SIGHUP); 爲本身生成SIGHUP信號
declare(ticks = 1); //php < 5.3
pcntl_signal()
函數僅僅是註冊信號和它的處理方法,真正接收到信號並調用其處理方法的是pcntl_signal_dispatch()
函數
必須在循環裏調用,爲了檢測是否有新的信號等待dispatching。
pcntl_signal_dispatch()
declare(ticks = 1);
表示每執行一條低級指令,就檢查一次信號,若是檢測到註冊的信號,就調用其信號處理器。
kill [PID]
命令,未加任何其餘參數的話,程序會接收到一個SIGTERM信號。
<?php // 定義一個處理器,接收到SIGINT信號後只輸出一行信息 function signalHandler($signal) { if ($signal == SIGINT) { echo 'SIGINT', PHP_EOL; } } // 信號註冊:當接收到SIGINT信號時,調用signalHandler()函數 pcntl_signal(SIGINT, 'signalHandler'); /** * PHP < 5.3 使用 * 配合pcntl_signal使用,表示每執行一條低級指令,就檢查一次信號,若是檢測到註冊的信號,就調用其信號處理器。 */ if (!function_exists("pcntl_signal_dispatch")) { declare(ticks=1); } while (true) { $s = sleep(10); echo $s, PHP_EOL; //信號會喚醒sleep,返回剩餘的秒數。 // do something for ($i = 0; $i < 5; $i++) { echo $i . PHP_EOL; usleep(100000); } /** * PHP >= 5.3 * 調用已安裝的信號處理器 * 必須在循環裏調用,爲了檢測是否有新的信號等待dispatching。 */ if (!function_exists("pcntl_signal_dispatch")) { pcntl_signal_dispatch(); } }
<?php declare(ticks = 1); function signal_handler($signal) { print "Caught SIGALRM\n"; pcntl_alarm(5); } pcntl_signal(SIGALRM, "signal_handler", true); pcntl_alarm(5); for(;;) { }
<?php /** * 父進程經過pcntl_wait等待子進程退出 * 子進程經過信號kill本身,也能夠在父進程中發送kil信號結束子進程 */ //生成子進程 $pid = pcntl_fork(); if($pid == -1){ die('could not fork'); }else{ if($pid){ $status = 0; //阻塞父進程,直到子進程結束,不適合須要長時間運行的腳本. //可以使用pcntl_wait($status, WNOHANG)實現非阻塞式 pcntl_wait($status); exit; }else{ //結束當前子進程,以防止生成殭屍進程 if(function_exists("posix_kill")){ posix_kill(getmypid(), SIGTERM); }else{ system('kill -9'. getmypid()); } exit; } }
(1) 父進程經過wait和waitpid等函數等待子進程結束,這會致使父進程掛起。它不適合子進程須要長時間運行的狀況(會致使超時)。
執行wait()或waitpid()系統調用,則子進程在終止後會當即把它在進程表中的數據返回給父進程,此時系統會當即刪除該進入點。在這種情形下就不會產生defunct進程。
(2) 若是父進程很忙,那麼能夠用signal函數爲SIGCHLD安裝handler。在子進程結束後,父進程會收到該信號,能夠在handler中調用wait回收。
(3) 若是父進程不關心子進程何時結束,那麼能夠用signal(SIGCLD, SIG_IGN)或signal(SIGCHLD, SIG_IGN)通知內核,本身對子進程的結束不感興趣,那麼子進程結束後,內核會回收,並再也不給父進程發送信號
(4)fork兩次,父進程fork一個子進程,而後繼續工做,子進程fork一個孫進程後退出,那麼孫進程被init接管,孫進程結束後,init會回收。不過子進程的回收還要本身作。
ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'
命令註解:
-A 參數列出全部進程
-o 自定義輸出字段 咱們設定顯示字段爲 stat(狀態), ppid(進程父id), pid(進程id),cmd(命令)這四個參數
狀態爲 z或者Z 的進程爲殭屍進程,因此咱們使用grep抓取stat狀態爲zZ進程
運行結果以下:
這時,可使用 kill -HUP 5255 殺掉這個進程。若是再次查看殭屍進程還存在,能夠kill -HUP 5253(父進程)。
若是有多個殭屍進程,能夠經過
ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'|awk 'print{$2}'|xargs kill -9
處理。
<?php static public function sendSDKMsg($version) {/*{{{*/ if(!self::sendRandChance(self::EA_LAST_TIME_KEY.":".$version)) return false; $fp = @fsockopen( "udp://".self::UDP_HOST , self::UDP_PORT , $errno ); if( !$fp ) return false; stream_set_timeout( $fp , 0 , 100 ); stream_set_blocking( $fp , 0 ); $sysinfo = posix_uname(); $msg = $version." - ".$sysinfo['nodename']." - ".date('Y-m-d H:i:s',time()); $res = fwrite( $fp , $msg ); fclose($fp); }/*}}}*/ static public function sendRandChance($key) {/*{{{*/ $now = microtime(true); if(function_exists("eaccelerator_get")) { $lastInserTime = eaccelerator_get($key); if(!$lastInserTime) $lastInserTime = 0; if( ($now - $lastInserTime) < self::SEND_INTERVAL ) return false; eaccelerator_put($key, $now); return true; }else if(function_exists("apc_fetch")) { $lastInserTime = apc_fetch($key); if(!$lastInserTime) $lastInserTime = 0; if( ($now - $lastInserTime) < self::SEND_INTERVAL ) return false; apc_store($key, $now); return true; } $rand = rand(1,60); if((time()%60 == $rand) && rand(0,20) == 3) { return true; } return false; }/*}}}*/