paramiko 模塊:基於 SSH,用於連接遠程服務器並執行相關操作(例如 shell 命令)。
class SSHConnection(object):
    """Wrapper around one paramiko ``Transport`` for commands and SFTP.

    Typical use: build the object from a host dictionary, call
    :meth:`connect`, then :meth:`run_cmd`, :meth:`upload` or
    :meth:`download`, and finally :meth:`close`.

    NOTE(review): ``paramiko`` and ``unicode_utils`` are referenced by this
    class but no import for them is visible here; the enclosing module must
    bring both names into scope.
    """

    def __init__(self, host_dict):
        """Store the connection settings; no network activity happens here.

        :param host_dict: mapping with the keys ``'host'``, ``'port'``,
            ``'username'`` and ``'pwd'``.
        :raises KeyError: if one of those keys is missing.
        """
        # Assigned before anything that can raise, so that close() and
        # __del__ stay safe on a never-connected or half-built instance.
        self.__transport = None
        self.host = host_dict['host']
        self.port = host_dict['port']
        self.username = host_dict['username']
        self.pwd = host_dict['pwd']
        # Not read anywhere in this class; kept so the attribute set is
        # unchanged.
        self.__k = None

    def connect(self):
        """Open a transport to ``(host, port)`` and authenticate by password.

        If authentication fails, the freshly created transport is closed
        before the exception propagates. If a transport was already open,
        it is closed once the new one is established.
        """
        transport = paramiko.Transport((self.host, self.port))
        try:
            transport.connect(username=self.username, password=self.pwd)
        except Exception:
            # Do not leave a started transport behind on failed login.
            transport.close()
            raise
        previous, self.__transport = self.__transport, transport
        if previous is not None:
            previous.close()

    def close(self):
        """Close the transport if one is open; safe to call repeatedly."""
        transport, self.__transport = self.__transport, None
        if transport is not None:
            transport.close()

    def run_cmd(self, command):
        """Run a shell command on the remote host and return a dictionary.

        Returns ``{'color': 'red', 'res': error}`` when the command wrote
        anything other than whitespace to stderr, otherwise
        ``{'color': 'green', 'res': res}`` with the decoded stdout.

        :param command: command line to execute remotely.
        :return: dictionary with the keys ``'color'`` and ``'res'``.
        """
        ssh = paramiko.SSHClient()
        # Reuse the already-authenticated transport instead of letting the
        # client open its own connection. The client is deliberately not
        # closed here: per the paramiko docs that would also close the
        # shared transport.
        ssh._transport = self.__transport
        # Execute the command.
        stdin, stdout, stderr = ssh.exec_command(command)
        # Collect the command output.
        res = unicode_utils.to_str(stdout.read())
        # Collect the error output.
        error = unicode_utils.to_str(stderr.read())
        # Any non-blank stderr content is reported as an error result.
        if error.strip():
            return {'color': 'red', 'res': error}
        return {'color': 'green', 'res': res}

    def upload(self, local_path, target_path):
        """Upload ``local_path`` to ``target_path`` and make it executable.

        :param local_path: path of the file on the local machine.
        :param target_path: destination path on the remote host.
        """
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        try:
            sftp.put(local_path, target_path, confirm=True)
            # The mode is fixed at 0o755 rather than copied from the local
            # file. It is an octal literal, hence the 0o prefix.
            sftp.chmod(target_path, 0o755)
        finally:
            # Release the SFTP channel even if the transfer failed.
            sftp.close()

    def download(self, target_path, local_path):
        """Download the remote ``target_path`` to ``local_path``.

        :param target_path: path of the file on the remote host.
        :param local_path: destination path on the local machine.
        """
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        try:
            sftp.get(target_path, local_path)
        finally:
            # Release the SFTP channel even if the transfer failed.
            sftp.close()

    def __del__(self):
        """Best-effort cleanup: close the transport on garbage collection."""
        self.close()


# unicode_utils.py
# NOTE(review): presumably this helper lives in a separate module, since the
# class above calls it as ``unicode_utils.to_str`` -- confirm the layout.
def to_str(bytes_or_str):
    """Return ``bytes_or_str`` as ``str``, decoding ``bytes`` as UTF-8.

    :param bytes_or_str: a ``bytes`` object to decode; any other value is
        returned unchanged.
    :return: the decoded string, or the input itself if it is not ``bytes``.
    """
    if isinstance(bytes_or_str, bytes):
        return bytes_or_str.decode('utf-8')
    return bytes_or_str