# -*- coding: utf-8 -*- import os import subprocess import signal import pwd import sys class MockLogger(object): '''模擬日誌類。方便單元測試。''' def __init__(self): self.info = self.error = self.critical = self.debug def debug(self, msg): print "LOGGER:"+msg class Shell(object): '''完成Shell腳本的包裝。 執行結果存放在Shell.ret_code, Shell.ret_info, Shell.err_info中 run()爲普通調用,會等待shell命令返回。 run_background()爲異步調用,會馬上返回,不等待shell命令完成 異步調用時,能夠使用get_status()查詢狀態,或使用wait()進入阻塞狀態, 等待shell執行完成。 異步調用時,使用kill()強行中止腳本後,仍然須要使用wait()等待真正退出。 TODO 未驗證Shell命令含有超大結果輸出時的狀況。 ''' def __init__(self, cmd): self.cmd = cmd # cmd包括命令和參數 self.ret_code = None self.ret_info = None self.err_info = None #使用時可替換爲具體的logger self.logger = MockLogger() def run_background(self): '''以非阻塞方式執行shell命令(Popen的默認方式)。 ''' self.logger.debug("run %s"%self.cmd) # Popen在要執行的命令不存在時會拋出OSError異常,但shell=True後, # shell會處理命令不存在的錯誤,所以沒有了OSError異常,故不用處理 self._process = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) #非阻塞 def run(self): '''以阻塞方式執行shell命令。 ''' self.run_background() self.wait() def run_cmd(self, cmd): '''直接執行某條命令。方便一個實例重複使用執行多條命令。 ''' self.cmd = cmd self.run() def wait(self): '''等待shell執行完成。 ''' self.logger.debug("waiting %s"%self.cmd) self.ret_info, self.err_info = self._process.communicate() #阻塞 # returncode: A negative value -N indicates that the child was # terminated by signal N self.ret_code = self._process.returncode self.logger.debug("waiting %s done. return code is %d"%(self.cmd, self.ret_code)) def get_status(self): '''獲取腳本運行狀態(RUNNING|FINISHED) ''' retcode = self._process.poll() if retcode == None: status = "RUNNING" else: status = "FINISHED" self.logger.debug("%s status is %s"%(self.cmd, status)) return status # Python2.4的subprocess尚未send_signal,terminate,kill # 因此這裏要山寨一把,2.7可直接用self._process的kill() def send_signal(self, sig): self.logger.debug("send signal %s to %s"%(sig, self.cmd)) os.kill(self._process.pid, sig) def terminate(self): self.send_signal(signal.SIGTERM) def kill(self): self.send_signal(signal.SIGKILL) def print_result(self): print "return code:", self.ret_code print "return info:", self.ret_info print " error info:", self.err_info class RemoteShell(Shell): '''遠程執行命令(ssh方式)。 XXX 含特殊字符的命令可能致使調用失效,如雙引號,美圓號$ NOTE 若cmd含有雙引號,可以使用RemoteShell2 ''' def __init__(self, cmd, ip): ssh = ("ssh -o PreferredAuthentications=publickey -o " "StrictHostKeyChecking=no -o ConnectTimeout=10") # 沒必要檢查IP有效性,也沒必要檢查信任關係,有問題shell會報錯 cmd = '%s %s "%s"'%(ssh, ip, cmd) Shell.__init__(self, cmd) class RemoteShell2(RemoteShell): '''與RemoteShell相同,只是變換了引號。 ''' def __init__(self, cmd, ip): RemoteShell.__init__(self, cmd, ip) self.cmd = "%s %s '%s'"%(ssh, ip, cmd) class SuShell(Shell): '''切換用戶執行命令(su方式)。 XXX 只適合使用root切換至其它用戶。 由於其它切換用戶後須要輸入密碼,這樣程序會掛住。 XXX 含特殊字符的命令可能致使調用失效,如雙引號,美圓號$ NOTE 若cmd含有雙引號,可以使用SuShell2 ''' def __init__(self, cmd, user): if os.getuid() != 0: # 非root用戶直接報錯 raise Exception('SuShell must be called by root user!') cmd = 'su - %s -c "%s"'%(user, cmd) Shell.__init__(self, cmd) class SuShell2(SuShell): '''與SuShell相同,只是變換了引號。 ''' def __init__(self, cmd, user): SuShell.__init__(self, cmd, user) self.cmd = "su - %s -c '%s'"%(user, cmd) class SuShellDeprecated(Shell): '''切換用戶執行命令(setuid方式)。 執行的函數爲run2,而不是run XXX 以「不乾淨」的方式運行:僅切換用戶和組,環境變量信息不變。 XXX 沒法獲取命令的ret_code, ret_info, err_info XXX 只適合使用root切換至其它用戶。 ''' def __init__(self, cmd, user): self.user = user Shell.__init__(self, cmd) def run2(self): if os.getuid() != 0: # 非root用戶直接報錯 raise Exception('SuShell2 must be called by root user!') child_pid = os.fork() if child_pid == 0: # 子進程幹活 uid, gid = pwd.getpwnam(self.user)[2:4] os.setgid(gid) # 必須先設置組 os.setuid(uid) self.run() sys.exit(0) # 子進程退出,防止繼續執行其它代碼 else: # 父進程等待子進程退出 os.waitpid(child_pid, 0) if __name__ == "__main__": '''test code''' # 1. test normal sa = Shell('who') sa.run() sa.print_result() # 2. test stderr sb = Shell('ls /export/dir_should_not_exists') sb.run() sb.print_result() # 3. test background sc = Shell('sleep 1') sc.run_background() print 'hello from parent process' print "return code:", sc.ret_code print "status:", sc.get_status() sc.wait() sc.print_result() # 4. test kill import time sd = Shell('sleep 2') sd.run_background() time.sleep(1) sd.kill() sd.wait() # NOTE, still need to wait sd.print_result() # 5. test multiple command and uncompleted command output se = Shell('pwd;sleep 1;pwd;pwd') se.run_background() time.sleep(1) se.kill() se.wait() # NOTE, still need to wait se.print_result() # 6. test wrong command sf = Shell('aaaaa') sf.run() sf.print_result() # 7. test instance reuse to run other command sf.cmd = 'echo aaaaa' sf.run() sf.print_result() sg = RemoteShell('pwd', '127.0.0.1') sg.run() sg.print_result() # unreachable ip sg2 = RemoteShell('pwd', '17.0.0.1') sg2.run() sg2.print_result() # invalid ip sg3 = RemoteShell('pwd', '1711.0.0.1') sg3.run() sg3.print_result() # ip without trust relation sg3 = RemoteShell('pwd', '10.145.132.247') sg3.run() sg3.print_result() sh = SuShell('pwd', 'ossuser') sh.run() sh.print_result() # wrong user si = SuShell('pwd', 'ossuser123') si.run() si.print_result() # user need password si = SuShell('pwd', 'root') si.run() si.print_result()