[蟒蛇菜譜] Python封裝shell命令

# -*- coding: utf-8 -*-
import os
import subprocess
import signal
import pwd
import sys

class MockLogger(object):
    '''模擬日誌類。方便單元測試。'''
    def __init__(self):
        self.info = self.error = self.critical = self.debug

    def debug(self, msg):
        print "LOGGER:"+msg

class Shell(object):
    '''完成Shell腳本的包裝。
    執行結果存放在Shell.ret_code, Shell.ret_info, Shell.err_info中
    run()爲普通調用,會等待shell命令返回。
    run_background()爲異步調用,會馬上返回,不等待shell命令完成
    異步調用時,能夠使用get_status()查詢狀態,或使用wait()進入阻塞狀態,
    等待shell執行完成。
    異步調用時,使用kill()強行中止腳本後,仍然須要使用wait()等待真正退出。
    TODO 未驗證Shell命令含有超大結果輸出時的狀況。
    '''
    def __init__(self, cmd):
        self.cmd = cmd  # cmd包括命令和參數
        self.ret_code = None
        self.ret_info = None
        self.err_info = None
        #使用時可替換爲具體的logger
        self.logger = MockLogger() 
        
    def run_background(self):
        '''以非阻塞方式執行shell命令(Popen的默認方式)。
        '''
        self.logger.debug("run %s"%self.cmd)
        # Popen在要執行的命令不存在時會拋出OSError異常,但shell=True後,
        # shell會處理命令不存在的錯誤,所以沒有了OSError異常,故不用處理
        self._process = subprocess.Popen(self.cmd, shell=True, 
                stdout=subprocess.PIPE, stderr=subprocess.PIPE) #非阻塞
        
    def run(self):
        '''以阻塞方式執行shell命令。
        '''
        self.run_background()
        self.wait()
        
    def run_cmd(self, cmd):
        '''直接執行某條命令。方便一個實例重複使用執行多條命令。
        '''
        self.cmd = cmd
        self.run()

    def wait(self):
        '''等待shell執行完成。
        '''
        self.logger.debug("waiting %s"%self.cmd)
        self.ret_info, self.err_info = self._process.communicate() #阻塞
        # returncode: A negative value -N indicates that the child was 
        # terminated by signal N
        self.ret_code = self._process.returncode
        self.logger.debug("waiting %s done. return code is %d"%(self.cmd, 
                            self.ret_code))

    def get_status(self):
        '''獲取腳本運行狀態(RUNNING|FINISHED)
        '''
        retcode = self._process.poll()
        if retcode == None:
            status = "RUNNING"
        else:
            status = "FINISHED"
        self.logger.debug("%s status is %s"%(self.cmd, status))
        return status

    # Python2.4的subprocess尚未send_signal,terminate,kill
    # 因此這裏要山寨一把,2.7可直接用self._process的kill() 
    def send_signal(self, sig):
        self.logger.debug("send signal %s to %s"%(sig, self.cmd))
        os.kill(self._process.pid, sig)

    def terminate(self):
        self.send_signal(signal.SIGTERM)

    def kill(self):
        self.send_signal(signal.SIGKILL)
    
    def print_result(self):
        print "return code:", self.ret_code
        print "return info:", self.ret_info
        print " error info:", self.err_info

class RemoteShell(Shell):
    '''遠程執行命令(ssh方式)。
    XXX 含特殊字符的命令可能致使調用失效,如雙引號,美圓號$
    NOTE 若cmd含有雙引號,可以使用RemoteShell2
    '''
    def __init__(self, cmd, ip):
        ssh = ("ssh -o PreferredAuthentications=publickey -o "
                "StrictHostKeyChecking=no -o ConnectTimeout=10")
        # 沒必要檢查IP有效性,也沒必要檢查信任關係,有問題shell會報錯
        cmd = '%s %s "%s"'%(ssh, ip, cmd)
        Shell.__init__(self, cmd)
        
class RemoteShell2(RemoteShell):
    '''與RemoteShell相同,只是變換了引號。
    '''
    def __init__(self, cmd, ip):
        RemoteShell.__init__(self, cmd, ip)
        self.cmd = "%s %s '%s'"%(ssh, ip, cmd)

