[本文出自天外歸雲的博客園]
Python環境:Python3.6
ProcessManager參數:
logger:本身的日誌輸入logger
targets:包含目標字符串(目標cmd的一部分,能夠用來識別cmd的字符串)的列表
duration:目標進程的最大存活時間,單位分鐘,超過就會被殺掉
程序運行後每隔60s檢查一次進程
代碼如下:
import logging
from functools import wraps
from os import name as system_name
from os import system as run_cmd
from os.path import abspath, basename, dirname, join
from time import sleep, time

import psutil


def get_logger():
    """Configure the root logger and return it.

    Records are formatted as ``<asctime> <message>`` and sent both to the
    console and to ``<script file name>.log`` in the directory that contains
    this script. The logger level is set to INFO.
    """
    script_path = abspath(__file__)
    # Only the base name goes into the log file name: ``__file__`` may carry
    # a relative directory (e.g. ``sub/pm.py``), and joining that onto the
    # script directory would yield ``.../sub/sub/pm.py.log``, which does not
    # exist.
    log_path = join(dirname(script_path), f"{basename(script_path)}.log")
    formatter = logging.Formatter('%(asctime)s %(message)s')
    _logger = logging.getLogger()
    for handler in (logging.FileHandler(log_path), logging.StreamHandler()):
        handler.setFormatter(formatter)
        _logger.addHandler(handler)
    _logger.setLevel(logging.INFO)
    return _logger


my_logger = get_logger()


def every(duration: int):
    """Return a decorator that re-runs a function forever.

    The decorated function never returns: it sleeps ``duration`` seconds,
    prints ``KEEP RUNNING``, calls the wrapped function, and repeats. The
    first call therefore happens only after the first sleep. An exception
    raised by the wrapped function propagates and ends the loop.

    :param duration: pause between two runs, in seconds
    """
    def keep_run(func):
        @wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            while True:
                sleep(duration)
                print("KEEP RUNNING")
                func(*args, **kwargs)
        return wrapper
    return keep_run


class ProcessManager:
    """Kill target processes that have been alive for too long.

    Keep checking process with target cmd every 60 seconds.
    If a process with a target cmd is found and lives longer than
    ``duration`` minutes, just kill it.
    """

    def __init__(self, logger, targets, duration):
        """
        :param logger: logging.getLogger()
        :param targets: [strA, strB, ...] substrings that identify a target
            command-line argument
        :param duration: int, maximum allowed process age in minutes
        """
        self.log = logger
        self.targets = targets
        self.duration = duration

    def p_kill(self, pid):
        """Kill the process ``pid`` with the platform's kill command.

        Never raises: the outcome is only reported through the logger.

        :param pid: id of the process to kill
        """
        if system_name == "nt":
            # /F forces termination, matching ``kill -9`` on the other branch;
            # without it taskkill only asks the process to close.
            command = f"taskkill /F /pid {pid}"
        else:
            command = f"kill -9 {pid}"
        try:
            status = run_cmd(command)
        except Exception as e:
            self.log.info(f"[PROC KILL EXP] {e}")
            return
        # os.system reports failure through its return value, not through an
        # exception, so success must be checked explicitly.
        if status == 0:
            self.log.info(f"[PROC KILL] {pid}")
        else:
            self.log.info(
                f"[PROC KILL EXP] '{command}' exited with status {status}"
            )

    def is_target_cmd(self, cmd):
        """Return True if ``cmd`` contains any of the target substrings.

        A match is logged as ``[TARGET FOUND]``.

        :param cmd: one command-line argument of a process
        """
        for target in self.targets:
            if target in cmd:
                self.log.info(f"[TARGET FOUND] {cmd}")
                return True
        return False

    def need_kill(self, p: psutil.Process):
        """Return True if process ``p`` has outlived the allowed duration.

        The age is computed in minutes from ``p.create_time()`` and the
        current time; its whole-minute part must be strictly greater than
        ``self.duration``. A positive result is logged as ``[NEED KILL]``.

        :param p: process to inspect
        """
        duration = abs(int(p.create_time()) - int(time())) / 60
        if int(duration) > self.duration:
            self.log.info(f"[NEED KILL] {p.pid} with duration: {duration}")
            return True
        return False

    @every(60)
    def kill_dead_processes(self):
        """Scan all processes once and kill over-age target processes.

        Because of ``@every(60)`` a call to this method loops forever,
        performing one scan every 60 seconds.
        """
        for p in psutil.process_iter():
            try:
                # any() stops at the first matching argument, so a process
                # with several matching arguments is killed only once.
                is_target = any(self.is_target_cmd(cmd) for cmd in p.cmdline())
                if is_target and self.need_kill(p):
                    self.p_kill(p.pid)
            except (psutil.AccessDenied, psutil.ZombieProcess, TypeError):
                # Processes that cannot be inspected are skipped silently.
                continue
            except Exception as e:
                self.log.info(f"[PROC CHECK EXP] {p.pid} {e}")


if __name__ == '__main__':
    my_targets = ["-xctestrun"]
    ProcessManager(
        logger=my_logger,
        targets=my_targets,
        duration=4,
    ).kill_dead_processes()