中國廣東省深圳市龍華新區民治街道溪山美地
518131
+86 13113668890
+86 755 29812080
<netkiller@msn.com>php
版權 © 2014 http://netkiller.github.iohtml
版權聲明python
轉載請與做者聯繫,轉載時請務必標明文章原始出處和做者信息及本聲明。mysql
|
|
|
微信掃描二維碼進入 Netkiller 微信訂閱號linux QQ羣:128659835 請註明「讀者」nginx |
2014-09-01git
摘要github
2014-09-01 發表sql
2015-08-31 更新shell
2015-10-20 更新,增長優雅重啓
個人系列文檔
您可使用iBook閱讀當前文檔
目錄
守護進程是脫離於終端而且在後臺運行的進程。守護進程脫離於終端是爲了不進程在執行過程當中的信息在任何終端上顯示而且進程也不會被任何終端所產生的終端信息所打斷。
例如 apache, nginx, mysql 都是守護進程
不少程序以服務形式存在,他沒有終端或UI交互,它可能採用其餘方式與其餘程序交互,如TCP/UDP Socket, UNIX Socket, fifo。程序一旦啓動便進入後臺,直到知足條件他便開始處理任務。
以我當前的需求爲例,我須要運行一個程序,而後監聽某端口,持續接受服務端發起的數據,而後對數據分析處理,再將結果寫入到數據庫中; 我採用ZeroMQ實現數據收發。
若是我不採用守護進程方式開發該程序,程序一旦運行就會佔用當前終端窗框,還有受到當前終端鍵盤輸入影響,有可能程序誤退出。
咱們但願程序在非超級用戶運行,這樣一旦因爲程序出現漏洞被駭客控制,攻擊者只能繼承運行權限,而沒法得到超級用戶權限。
咱們但願程序只能運行一個實例,不運行同事開啓兩個以上的程序,由於會出現端口衝突等等問題。
例 1. 多線程守護進程例示
<?php class ExampleWorker extends Worker { #public function __construct(Logging $logger) { # $this->logger = $logger; #} #protected $logger; protected static $dbh; public function __construct() { } public function run(){ $dbhost = '192.168.2.1'; // 數據庫服務器 $dbport = 3306; $dbuser = 'www'; // 數據庫用戶名 $dbpass = 'qwer123'; // 數據庫密碼 $dbname = 'example'; // 數據庫名 self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array( /* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */ PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); } protected function getInstance(){ return self::$dbh; } } /* the collectable class implements machinery for Pool::collect */ class Fee extends Stackable { public function __construct($msg) { $trades = explode(",", $msg); $this->data = $trades; print_r($trades); } public function run() { #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try { $dbh = $this->worker->getInstance(); $insert = "INSERT INTO fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')"; $sth = $dbh->prepare($insert); $sth->bindValue(':ticket', $this->data[0]); $sth->bindValue(':login', $this->data[1]); $sth->bindValue(':volume', $this->data[2]); $sth->execute(); $sth = null; /* ...... */ $update = "UPDATE fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'"; $sth = $dbh->prepare($update); $sth->bindValue(':ticket', $this->data[0]); $sth->execute(); //echo $sth->queryString; //$dbh = null; } catch(PDOException $e) { $error = sprintf("%s,%s\n", $mobile, $id ); file_put_contents("mobile_error.log", $error, FILE_APPEND); } } } class Example { /* config */ const LISTEN = "tcp://192.168.2.15:5555"; const MAXCONN = 100; const pidfile = __CLASS__; const uid = 80; const gid = 80; protected $pool = NULL; protected $zmq = NULL; public function __construct() { $this->pidfile = '/var/run/'.self::pidfile.'.pid'; } private function daemon(){ if (file_exists($this->pidfile)) { echo "The file $this->pidfile exists.\n"; exit(); } $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie children exit($pid); } else { // we are the child file_put_contents($this->pidfile, getmypid()); posix_setuid(self::uid); posix_setgid(self::gid); return(getmypid()); } } private function start(){ $pid = $this->daemon(); $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []); $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP); $this->zmq->bind(self::LISTEN); /* Loop receiving and echoing back */ while ($message = $this->zmq->recv()) { //print_r($message); //if($trades){ $this->pool->submit(new Fee($message)); $this->zmq->send('TRUE'); //}else{ // $this->zmq->send('FALSE'); //} } $pool->shutdown(); } private function stop(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); posix_kill($pid, 9); unlink($this->pidfile); } } private function help($proc){ printf("%s start | stop | help \n", $proc); } public function main($argv){ if(count($argv) < 2){ printf("please input help parameter\n"); exit(); } if($argv[1] === 'stop'){ $this->stop(); }else if($argv[1] === 'start'){ $this->start(); }else{ $this->help($argv[0]); } } } $cgse = new Example(); $cgse->main($argv);
例 2. 消息隊列與守護進程
<?php declare(ticks = 1); require_once( __DIR__.'/autoload.class.php' ); umask(077); class EDM { protected $queue; public function __construct() { global $argc, $argv; $this->argc = $argc; $this->argv = $argv; $this->pidfile = $this->argv[0].".pid"; $this->config = new Config('mq'); $this->logging = new Logging(__DIR__.'/log/'.$this->argv[0].'.'.date('Y-m-d').'.log'); //.H:i:s //print_r( $this->config->getArray('mq') ); //pcntl_signal(SIGHUP, array(&$this,"restart")); } protected function msgqueue(){ $exchangeName = 'email'; //交換機名 $queueName = 'email'; //隊列名 $routeKey = 'email'; //路由key //建立鏈接和channel $connection = new AMQPConnection($this->config->getArray('mq')); if (!$connection->connect()) { die("Cannot connect to the broker!\n"); } $this->channel = new AMQPChannel($connection); $this->exchange = new AMQPExchange($this->channel); $this->exchange->setName($exchangeName); $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $this->exchange->setFlags(AMQP_DURABLE); //持久化 $this->exchange->declare(); //echo "Exchange Status:".$this->exchange->declare()."\n"; //建立隊列 $this->queue = new AMQPQueue($this->channel); $this->queue->setName($queueName); $this->queue->setFlags(AMQP_DURABLE); //持久化 $this->queue->declare(); //echo "Message Total:".$this->queue->declare()."\n"; //綁定交換機與隊列,並指定路由鍵 $bind = $this->queue->bind($exchangeName, $routeKey); //echo 'Queue Bind: '.$bind."\n"; //阻塞模式接收消息 while(true){ //$this->queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 $this->queue->consume(function($envelope, $queue) { $msg = $envelope->getBody(); $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 $this->logging->info('('.'+'.')'.$msg); //$this->logging->debug("Message Total:".$this->queue->declare()); }); $this->channel->qos(0,1); //echo "Message Total:".$this->queue->declare()."\n"; } $conn->disconnect(); } protected function start(){ if (file_exists($this->pidfile)) { printf("%s already running\n", $this->argv[0]); exit(0); } $this->logging->warning("start"); $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { //pcntl_wait($status); //等待子進程中斷,防止子進程成爲殭屍進程。 exit(0); } else { posix_setsid(); //printf("pid: %s\n", posix_getpid()); file_put_contents($this->pidfile, posix_getpid()); //posix_kill(posix_getpid(), SIGHUP); $this->msgqueue(); } } protected function stop(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); posix_kill($pid, SIGTERM); //posix_kill($pid, SIGKILL); unlink($this->pidfile); $this->logging->warning("stop"); }else{ printf("%s haven't running\n", $this->argv[0]); } } protected function restart(){ $this->stop(); $this->start(); } protected function status(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); printf("%s already running, pid = %s\n", $this->argv[0], $pid); }else{ printf("%s haven't running\n", $this->argv[0]); } } protected function usage(){ printf("Usage: %s {start | stop | restart | status}\n", $this->argv[0]); } public function main(){ //print_r($this->argv); if($this->argc != 2){ $this->usage(); }else{ if($this->argv[1] == 'start'){ $this->start(); }else if($this->argv[1] == 'stop'){ $this->stop(); }else if($this->argv[1] == 'restart'){ $this->restart(); }else if($this->argv[1] == 'status'){ $this->status(); }else{ $this->usage(); } } } } $edm = New EDM(); $edm->main();
下面是程序啓動後進入後臺的代碼
經過進程ID文件來判斷,當前進程狀態,若是進程ID文件存在表示程序在運行中,經過代碼file_exists($this->pidfile)實現,但然後進程被kill須要手工刪除該文件才能運行
private function daemon(){ if (file_exists($this->pidfile)) { echo "The file $this->pidfile exists.\n"; exit(); } $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie children exit($pid); } else { // we are the child file_put_contents($this->pidfile, getmypid()); posix_setuid(self::uid); posix_setgid(self::gid); return(getmypid()); } }
程序啓動後,父進程會推出,子進程會在後臺運行,子進程權限從root切換到指定用戶,同時將pid寫入進程ID文件。
程序中止,只需讀取pid文件,而後調用posix_kill($pid, 9); 最後將該文件刪除。
private function stop(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); posix_kill($pid, 9); unlink($this->pidfile); } }
全部線程共用數據庫鏈接,在多線程中這個很是重要,若是每一個線程創建以此數據庫鏈接在關閉,這對數據庫的開銷是巨大的。
protected function getInstance(){ return self::$dbh; }
所謂優雅重啓是指進程不退出的狀況加實現從新載入包含重置變量,刷新配置文件,重置日誌等等
stop/start 或者 restart都會退出進程,從新啓動,致使進程ID改變,同時瞬間退出致使業務閃斷。因此不少守護進程都會提供一個reload功能,者就是所謂的優雅重啓。
reload 實現原理是給進程發送SIGHUP信號,能夠經過kill命令發送 kill -s SIGHUP 64881,也能夠經過庫函數實現 posix_kill(posix_getpid(), SIGUSR1);
<?php pcntl_signal(SIGTERM, function($signo) { echo "\n This signal is called. [$signo] \n"; Status::$state = -1; }); pcntl_signal(SIGHUP, function($signo) { echo "\n This signal is called. [$signo] \n"; Status::$state = 1; Status::$ini = parse_ini_file('test.ini'); }); class Status{ public static $state = 0; public static $ini = null; } $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } if($pid) { // parent } else { $loop = true; Status::$ini = parse_ini_file('test.ini'); while($loop) { print_r(Status::$ini); while(true) { // Dispatching... pcntl_signal_dispatch(); if(Status::$state == -1) { // Do something and end loop. $loop = false; break; } if(Status::$state == 1) { printf("This program is reload.\r\n"); Status::$state = 0; break; } echo '.'; sleep(1); } echo "\n"; } echo "Finish \n"; exit(); }
建立配置文件
[root@netkiller pcntl]# cat test.ini [db] host=192.168.0.1 port=3306
測試方法,首先運行該守護進程
# php signal.reload.php Array ( [host] => 192.168.0.1 [port] => 3306 )
如今修改配置文件,增長user=test配置項
[root@netkiller pcntl]# cat test.ini [db] host=192.168.0.1 port=3306 user=test
發送信號,在另外一個終端窗口,經過ps命令找到該進程的PID,而後使用kill命令發送SIGHUP信號,而後再經過ps查看進程,你會發現進程PID沒有改變
[root@netkiller pcntl]# ps ax | grep reload 64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php 65073 pts/1 S+ 0:00 grep --color=auto reload [root@netkiller pcntl]# kill -s SIGHUP 64881 [root@netkiller pcntl]# ps ax | grep reload 64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php 65093 pts/1 S+ 0:00 grep --color=auto reload
配置文件被從新載入
This signal is called. [1] This program is reload. Array ( [host] => 192.168.0.1 [port] => 3306 [user] => test )
優雅重啓完成。
<?php /* * PHP Daemon sample. * Home: http://netkiller.github.io * Author: netkiller<netkiller@msn.com> * */ class Logger { public function __construct(/*Logging $logger*/) { } public function logger($type, $message) { $log = sprintf ( "%s\t%s\t%s\n", date ( 'Y-m-d H:i:s' ), $type, $message ); file_put_contents ( sprintf(__DIR__."/../log/sender.%s.log", date ( 'Y-m-d' )), $log, FILE_APPEND ); } } final class Signal{ public static $signo = 0; protected static $ini = null; public static function set($signo){ self::$signo = $signo; } public static function get(){ return(self::$signo); } public static function reset(){ self::$signo = 0; } } class Test extends Logger { //public static $signal = null; public function __construct() { //self::$signal == null; } public function run(){ while(true){ pcntl_signal_dispatch(); printf("."); sleep(1); if(Signal::get() == SIGHUP){ Signal::reset(); break; } } printf("\n"); } } class Daemon extends Logger { /* config */ const LISTEN = "tcp://192.168.2.15:5555"; const pidfile = __CLASS__; const uid = 80; const gid = 80; const sleep = 5; protected $pool = NULL; protected $config = array(); public function __construct($uid, $gid, $class) { $this->pidfile = '/var/run/'.basename(get_class($class), '.php').'.pid'; //$this->config = parse_ini_file('sender.ini', true); //include_once(__DIR__."/config.php"); $this->uid = $uid; $this->gid = $gid; $this->class = $class; $this->classname = get_class($class); $this->signal(); } public function signal(){ pcntl_signal(SIGHUP, function($signo) /*use ()*/{ //echo "\n This signal is called. [$signo] \n"; printf("The process has been reload.\n"); Signal::set($signo); }); } private function daemon(){ if (file_exists($this->pidfile)) { echo "The file $this->pidfile exists.\n"; exit(); } $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie children exit($pid); } else { file_put_contents($this->pidfile, getmypid()); posix_setuid(self::uid); posix_setgid(self::gid); return(getmypid()); } } private function run(){ while(true){ printf("The process begin.\n"); $this->class->run(); printf("The process end.\n"); } } private function foreground(){ $this->run(); } private function start(){ $pid = $this->daemon(); for(;;){ $this->run(); sleep(self::sleep); } } private function stop(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); posix_kill($pid, 9); unlink($this->pidfile); } } private function reload(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); //posix_kill(posix_getpid(), SIGHUP); posix_kill($pid, SIGHUP); } } private function status(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); system(sprintf("ps ax | grep %s | grep -v grep", $pid)); } } private function help($proc){ printf("%s start | stop | restart | status | foreground | help \n", $proc); } public function main($argv){ if(count($argv) < 2){ $this->help($argv[0]); printf("please input help parameter\n"); exit(); } if($argv[1] === 'stop'){ $this->stop(); }else if($argv[1] === 'start'){ $this->start(); }else if($argv[1] === 'restart'){ $this->stop(); $this->start(); }else if($argv[1] === 'status'){ $this->status(); }else if($argv[1] === 'foreground'){ $this->foreground(); }else if($argv[1] === 'reload'){ $this->reload(); }else{ $this->help($argv[0]); } } } $daemon = new Daemon(80,80, new Test()); $daemon->main($argv); ?>
若是是很是重要的進程,必需要保證程序正常運行,一旦出現任何異常退出,都須要作即時作處理。下面的程序可能檢查進程是否異常退出,若是退出便當即啓動。
#!/bin/sh LOGFILE=/var/log/$(basename $0 .sh).log PATTERN="my.php" RECOVERY="/path/to/my.php start" while true do TIMEPOINT=$(date -d "today" +"%Y-%m-%d_%H:%M:%S") PROC=$(pgrep -o -f ${PATTERN}) #echo ${PROC} if [ -z "${PROC}" ]; then ${RECOVERY} >> $LOGFILE echo "[${TIMEPOINT}] ${PATTERN} ${RECOVERY}" >> $LOGFILE #else #echo "[${TIMEPOINT}] ${PATTERN} ${PROC}" >> $LOGFILE fi sleep 5 done &