class SuShell(Shell):
    '''切換用戶執行命令(su方式)。
    XXX 只適合使用root切換至其它用戶。
        由於其它切換用戶後須要輸入密碼,這樣程序會掛住。
    XXX 含特殊字符的命令可能致使調用失效,如雙引號,美圓號$
    NOTE 若cmd含有雙引號,可以使用SuShell2
    '''
    def __init__(self, cmd, user):
        if os.getuid() != 0: # 非root用戶直接報錯
            raise Exception('SuShell must be called by root user!')
        cmd = 'su - %s -c "%s"'%(user, cmd)
        Shell.__init__(self, cmd)

class SuShell2(SuShell):
    '''與SuShell相同,只是變換了引號。
    '''
    def __init__(self, cmd, user):
        SuShell.__init__(self, cmd, user)
        self.cmd = "su - %s -c '%s'"%(user, cmd)

class SuShellDeprecated(Shell):
    '''切換用戶執行命令(setuid方式)。
    執行的函數爲run2,而不是run
    XXX 以「不乾淨」的方式運行:僅切換用戶和組,環境變量信息不變。
    XXX 沒法獲取命令的ret_code, ret_info, err_info
    XXX 只適合使用root切換至其它用戶。
    '''
    def __init__(self, cmd, user):
        self.user = user
        Shell.__init__(self, cmd)

    def run2(self):
        if os.getuid() != 0: # 非root用戶直接報錯
            raise Exception('SuShell2 must be called by root user!')
        child_pid = os.fork()
        if child_pid == 0: # 子進程幹活
            uid, gid = pwd.getpwnam(self.user)[2:4]
            os.setgid(gid) # 必須先設置組
            os.setuid(uid)
            self.run()
            sys.exit(0) # 子進程退出,防止繼續執行其它代碼
        else: # 父進程等待子進程退出
            os.waitpid(child_pid, 0)
        
if __name__ == "__main__":
    '''test code'''
    # 1. test normal
    sa = Shell('who')
    sa.run()
    sa.print_result()
    
    # 2. test stderr
    sb = Shell('ls /export/dir_should_not_exists')
    sb.run()
    sb.print_result()
    
    # 3. test background
    sc = Shell('sleep 1')
    sc.run_background()
    print 'hello from parent process'
    print "return code:", sc.ret_code
    print "status:", sc.get_status()
    sc.wait()
    sc.print_result()
    
    # 4. test kill
    import time
    sd = Shell('sleep 2')
    sd.run_background()
    time.sleep(1)
    sd.kill()
    sd.wait() # NOTE, still need to wait
    sd.print_result()
    
    # 5. test multiple command and uncompleted command output
    se = Shell('pwd;sleep 1;pwd;pwd')
    se.run_background()
    time.sleep(1)
    se.kill()
    se.wait() # NOTE, still need to wait
    se.print_result()
    
    # 6. test wrong command
    sf = Shell('aaaaa')
    sf.run()
    sf.print_result()
    
    # 7. test instance reuse to run other command
    sf.cmd = 'echo aaaaa'
    sf.run()
    sf.print_result()
    
    sg = RemoteShell('pwd', '127.0.0.1')
    sg.run()
    sg.print_result()
    
    # unreachable ip
    sg2 = RemoteShell('pwd', '17.0.0.1')
    sg2.run()
    sg2.print_result()
    
    # invalid ip
    sg3 = RemoteShell('pwd', '1711.0.0.1')
    sg3.run()
    sg3.print_result()
    
    # ip without trust relation
    sg3 = RemoteShell('pwd', '10.145.132.247')
    sg3.run()
    sg3.print_result()
    
    
    sh = SuShell('pwd', 'ossuser')
    sh.run()
    sh.print_result()
    
    # wrong user
    si = SuShell('pwd', 'ossuser123')
    si.run()
    si.print_result()
    
    # user need password
    si = SuShell('pwd', 'root')
    si.run()
    si.print_result()
相關文章
相關標籤/搜索