遠程執行命令python
#用戶名密碼方式: import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh.connect(hostname='192.168.18.204',port=22,username='root',password='123456') stdin,stdout,stderr=ssh.exec_command('df -h && ip a') result=stdout.read() print(result.decode()) ssh.close() #密鑰方式: import paramiko private_key = paramiko.RSAKey.from_private_key_file('id_rsa') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) ssh.connect(hostname='192.168.18.204',port=22,username='root',pkey=private_key) stdin,stdout,stderr=ssh.exec_command('df -h && ip a') result=stdout.read() print(result.decode())
複製文件到服務端linux
#用戶名密碼方式: import paramiko transport = paramiko.Transport(('192.168.18.204',22)) #必須爲元組,不然端口不爲22時(不輸入時默認爲22),會一直鏈接不上. transport.connect(username='root',password='123456') sftp=paramiko.SFTPClient.from_transport(transport) sftp.put('./1.txt','/root/2.txt') #上傳文件 sftp.get('/root/2.txt','./3.txt') #下載文件 transport.close() #密鑰方式: import paramiko private_key = paramiko.RSAKey.from_private_key_file('id_rsa') transport=paramiko.Transport(('192.168.18.204',22)) transport.connect(username='root',pkey=private_key) sftp=paramiko.SFTPClient.from_transport(transport) sftp.get('/root/2.txt','./4.txt') sftp.put('3.txt','/root/4.txt') transport.close()
進程與線程的區別: 進程: 一個程序執行的實例,就是各類資源(內存頁,文件描述符,Open Socket )的集合。 線程:是操做系統的最小的調度單位,是一串指令的集合,建立進程時會自動建立一個線程。 線程共享內存空間,能夠互相訪問。進程的內存空間是獨立的,不能互相訪問。 子進程至關於克隆一遍父進程。 進程要操做CPU,必需要建立一個線程,線程自己沒法操做CPU。 進程快仍是線程快,沒有可比性,一個是資源的集合,另外一個是執行任務的。 線程之間的能夠直接交流,兩個進程想通訊,必須經過一箇中間代理。 線程是獨立的,不會像進程那樣,殺了父進程子進程也會死掉。 主線程和其餘線程是並行的。
普通的調用方式nginx
import time import threading def run(n): print('task',n) time.sleep(2) t1=threading.Thread(target=run,args=('t1',)) t2=threading.Thread(target=run,args=('t2',)) t1.start() t2.start()
繼承類的調用方式,而且計算全部線程的總耗時git
import time import threading class MyThread(threading.Thread): def __init__(self,n): super().__init__() self.n=n def run(self): #沒法接收參數 print('task',self.n) time.sleep(2) res=[] start_time=time.time() for i in range(50): t=MyThread('t-%s' %i) res.append(t) t.start() for i in res: i.join() #join會堵塞,等待線程結束。 print('總耗時: {}'.format(time.time()-start_time))
threading.RLock() 多重鎖,在同一線程中可用被屢次acquire。若是使用RLock,那麼acquire和release必須成對出現 def run1(): print("grab the first part data\n") lock.acquire() #若是使用的是threading.Lock(),此處就會卡住,由於一次只能一個鎖定,其他鎖請求,需等待鎖釋放後才能獲取。 global num num += 1 lock.release() return num def run3(): lock.acquire() res = run1() lock.release() print(res, ) if __name__ == '__main__': num= 0 lock = threading.RLock() for i in range(1): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: #線程數不等於1就繼續等待 print(threading.active_count()) else: print('----all threads done---') print(num)
信號量github
信號量,指定容許幾個線程同時運行,但不是等待幾個線程都結束了,才容許下一批線程容許,而是結束一個放進來一個。用於鏈接池一類。 import threading, time def run(n): semaphore.acquire() time.sleep(1) print('run the thread: %s\n' %n) semaphore.release() if __name__ == '__main__': semaphore = threading.BoundedSemaphore(3) #同一時間只容許3個線程同時存在 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() != 1: pass else: print('----all threads done---')
注:python多線程 不適合cpu密集型,適合io密集型任務,由於python的線程不支持使用多核,可是io不佔用cpu,因此適合io密集型。
Python多進程,適合cpu密集型,由於進程可使用多核。web
import threading import time event=threading.Event() def Traffic_lights(): event.set() count=0 while True: if count >=5 and count <10: event.clear() elif count >=10: event.set() count = 0 count += 1 time.sleep(1) def car(): while True: if event.is_set(): print('\033[36;1m變綠燈了\033[0m\n') print('\033[36;1m寶馬車開始運行\033[0m\n') time.sleep(1) else: print('\033[31;1m變紅燈了\033[0m\n') print('\033[31;1m寶馬車中止運行\033[0m\n') event.wait() t1=threading.Thread(target=Traffic_lights) t1.start() c1=threading.Thread(target=car) c1.start()
優勢:解耦,提升效率
列表與隊列的區別:列表取出一個數據,數據還存在在列表中,隊列取出數據後則會刪除隊列中的數據。shell
線程Queue: import queue q=queue.Queue(maxsize=3) #maxsize能夠設置隊列長度,先進先出。 q=queue.LifoQueue(maxsize=3) #後進先出。 q=queue.ProiorityQueue(maxsize=3) #能夠設置存儲優先級,put時使用元組q.put((-1,'zyl')),優先級是從小到大。 q.put() #存放數據 q.qsize() #查看隊列大小 q.get() #取出數據,若是沒有數據會一直卡住, q.get_nowait() #取出數據,若是沒有數據會報一個異常,或者使用q.get(block=False) 進程Queue: 爲何子進程能夠訪問到,過程是:有兩個Queue,子進程向它的Queue,Put數據經過picket序列化,而後複製到父類Queue,同時刪除自身的數據,反之亦然。 from multiprocessing import Queue import multiprocessing def run(q2): q2.put('1') if __name__ == '__main__': q=Queue() P=multiprocessing.Process(target=run,args=(q,)) P.start() #其餘選項與線程Queue相同。
import threading import time import queue q=queue.Queue(maxsize=10) def producer(name): i=1 while True: q.put('汽車 {}'.format(i)) print('生產了汽車%s' %i) i+=1 def consumer(name): while True: print('{} 開走了{}'.format(name,q.get())) time.sleep(1) p1=threading.Thread(target=producer,args=('zyl',)) c1=threading.Thread(target=consumer,args=('wq',)) c2=threading.Thread(target=consumer,args=('syf',)) p1.start() c1.start() c2.start()
from multiprocessing import Process,Pipe def run(conn): conn.send([1,2]) print(conn.recv()) conn.close() if __name__ == '__main__': parent_conn,child_conn=Pipe() #生成兩個鏈接,將子鏈接傳給子進程。 P=Process(target=run,args=(child_conn,)) P.start() print(parent_conn.recv()) parent_conn.send([4,5]) P.join()
進程之間的數據共享。管道和Queue只是傳遞。windows
import os from multiprocessing import Process,Manager def run(d,l): d['b'] = '2' l.append(os.getpid()) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) #初始五個數字 p_list=[] for i in range(10): P=Process(target=run,args=(d,l)) P.start() p_list.append(P) for res in p_list: res.join() #等待全部進程執行完畢 print(d) print(l)
from multiprocessing import Process,Lock def run(l,i): l.acquire() try: print('hellow word',i) finally: l.release() if __name__ == '__main__': lock=Lock() for num in range(10): Process(target=run,args=(lock,num)).start()
from multiprocessing import Process,Pool import time import os def run(i): time.sleep(2) print('in process',os.getpid()) return i+100 def Bar(arg): print('==>exec done:',arg) if __name__ == '__main__': #windows上必須寫這句話 pool=Pool(processes=5) for num in range(15): pool.apply_async(func=run,args=(num,),callback=Bar) #callback,回調函數,進程執行完畢後,由主進程執行這個函數。 print('所有開啓') pool.close() #必需要先close在關閉。 pool.join()
#gevent,遇到io自動切換。 #gevent默認不知道urllib和socket會進行io操做。解決方法: from gevent import monkey #對全部進行i/o的操做打上一個標記 monkey.patch_all() 示例(一),下載網頁: from urllib import request import gevent,time from gevent import monkey monkey.patch_all() def f(url): print('下載網頁:',url) headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'} ##防止403錯誤 req=request.Request(url=url,headers=headers) data=request.urlopen(req).read() print('%d bytes received from %s.' %(len(data),url)) urls=[ 'https://pythonwheels.com/', 'https://www.yahoo.com/', 'https://github.com' ] start=time.time() for url in urls: f(url) print('同步cost',time.time()-start) async_start=time.time() gevent.joinall([ gevent.spawn(f,'https://pythonwheels.com/'), gevent.spawn(f,'https://www.yahoo.com/'), gevent.spawn(f,'https://github.com') ]) print('異步cost:',time.time()-async_start) 示例(二),經過gevent實現一個Socket: import sys import socket import time import gevent from gevent import socket, monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
1.事件驅動模型就是根據一個事件來作反應,相似於生產者消費者模型。 gevent就是使用了事件驅動模型,遇到I/O時註冊一個事件,而後系統執行I/O,在I/O操做完畢後回調一個事件告訴gevent,它以前註冊的事件執行完畢了。 2.緩存I/O,數據會先被拷貝到操做系統的內核緩衝區(內存),而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間(內存)。內核態就是內核空間到用戶空間。 缺點是:「數據在傳輸過程當中要在應用程序地址空間和內核進行屢次數據拷貝」,爲何這麼作由於用戶空間沒法操做系統,只能調用操做系統的接口,來完成此次操做。 3.堵塞I/O的意思是,客戶端繼承使用recvfrom調用kernel,來查看是否有數據,有的話返回數據,沒有的話就會一直等待它有數據。 4.非堵塞I/O的意思是,客戶端繼承使用recvfrom調用kernel,來查看是否有數據,有的話返回數據,沒有的話就會返回一個錯誤(error),而後客戶端再次調用kernel來查看是否有數據,有的話返回數據,沒有的話就會返回一個錯誤(error),陷入循環。因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。 5.在單個線程中,若是使用的是堵塞I/O是無法實現多路I/O。 6.在單個線程中,若是使用的是非堵塞I/O,是能夠實現多路I/O的。單線程下若是有100個鏈接,使用的是非堵塞模式的話,說不許那個數據先到,因此就循環收取。某個鏈接沒有數據是他會返回一個(error),不會等待,可是仍是會在從內核態複製數據到用戶態時間卡住。 7.I/O多路複用的特色是一個線程能夠同時等待多個文件描述符(socket),其中任意一個進入就緒狀態,select()函數就能夠返回。返回時並不會告訴進程是哪個鏈接有數據了,能夠經過select,pool,epool來查看。 8.異步I/O,用戶進程多個鏈接發起read以後,馬上就能夠幹別的事情。Kernel來幫你等待數據,而後將數據拷貝到用戶內存。返回時並不會告訴進程是哪個鏈接有數據了。 select poll epoll的區別,所有應用在I/O多路複用狀態。 1.select單進程打開的默承認以打開的文件數量是1024,調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。 2.poll和select的區別不大,取消了最大打開文件數量。 3.epool只有linux支持,與select的區別是在異步I/O中若是有一個鏈接活躍了,kernel會告訴進程是哪個鏈接活躍了,沒有最大鏈接限制。`水平觸發`就是數據在內核態已經準備完畢了,可是進程沒有調用read去取。數據會一直保留在內核態。下次再有數據,會再次告訴進程數據準備完畢了。`邊緣觸發`就是數據在內核態已經準備完畢了,可是進程沒有調用read去取。數據會一直保留在內核態。下次再有數據,不會再次告訴進程數據準備完畢了。 4.nginx實際上是I/O多路複用。 Python3裏的模塊asyncio支持異步i/o.
Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。緩存
示例:使用select(I/O多路複用)實現socketServer。 import select #底層作了封裝,能夠直接返回活動的鏈接 import socket import sys import queue server=socket.socket() server.setblocking(0) #設置爲非堵塞 server.bind(('localhost',9999)) server.listen(10) inputs=[server,] outputs=[] message_queues={} while True: readable,writeable,exeptional=select.select(inputs,outputs,inputs) #select()方法接收並監控3個通訊列表, 第一個是要監控哪些鏈接,剛開始時監控自身,第2個是監控和接收全部要返回給客戶端的data(outgoing data),第3個監控那些鏈接的錯誤信息, #readable 返回活動的鏈接 for s in readable: if s is server: #若是是server的話表明有新鏈接進來了 conn,client_addr=s.accept() inputs.append(conn) #將新鏈接添加到監控列表 message_queues[conn]=queue.Queue() else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了,客戶端的數據過來了,在這接收。 data=s.recv(1024) if data: print("收到來自[%s]的數據:"%s.getpeername()[0],data) message_queues[s].put(data) #將用戶數據存儲到Queue中 if s not in outputs: outputs.append(s) else: print('客戶端斷開了: ',s) if s in outputs: outputs.remove(s) inputs.remove(s) #writeable 存儲要返回給用戶數據的鏈接 for s in writeable: try: msg=message_queues[s].get_nowait() except queue.Empty: outputs.remove(s) else: s.send(msg) #exeptional 存儲出現錯誤的鏈接 for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
此模塊根據系統的不一樣,會使用不一樣的方式,Linux優先使用epoll網絡
import selectors import socket sel=selectors.DefaultSelector() def accept(sock,mask): conn,addr=sock.accept() print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) ##新鏈接註冊read回調函數 def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data.upper()) # Hope it won't block else: print('closing', conn) sel.unregister(conn) #移除註冊的鏈接 conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() #默認阻塞,有活動鏈接就返回活動的鏈接列表 for key, mask in events: callback = key.data #callback至關於accept函數 callback(key.fileobj, mask) #key.fileobj是客戶端socket
簡單主機批量管理工具
(1). 主機分組
(2). 主機信息配置文件用configparser解析
(3). 可批量執行命令、發送文件,結果實時返回,執行格式以下
batch_run -h h1,h2,h3 -g web_clusters,db_servers -cmd "df -h"
batch_scp -h h1,h2,h3 -g web_clusters,db_servers -action put -local test.py -remote /tmp/
(4). 主機用戶名密碼、端口能夠不一樣
(5). 執行遠程命令使用paramiko模塊
(6). 批量命令需使用multiprocessing併發
配置文件
#這是主機管理系統的配置文件; #[],裏面包含組名,組管理着多臺主機 #[],下面是包含在組內的主機信息,主機信息格式: 主機名=['ip:port','user_name','passwd'] [TestColony] test01=['192.168.18.195:666','root','12345678'] test02=['192.168.18.187:7878','zyl','123456qwe'] [Python3] Python3=['192.168.109.88:22','root','qazwsxedc']
運行Py
import time import shlex import configparser import threading import paramiko import os instructions_dict={ 'batch_run':['-h','-g','-cmd','help'], 'batch_scp':['-h','-g','-action','-local','-remote','help'], } class HOST_MANAGE(object): def __init__(self): print('\033[36;1m歡迎使用批量管理主機系統\033[0m'.center(50, '*')) print('\033[36;1m下面是該系統的使用方法;\n(1)根據序號顯示羣組下的主機信息\ \n(2)命令名稱 help 獲取命令幫助;\033[0m') print('\033[36;1m羣組信息\033[0m'.center(50, '=')) def batch_run_help(self): ''' 顯示batch_run命令幫助信息 :return: ''' print('\033[36;1mbatch_run 遠程執行命令;\ \n-h:指定單獨的主機序號,使用逗號分割;\ \n-g:指定羣組命令會發送到該羣組下全部機器,使用逗號分割;\ \n-cmd:指定命令,使用引號將命令包裹;\033[0m') def batch_scp_help(self): ''' 顯示batch_scp命令幫助信息 :return: ''' print('\033[36;1mbatch_scp 文件的上傳與下載;\ \n-h:指定單獨的主機,使用逗號分割;\ \n-g:指定羣組命令會發送到該羣組下全部機器,使用逗號分割;\ \n-action:指定動做,put:複製本地文件到遠程機器,get;將遠程主機文件複製到本地;\ \n-local:本地路徑\ \n-remote:遠程路徑\033[0m') def batch_run(self,parameter,GroupInfo,*args,**kwargs): ''' 此方法用於解析參數,執行命令 :param parameter: 命令參數 :param GroupInfo: 主機信息 :param args: :param kwargs: :return: ''' host_info=set() args=[ i for i in parameter.keys()] if 'help' in args: self.batch_run_help() return True elif '-h' not in args: print('缺乏關鍵參數: -h') return False elif '-cmd' not in args: print('缺乏關鍵參數: -cmd') return False for i in parameter['-h'].split(','): if not i.isdigit() or int(i)-1 not in range(len(parameter)): print('-h 參數錯誤,沒有序號爲%s的主機' %i) return False else: i=int(i)-1 host_list_info=eval(GroupInfo[i][1]) host_info.add('{}:{}:{}'.format(host_list_info[0],host_list_info[1],host_list_info[2])) res=[] return_info=[] for i in host_info: ip, port, user, passwd, = [ i for i in i.split(':') ] t=threading.Thread(target=self.run_shell,args=(ip,port,user,passwd,parameter['-cmd'],return_info)) res.append(t) t.start() for j in res: j.join() for k,v in return_info: print('{}'.format(k).center(50,'=')) print(v.decode()) return True def batch_scp(self,parameter,GroupInfo,*args,**kwargs): ''' 此方法,用於解析參數,生成線程執行復制文件的操做 :param parameter: :param GroupInfo: :param args: :param kwargs: :return: ''' host_info=set() args=[ i for i in parameter.keys()] if 'help' in args: self.batch_scp_help() return True elif '-h' not in args: print('缺乏關鍵參數: -h') return False elif '-action' not in args: print('缺乏關鍵參數: -action') return False elif '-local' not in args: print('缺乏關鍵參數: -local') return False elif '-remote' not in args: print('缺乏關鍵參數: -remote') return False for i in parameter['-h'].split(','): if not i.isdigit() or int(i)-1 not in range(len(parameter)): print('-h 參數錯誤,沒有序號爲%s的主機' %i) return False else: i=int(i)-1 host_list_info=eval(GroupInfo[i][1]) host_info.add('{}:{}:{}:{}:{}:{}'.format(host_list_info[0],host_list_info[1],host_list_info[2],parameter['-local'],parameter['-remote'],parameter['-action'])) print(host_info) res=[] for i in host_info: ip,port,user,passwd,local,remote,action,=[ i for i in i.split(':') ] if action == 'put': if os.path.isfile(local): local=os.path.abspath(local) else: print('本地沒有此文件: ',local) return False elif action != 'get': print('\033[31;1m -action 參數值錯誤,請從新輸入!\033[0m') return False t=threading.Thread(target=self.scp_file,args=(ip,port,user,passwd,local,remote,action)) res.append(t) t.start() for j in res: j.join() return True def scp_file(self,*args,**kwargs): ''' 此方法用於複製文件到遠程主機,args接收了ip,port等信息。 :param args: :param kwargs: :return: ''' ip, port, user, passwd, local, remote,action=[ i for i in args[:] ] try: transport = paramiko.Transport((ip,int(port))) transport.connect(username=user,password=passwd) sftp=paramiko.SFTPClient.from_transport(transport) if action == 'put': sftp.put(local,remote) else: sftp.get(remote,local) print('{}:傳輸完畢'.format(ip).center(50, '=')) transport.close() except Exception as e: print('\033[31;1m複製文件失敗,如下是錯誤信息\033[0m') print('錯誤信息: {}'.format(e)) return False else: return True def run_shell(self,*args, **kwargs): ip,port,user,passwd,cmd,return_info=[ i for i in args[:] ] ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) try: ssh.connect(hostname=ip, port=int(port), username=user, password=passwd) stdin,stdout,stderr=ssh.exec_command(cmd) if not stderr: return_info.append((ip, stderr.read())) else: return_info.append((ip, stdout.read())) ssh.close() except Exception as e: print('\033[31;1m執行命令失敗,如下是錯誤信息\033[0m') print('錯誤信息: {}'.format(e)) return False else: return True def Analytical_command(self,instruction,*args,**kwargs): ''' 用於解析命令的方法,返回命令和參數 :param instruction: :param args: :param kwargs: :return: ''' command = instruction.split(' ')[0] parameter = {key:value for key, value in zip(shlex.split(instruction)[1::2], \ shlex.split(instruction)[2::2] if len(shlex.split(instruction)[2::2]) != 0 else (None,))} err_arg = [] if command in instructions_dict: for arg in parameter.keys(): if arg not in instructions_dict[command]: err_arg.append(arg) if len(err_arg) == 0: return command,parameter else: print('\033[31;1m沒有此參數: {};請使用{} help獲取幫助\033[0m'.format(err_arg,command)) return False elif command == 'q': return command else: print('\033[31;1m沒有{}命令\033[0m'.format(command)) return False def print_GroupInfo(self): ''' 此方法用於打印配置文件中的,羣組信息,主機信息,返回組名和主機信息 :return: ''' config = configparser.ConfigParser() config.read('../conf/Batch_host.conf',encoding='utf-8') GroupInfo=config.sections() while True: for k,v in enumerate(GroupInfo,start=1): print('\033[35;1m({}).{}\033[0m'.format(k,v)) select_group=input('>>: ') if select_group.isdigit() and int(select_group) >=1 and int(select_group) <= len(GroupInfo): HostInfo=config.items(GroupInfo[int(select_group)-1]) print('\033[36;1m主機信息\033[0m'.center(50, '=')) for k,v in enumerate(HostInfo,start=2): print('\033[34;1m({}).{}: {}\033[0m'.format(k,v[0],eval(v[1])[0])) return GroupInfo[int(select_group)-1],HostInfo elif select_group == 'q': exit() else: print('\033[31;1m沒有此羣組!\033[0m') continue manage=HOST_MANAGE() while True: GroupInfo=manage.print_GroupInfo() while True: instruction = input('[%s]>>: ' %GroupInfo[0]).strip() result=manage.Analytical_command(instruction) if type(result) == tuple: getattr(manage,result[0])(result[1],GroupInfo[1]) elif result == 'q': break else: continue