最壞的結果,不過是大器晚成;html
- 這種錯誤,根本過不了python解釋器的語法檢測,必須在程序運行前就修正;python
- 即邏輯錯誤,例如除零錯誤;linux
- 異常相關信息:異常的追蹤信息 + 異常類型 + 異常值程序員
- 異常種類web
1 ArithmeticError 2 AssertionError 3 AttributeError 4 BaseException 5 BufferError 6 BytesWarning 7 DeprecationWarning 8 EnvironmentError 9 EOFError 10 Exception 11 FloatingPointError 12 FutureWarning 13 GeneratorExit 14 ImportError 15 ImportWarning 16 IndentationError 17 IndexError 18 IOError 19 KeyboardInterrupt 20 KeyError 21 LookupError 22 MemoryError 23 NameError 24 NotImplementedError 25 OSError 26 OverflowError 27 PendingDeprecationWarning 28 ReferenceError 29 RuntimeError 30 RuntimeWarning 31 StandardError 32 StopIteration 33 SyntaxError 34 SyntaxWarning 35 SystemError 36 SystemExit 37 TabError 38 TypeError 39 UnboundLocalError 40 UnicodeDecodeError 41 UnicodeEncodeError 42 UnicodeError 43 UnicodeTranslateError 44 UnicodeWarning 45 UserWarning 46 ValueError 47 Warning 48 ZeroDivisionError
- 異常示例:算法
# IndexError: list index out of range list1 = [1,2] list1[7] # KeyError: 'k9' dict1 = { 'k1':'v1', 'k2':'v2', } dict1['k9'] # ValueError: invalid literal for int() with base 10: 'standby' str = 'standby' int(str) ...
- try...except... (能夠寫多個except)shell
- 從上向下匹配,匹配到了就再也不匹配下面的except,相似於 iptables編程
try: # msg=input('>>:') # int(msg) #ValueError print(x) #NameError d={'a':1} d['b'] #KeyError l=[1,2] l[10] #IndexError 1+'asdfsadfasdf' #TypeError except ValueError as e: print('ValueError: %s' % e) except NameError as e: print('NameError: %s' % e) except KeyError as e: print(e) print('=============>')
- 萬能異常 Exceptionjson
try: # d={'a':1} # d['b'] #KeyError l=[1,2] l[10] #IndexError 1+'asdfsadfasdf' #TypeError except Exception as e: print('捕獲到異常,異常類型:%s,異常值:%s' % (type(e),e)) print('=============>')
- try...except...else...finally... (finally 裏面主要是作一些清理工做)windows
try: # 1+'asdfsadfasdf' #TypeError print('aaaaaa') except Exception as e: print('捕獲到異常,異常類型:%s,異常值:%s' % (type(e), e)) else: print('沒有異常時發生會執行') finally: print('有沒有異常都會執行')
- 主動拋出異常
try: raise TypeError('類型錯誤') except Exception as e: print(e)
- 自定義異常類型
class standbyException(BaseException): def __init__(self,msg): self.msg=msg def __str__(self): return self.msg try: raise standbyException('--->> 自定義異常類型') except standbyException as e: print(e)
- 斷言 assert
# 待補充...
- 0.參考:操做系統簡介
- 1.精簡的說的話,操做系統就是一個協調、管理和控制計算機硬件資源和軟件資源的控制程序;
- 2.操做系統管理硬件,提供系統調用(接口,比方說:文件);對資源請求進行有序調度處理;
- 3.操做系統處在用戶應用程序和計算機硬件之間,本質也是一個軟件
- 4.操做系統由操做系統的內核(運行於內核態,管理硬件資源)以及系統調用(運行於用戶態,爲應用程序員寫的應用程序提供系統調用接口)兩部分組成,因此,單純的說操做系統是運行於內核態的,是不許確的。
# 1.隔離複雜度,提供簡單易用的接口 隱藏了醜陋的硬件調用接口,爲應用程序員提供調用硬件資源的更好,更簡單,更清晰的模型(系統調用接口)。 應用程序員有了這些接口後,就不用再考慮操做硬件的細節,專心開發本身的應用程序便可。 好比,磁盤資源的抽象是文件系統(C盤,D盤,E盤...下的目錄及文件), 有了文件的概念,咱們直接打開文件,讀或者寫就能夠了, 無需關心記錄是否應該使用修正的調頻記錄方式,以及當前電機的狀態等細節
# 2.將應用程序對硬件資源的競態請求變得有序化 例如:不少應用軟件實際上是共享一套計算機硬件, 比方說有可能有三個應用程序同時須要申請打印機來輸出內容, 那麼a程序競爭到了打印機資源就打印, 而後多是b競爭到打印機資源,也多是c,這就致使了無序; 打印機可能打印一段a的內容而後又去打印c..., 操做系統的一個功能就是將這種無序變得有序;
- 1.批處理
- 把一堆人的輸入攢成一大波輸入;把一堆人的輸出攢成一大波輸出;節省了機時;
- 2.多道程序設計
- 空間上的複用
- 將內存分爲幾部分,每一個部分放入一個程序,這樣,同一時間內存中就有了多道程序;
空間上的複用最大的問題是: 程序之間的內存必須分割,這種分割須要在硬件層面實現,由操做系統控制。 若是內存彼此不分割,則一個程序能夠訪問另一個程序的內存, 1.首先喪失的是安全性: 好比你的qq程序能夠訪問操做系統的內存,這意味着你的qq能夠拿到操做系統的全部權限。 2.其次喪失的是穩定性: 某個程序崩潰時有可能把別的程序的內存也給回收了, 比方說把操做系統的內存給回收了,則操做系統崩潰。
- 時間上的複用
- 快速的上下文切換
當一個資源在時間上覆用時,不一樣的程序或用戶輪流使用它,第一個程序獲取該資源使用結束後,在輪到第二個。。。第三個。。。 例如:只有一個cpu,多個程序須要在該cpu上運行,操做系統先把cpu分給第一個程序; 在這個程序運行的足夠長的時間(時間長短由操做系統的算法說了算)或者遇到了I/O阻塞,操做系統則把cpu分配給下一個程序; 以此類推,直到第一個程序從新被分配到了cpu而後再次運行,因爲cpu的切換速度很快,給用戶的感受就是這些程序是同時運行的,或者說是併發的,或者說是僞並行的。 至於資源如何實現時間複用,或者說誰應該是下一個要運行的程序,以及一個任務須要運行多長時間,這些都是操做系統的工做。 當一個程序在等待I/O時,另外一個程序可使用cpu,若是內存中能夠同時存放足夠多的做業,則cpu的利用率能夠接近100%;
- 3.分時操做系統
把處理機的運行時間分紅很短的時間片,按時間片輪流把處理機分配給各聯機做業使用; 分時操做系統,在多個程序之間切換,按時間片切換; 第三代計算機普遍採用了必須的保護硬件(程序之間的內存彼此隔離)以後,分時系統纔開始流行;
1 操做系統的做用: 2 1.把硬件醜陋複雜的接口隱藏起來,給應用程序提供簡單易用的接口 3 2.管理、調度進程,而且把進程之間對硬件的競爭變得有序化 4 5 6 多道技術: 7 1.產生背景:爲了實現單CPU下的併發效果 8 2.分爲兩部分: 9 1.空間上的複用(內存裏放入多個程序,必須實現硬件層面的隔離) 10 2.時間上的複用(複用的是CPU的時間片;快速切換) 11 什麼狀況下進行切換? 12 1.正在執行的任務遇到的阻塞(例如I/O) 13 2.正在執行的任務運行時間過長 14 15 進程:正在運行的一個過程/任務 16 由操做系統負責調度,而後由CPU負責執行 17 18 19 併發:僞並行,單核+多道實現 20 並行:只有多核才能實現真正的並行 21 22 23 同步:打電話 24 異步:發短信 25 26 27 進程的建立: 28 1.系統初始化的時候 29 2.與用戶交互:雙擊一個EXE 30 3.在執行一個進程的過程中,調用了Popen/os.fork 31 4.批處理任務 32 33 34 系統調用: 35 Linux:fork 36 Windows:CreateProcess 37 38 39 Linux與Windows下的進程的區別: 40 1.linux下的進程有父子關係,Windows下的進程沒有這個關係; 41 2.Linux下建立一個新的進程,須要拷貝父進程的地址空間;Windows下從最開始建立進程,兩個進程之間就是不同的; 42 43 44 進程的三種狀態: 45 1.就緒 46 2.運行 47 3.阻塞 48 49 50 進程間快速切換,前提是在切換以前須要保存當前進程當前的狀態 51 yield 就有這種操做系統級別的 保存狀態 的功能
- 進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu;起源於操做系統,是操做系統最核心的概念;
- 程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程;
- 同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放西遊記,一個能夠播放天龍八部;
An executing instance of a program is called a process. Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads. 程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。 程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。 在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。 進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的。
- 1.併發:是僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發);
- 2.並行:同時運行,只有具有多個cpu才能實現真正意義上的並行;
- 單核下,能夠利用多道技術;多個核,每一個核也均可以利用多道技術(多道技術是針對單核而言的)
有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4; 一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術; 而一旦任務1的I/O結束了,操做系統會從新調用它; (需知進程的調度、分配給哪一個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行
多道技術: 內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個, 使每一個進程各自運行幾十或幾百毫秒; 這樣,雖然在某一個瞬間,一個cpu只能執行一個任務, 但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,即僞併發, 以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)
同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息, 那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去; 異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。 當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。 同步:打電話
異步:發短息、MySQL主從複製
python中的多線程沒法利用多核優點; 若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。 Python提供了很是好用的多進程包multiprocessing。 1.multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。 2.multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock、Pool等組件。 須要再次強調的一點是: 與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內;
- 1.參數介紹
Process([group [, target [, name [, args [, kwargs]]]]]) 由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 group 參數未使用,值始終爲None target 表示調用對象,即子進程要執行的任務 args 表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs 表示調用對象的字典,kwargs={'name':'egon','age':18} name 爲子進程的名稱
- 2.方法介紹
p.start() 啓動進程,並調用該子進程中的p.run() p.run() 進程啓動時運行的方法,正是它去調用target指定的函數, 咱們自定義類的類中必定要實現該方法 p.terminate() 強制終止進程p,不會進行任何清理操做, 若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。 若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 p.is_alive() 若是p仍然運行,返回True p.join([timeout]) hold住的是主進程 主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。 timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程, 而不能join住run開啓的進程
- 3.屬性介紹
p.daemon 默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止; 而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置; p.name:進程的名稱 p.pid:進程的pid p.exitcode 進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) p.authkey 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。 這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性, 這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
- 4.注意
注意:在windows中Process()必須放到# if __name__ == '__main__':下 Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的緣由, 使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
1. 在UNIX中該系統調用是:fork; fork會建立一個與父進程如出一轍的副本, 兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件; (在shell解釋器進程中,執行一個命令就會建立一個子進程) 2. 在windows中該系統調用是:CreateProcess, CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。
關於建立的子進程,UNIX和windows 1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離), 任何一個進程的在其地址空間中的修改都不會影響到另一個進程; 2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本, 提示:子進程和父進程是能夠有隻讀的共享內存區的。 可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
- 建立進程示例1:
# Windows上調用Process,可執行代碼必定要放到 __main__ 裏 from multiprocessing import Process import time,random def func(name): print('%s is running...' % name) # time.sleep(random.randint(1,3)) time.sleep(1) print('%s run end.' % name) if __name__ == '__main__': # p1 = Process(target=func,args=('standby',)) p1 = Process(target=func,args=('standby',),name='sub-P1') # 指定進程的名字 p1.start() print(p1.name) print('Parent running') time.sleep(1) print('Parent run end') --- sub-P1 Parent running standby is running... Parent run end standby run end.
- 建立進程示例2:使用繼承的方式,必須在類中定義一個run()方法
# 繼承Process類 from multiprocessing import Process import time, random class Foo(Process): def __init__(self, name): super().__init__() # 調用父類Process的init方法進行初始化 self.name = name def run(self): # 必須定義run()方法 print('%s is running...' % self.name) # time.sleep(random.randint(1,3)) time.sleep(1) print('%s run end.' % self.name) if __name__ == '__main__': p1 = Foo('standby') p1.start() # start() 會自動調用 run() print('Parent running...') time.sleep(1) print('Parent run end.') --- Parent running... standby is running... Parent run end. standby run end.
就緒狀態:進程已經準備好,已分配到所需資源,只要分配到CPU就可以當即運行;
若是進程運行時間片使用完也會進入就緒狀態; 執行狀態:進程處於就緒狀態被調度後,進程進入執行狀態; 阻塞狀態:正在執行的進程因爲某些事件(I/O請求,申請緩存區失敗)而暫時沒法運行,進程受到阻塞; 在知足請求時進入就緒狀態等待系統調用;
其實在兩種狀況下會致使一個進程在邏輯上不能運行: 1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做 2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。
- 1.daemon=True與join()
p.daemon=True 以後,若是在子進程p內再建立子進程就會報錯: 'daemonic processes are not allowed to have children'
# 示例1:沒有join也不設置daemon from multiprocessing import Process def say(name): print('%s say hello.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.start() # 建立新的進程須要時間,因此主線程先打印;但子進程也會打印,由於主線程須要等待子進程執行完畢,才結束,避免出現殭屍進程(沒有父進程的子進程) print('這是主線程') ---結果--- 這是主線程 standby say hello. ===================> # 示例2:設置daemon=True,沒有join from multiprocessing import Process def say(name): print('%s say hello.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #必定要在p.start()前設置;設置p爲守護進程,禁止p建立子進程;而且父進程結束,p跟着一塊兒結束 p.start() print('這是主線程') ---結果--- 這是主線程 ===================> # 示例3:有join,沒有設置daemon=True from multiprocessing import Process def say(name): print('%s say hello.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.start() p.join() # 阻塞了主線程,使得子進程先執行完畢,主線程才繼續往下執行 print('這是主線程') ---結果--- standby say hello. 這是主線程 ===================> # 示例4:設置daemon=True,有join from multiprocessing import Process def say(name): print('%s say hello.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #把p設置成守護進程 p.start() p.join() print('這是主線程') ---結果--- standby say hello. 這是主線程 ===================> ===================> ===================> # 示例5:子進程爲何會不打印? from multiprocessing import Process import time def say(name): print('%s say hello.' % name) time.sleep(2) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #把p設置成守護進程 p.start() p.join(0.001) # 主線程等待p的結束,等0.0001秒就再也不等了; print('這是主線程') ---結果--- 這是主線程 ===================> # 示例6:把主線程阻塞的時間改大一點,改成0.1s from multiprocessing import Process import time def say(name): print('%s say hello.' % name) time.sleep(2) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #把p設置成守護進程 p.start() p.join(0.1) print('這是主線程') ---結果--- standby say hello. 這是主線程 ===================> # 示例7:把主線程阻塞的時間改大一點,改成2s from multiprocessing import Process import time def say(name): print('%s say hello.' % name) time.sleep(2) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #把p設置成守護進程 p.start() p.join(2) print('這是主線程') ---結果--- standby say hello. 這是主線程 ===================> # 示例8:把主線程阻塞的時間改大一點,改成2.1s from multiprocessing import Process import time def say(name): print('%s say hello.' % name) time.sleep(2) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) p.daemon = True #把p設置成守護進程 p.start() p.join(2.1) print('這是主線程') ---結果--- standby say hello. standby say bye. 這是主線程
- 2.terminate() 與 is_alive()
# 示例1:查看terminate以後的狀態 from multiprocessing import Process def say(name): print('%s say hello.' % name) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) print(p.is_alive()) p.start() p.terminate() print(p.is_alive()) print('這是主線程') print(p.is_alive()) ---結果--- False True 這是主線程 True ========================> # 示例2:在主線程中sleep一下 from multiprocessing import Process import time def say(name): print('%s say hello.' % name) print('%s say bye.' % name) if __name__ == '__main__': p = Process(target=say,args=('standby',)) print(p.is_alive()) p.start() p.terminate() # 關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活; print(p.is_alive()) print('這是主線程') time.sleep(0.01) print(p.is_alive()) ---結果--- False True 這是主線程 False
- 3.name 和 pid
from multiprocessing import Process import os def say(): print('Say hello,子進程id:%s' % os.getpid()) if __name__ == '__main__': p = Process(target=say) p.start() print('子進程的名字是:%s,子進程id:%s' % (p.name,p.pid)) print('這是主線程,主線程id:%s' % os.getpid()) ---結果--- 子進程的名字是:Process-1,子進程id:1612 這是主線程,主線程id:6004 Say hello,子進程id:1612
- 爲何要有線程?
有了進程爲何還要線程? 進程有不少優勢,它提供了多道編程, 讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。 不少人就不理解了,既然進程這麼優秀,爲何還要線程呢? 其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上: 1.進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。 2.進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起; 即便進程中有些工做不依賴於輸入的數據,也將沒法執行。
- 線程示例:
進程與進程之間的資源是隔離的; 一個進程裏的多個線程共享進程的資源; 例子:編譯一個文檔,有三個功能,接收用戶輸入+格式化+按期保存 1.用三個進程實現;但進程間數據是隔離的,這樣就須要維護三份資源數據; 2.用1個進程掛三個線程實現,三個線程共享一份資源;
- 線程是什麼?
- 進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位;
- 線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位;
- 一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務;
- 進程之間是競爭關係,線程之間是協做關係;
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions. Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers. If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were. Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU. On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers. Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads. Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
- 線程和進程的區別
# 1 Threads share the address space of the process that created it; Processes have their own address space. # 2 Threads have direct access to the data segment of its process; Processes have their own copy of the data segment of the parent process. # 3 Threads can directly communicate with other threads of its process; Processes must use interprocess communication to communicate with sibling processes. # 4 New threads are easily created; New processes require duplication of the parent process. # 5 Threads can exercise considerable control over threads of the same process; Processes can only exercise control over child processes. # 6 Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; Changes to the parent process does not affect child processes.
- 建立進程的開銷要遠大於建立線程的開銷;
# 建立 500 個線程 import time from threading import Thread def work(): a = 99999 b = 101001010010101010 str1 = 'axaxxchaxchnahxalx' str2 = 'axaxxcedw2312haxchnahxalx' str3 = '121212axaxxchaxchnahxalx' dic = {'k1':'v1','k2':'v2'} if __name__ == '__main__': start_time = time.time() t_l = [] for i in range(500): t=Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() stop_time = time.time() print('Run time is %s' % (stop_time-start_time)) # Run time is 0.05900001525878906 # ++++++++++++++++++++++++++++++++++ # 建立 500 個進程 import time from multiprocessing import Process def work(): a = 99999 b = 101001010010101010 str1 = 'axaxxchaxchnahxalx' str2 = 'axaxxcedw2312haxchnahxalx' str3 = '121212axaxxchaxchnahxalx' dic = {'k1':'v1','k2':'v2'} if __name__ == '__main__': start_time = time.time() p_l = [] for i in range(500): p=Process(target=work) p_l.append(p) p.start() for p in p_l: p.join() stop_time = time.time() print('Run time is %s' % (stop_time-start_time)) # Run time is 19.552000045776367
關於線程和協程更多參見: ...
- 模擬建立多個子進程,未阻塞的狀況
from multiprocessing import Process import time,random def func(name): print('%s is running...' % name) time.sleep(random.randint(1,3)) print('%s run end.' % name) if __name__ == '__main__': # p1 = Process(target=func,args=('standby',)) p1 = Process(target=func,args=('進程1',),name='sub-P1') p2 = Process(target=func,args=('進程2',),name='sub-P2') p3 = Process(target=func,args=('進程3',),name='sub-P3') p4 = Process(target=func,args=('進程4',),name='sub-P4') sub_p_lits = [p1,p2,p3,p4] for p in sub_p_lits: p.start() print('Parent running') time.sleep(1) print('Parent run end') --- Parent running 進程2 is running... 進程4 is running... 進程3 is running... 進程1 is running... Parent run end 進程3 run end. 進程2 run end. 進程4 run end. 進程1 run end.
- 模擬建立多個子進程,join阻塞的狀況:
from multiprocessing import Process import time,random def func(name): print('%s is running...' % name) time.sleep(random.randint(1,3)) print('%s run end.' % name) if __name__ == '__main__': # p1 = Process(target=func,args=('standby',)) p1 = Process(target=func,args=('進程1',),name='sub-P1') p2 = Process(target=func,args=('進程2',),name='sub-P2') p3 = Process(target=func,args=('進程3',),name='sub-P3') p4 = Process(target=func,args=('進程4',),name='sub-P4') sub_p_lits = [p1,p2,p3,p4] for p in sub_p_lits: p.start() for p in sub_p_lits: p.join() print('Parent running') time.sleep(1) print('Parent run end') --- 進程2 is running... 進程1 is running... 進程3 is running... 進程4 is running... 進程2 run end. 進程3 run end. 進程4 run end. 進程1 run end. Parent running Parent run end
- socket + Process 實現併發處理多個客戶端鏈接
#!/usr/bin/python # -*- coding:utf-8 -*- # Server端 from multiprocessing import Process from socket import * server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,addr): while True: #通信循環 try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: #連接循環 conn,addr=server.accept() print(addr) p=Process(target=talk,args=(conn,addr)) p.start()
#!/usr/bin/python # -*- coding:utf-8 -*- # Client端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
# 多進程實現socket存在的問題: 每來一個客戶端,都在服務端開啓一個進程; 若是併發來一個萬個客戶端,要開啓一萬個進程嗎? 你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。(致使機器死機) 解決方法:進程池
Pool能夠提供指定數量的進程,供用戶調用; 當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求; 但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。 注意:是重用進程,而不是新建,Pool維護指定個數的進程,來循環執行不少的任務,進程的id都是不變的!
Pool([numprocess [,initializer [, initargs]]]):建立進程池 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值;
- 經常使用方法
參考:http://www.cnblogs.com/congbo/archive/2012/08/23/2652490.html
p.apply(func [, args [, kwargs]]) 進程池中的工做進程執行func(*args,**kwargs),而後返回結果; 是同步的; p.apply_async(func [, args [, kwargs]]) 與apply用法一致,但它是非阻塞的且支持結果返回後進行回調; 是異步的; 主進程循環運行過程當中不等待apply_async的返回結果; 在主進程結束後,即便子進程還未返回整個程序也會退出。 雖然 apply_async是非阻塞的,但其返回結果的get方法倒是阻塞的: 如使用result.get()會阻塞主進程。 p.close() 關閉進程池,即再也不像進程池中提交任務。若是全部操做持續掛起,它們將在工做進程終止前完成; P.join() 主進程阻塞等待子進程的退出, join方法要在close或terminate以後使用; p.terminate() 結束工做進程,再也不處理未處理的任務;
- apply(),同步
1.apply本質上就是apply_async().get() 2.apply_async().get()會返回結果,但同時也會阻塞,致使變爲串行;
- apply()示例
from multiprocessing import Pool import time def Foo(i): time.sleep(1) return i + 100 if __name__ == '__main__': start_time = time.time() pool = Pool(5) res_l = [] for i in range(5): res = pool.apply(func=Foo, args=(i,)) res_l.append(res) pool.close() for res in res_l: print(res) print('---> end') stop_time = time.time() print('Run time is: %s' % (stop_time-start_time)) ---結果--- 100 101 102 103 104 ---> end Run time is: 5.174000024795532
- apply_async(),異步非阻塞
若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完,而後能夠用get收集結果,
不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了;
調用join以前,必定要先調用close() 函數,不然會出錯; close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束;
- apply_async() 示例1:沒有後面的join() 和 get(),則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了;
# 在主線程內沒有使用join()的狀況 from multiprocessing import Pool import time def Foo(i): time.sleep(1) return i + 100 def Bar(arg): print('--->exec done:', arg) if __name__ == '__main__': start_time = time.time() pool = Pool(5) for i in range(5): pool.apply_async(func=Foo, args=(i,), callback=Bar) print('end') pool.close() stop_time = time.time() print('Run time is: %s' % (stop_time-start_time)) ---結果--- end Run time is: 0.08100008964538574
- apply_async() 示例2:主線程裏沒寫join(),可是使用get()獲取進程池中進程執行的結果
from multiprocessing import Pool import time def Foo(i): time.sleep(1) return i + 100 def Bar(arg): print('--->exec done:', arg) if __name__ == '__main__': start_time = time.time() pool = Pool(5) res_l = [] for i in range(5): res = pool.apply_async(func=Foo, args=(i,)) res_l.append(res) pool.close() # 關閉進程池,再也不向進程池中提交任務; for res in res_l: print(res.get()) # 使用get()獲取進程池中進程的執行結果 print('end') stop_time = time.time() print('Run time is: %s' % (stop_time-start_time)) ---結果--- 100 101 102 103 104 end Run time is: 1.2239999771118164
- apply_async() 示例3:使用join()但不使用get()
# 在主線程內使用 join() 的狀況 from multiprocessing import Pool import time def Foo(i): time.sleep(1) return i + 100 def Bar(arg): print('--->exec done:', arg) if __name__ == '__main__': start_time = time.time() pool = Pool(5) for i in range(5): pool.apply_async(func=Foo, args=(i,), callback=Bar) pool.close() # 關閉進程池,再也不向進程池中提交任務; pool.join() # 進程池中進程執行完畢後再關閉,若是註釋那麼程序直接關閉; print('end') stop_time = time.time() print('Run time is: %s' % (stop_time-start_time)) ---結果--- --->exec done: 100 --->exec done: 101 --->exec done: 102 --->exec done: 103 --->exec done: 104 end Run time is: 1.3329999446868896
- 進程池Pool改寫socket併發通訊,避免使用多進程的缺陷問題
# 進程池 Pool實現socket服務端 from multiprocessing import Pool from socket import * import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8090)) server.listen(5) def talk(conn,addr): print('子進程id:%s' % os.getpid()) while True: #通信循環 try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': print("cpu_count: %s" % os.cpu_count()) pool=Pool() while True: #連接循環 conn,addr=server.accept() print(addr) # pool.apply(talk,args=(conn,addr)) pool.apply_async(talk,args=(conn,addr))
# socket客戶端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8090)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
cpu_count: 4 <--- Pool()默認取這個值 ('127.0.0.1', 4944) 子進程id:2872 <---進程池中第一個進程的id ('127.0.0.1', 4945) 子進程id:1076 <---進程池中第二個進程的id ('127.0.0.1', 4948) 子進程id:5544 <---進程池中第三個進程的id ('127.0.0.1', 4951) 子進程id:5500 <---進程池中第四個進程的id ('127.0.0.1', 4952) ('127.0.0.1', 4953) 子進程id:2872 <=== 後面新來的鏈接複用原來的進程鏈接 子進程id:1076 <=== 若是進程池中4個進程都在用,則後面新來的鏈接將處在阻塞的狀態,一旦有進程釋放,新鏈接就會複用被釋放的進程id ('127.0.0.1', 4975) 子進程id:5544 ('127.0.0.1', 4982)
回調函數 是主進程在處理; 誰有返回值就通知主進程,而後主進程去執行回調函數裏的操做;
- 不須要回調函數的場景:若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數;此種狀況可以使用get()獲取執行結果; - 須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。 主進程則調用一個函數去處理該結果,該函數即回調函數; 咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行), 這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
- 應用示例:爬蟲
from multiprocessing import Pool import requests import re import json def get_page(url,pattern): # 判斷並下載網頁,並返回給回調函數 response=requests.get(url) if response.status_code == 200: return (response.text,pattern) else: print('response.status_code not 200.') def parse_page(info): # 做爲回調函數,按照寫好的正則去解析網頁內容 page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } with open('maoyan.txt','a',encoding='utf-8') as f: # 按照定義好的字典,寫入到文件中 f.write('%s\n' % json.dumps(dic)) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board':pattern1, } p=Pool() for url,pattern in url_dic.items(): p.apply_async(get_page,args=(url,pattern),callback=parse_page) p.close() p.join() # 等待進程池的進程任務都執行完畢 with open('maoyan.txt', mode='r', encoding='utf-8') as rf: item_list = rf.readlines() for item in item_list: res = json.loads(item) print(res) # 讀取文件內容,查看爬取的內容 ---maoyan.txt內容--- {"actor": "\u738b\u9719,\u4fdd\u5251\u950b,\u8463\u52c7", "index": "1", "score": "9.3", "time": "2017-06-30", "title": "\u8840\u6218\u6e58\u6c5f"} {"actor": "\u674e\u5fae\u6f2a,\u4ea6\u98ce", "index": "2", "score": "9.3", "time": "2017-06-16", "title": "\u91cd\u8fd4\u00b7\u72fc\u7fa4"} {"actor": "\u9ad8\u5f3a,\u4e8e\u6708\u4ed9,\u674e\u7389\u5cf0", "index": "3", "score": "9.2", "time": "2017-06-09", "title": "\u5fe0\u7231\u65e0\u8a00"} {"actor": "\u6768\u57f9,\u5c3c\u739b\u624e\u5806,\u65af\u6717\u5353\u560e", "index": "4", "score": "8.9", "time": "2017-06-20", "title": "\u5188\u4ec1\u6ce2\u9f50"} {"actor": "\u6234\u592b\u00b7\u5e15\u7279\u5c14,\u9c81\u59ae\u00b7\u739b\u62c9,\u5927\u536b\u00b7\u6587\u7ff0", "index": "5", "score": "8.8", "time": "2017-06-22", "title": "\u96c4\u72ee"} {"actor": "\u76d6\u5c14\u00b7\u52a0\u6735,\u514b\u91cc\u65af\u00b7\u6d3e\u6069,\u7f57\u5bbe\u00b7\u6000\u7279", "index": "6", "score": "8.6", "time": "2017-06-02", "title": "\u795e\u5947\u5973\u4fa0"} {"actor": "\u8521\u5353\u598d,\u5468\u67cf\u8c6a,\u949f\u6b23\u6f7c", "index": "7", "score": "8.5", "time": "2017-06-23", "title": "\u539f\u8c05\u4ed677\u6b21"} {"actor": "\u738b\u6653\u5f64,\u674e\u6654,\u6d2a\u6d77\u5929", "index": "8", "score": "8.1", "time": "2017-05-27", "title": "\u4e09\u53ea\u5c0f\u732a2"} {"actor": "\u590f\u96e8,\u95eb\u59ae,\u6f58\u658c\u9f99", "index": "9", "score": "8.0", "time": "2017-06-29", "title": "\u53cd\u8f6c\u4eba\u751f"} {"actor": "\u6768\u5e42,\u970d\u5efa\u534e,\u91d1\u58eb\u6770", "index": "10", "score": "7.9", "time": "2017-06-29", "title": "\u9006\u65f6\u8425\u6551"} ---打印結果--- {'actor': '王霙,保劍鋒,董勇', 'index': '1', 'score': '9.3', 'time': '2017-06-30', 'title': '血戰湘江'} {'actor': '李微漪,亦風', 'index': '2', 'score': '9.3', 'time': '2017-06-16', 'title': '重返·狼羣'} {'actor': '高強,於月仙,李玉峯', 'index': '3', 'score': '9.2', 'time': '2017-06-09', 'title': '忠愛無言'} {'actor': '楊培,尼瑪扎堆,斯朗卓嘎', 'index': '4', 'score': '8.9', 'time': '2017-06-20', 'title': '岡仁波齊'} {'actor': '戴夫·帕特爾,魯妮·瑪拉,大衛·文翰', 'index': '5', 'score': '8.8', 'time': '2017-06-22', 'title': '雄獅'} {'actor': '蓋爾·加朵,克里斯·派恩,羅賓·懷特', 'index': '6', 'score': '8.6', 'time': '2017-06-02', 'title': '神奇女俠'} {'actor': '蔡卓妍,周柏豪,鍾欣潼', 'index': '7', 'score': '8.5', 'time': '2017-06-23', 'title': '原諒他77次'} {'actor': '王曉彤,李曄,洪海天', 'index': '8', 'score': '8.1', 'time': '2017-05-27', 'title': '三隻小豬2'} {'actor': '夏雨,閆妮,潘斌龍', 'index': '9', 'score': '8.0', 'time': '2017-06-29', 'title': '反轉人生'} {'actor': '楊冪,霍建華,金士傑', 'index': '10', 'score': '7.9', 'time': '2017-06-29', 'title': '逆時營救'}
進程之間數據不共享,可是共享同一套文件系統; 因此訪問同一個文件,或同一個打印終端,是沒有問題的;
多個進程同時執行打印操做: 發現會有多行內容打印到一行的現象(多個進程共享並搶佔同一個打印終端,亂了)
- 示例:
# 共享同一個打印終端 import os,time from multiprocessing import Process class Logger(Process): def __init__(self): super().__init__() # super(Logger,self).__init__() def run(self): # time.sleep(1) print(self.name,'pid: %s' % os.getpid()) if __name__ == '__main__': for i in range(100): # i5, 4核,起1W個進程就會死機,慎跑... l=Logger() l.start()
共享同一個文件; 有的同窗會想到,既然能夠用文件共享數據,那麼進程間通訊用文件做爲數據傳輸介質就能夠了啊; 能夠,可是有問題: 1.效率 2.須要本身加鎖處理
- 示例:
#多進程共享一套文件系統 from multiprocessing import Process def write_to_file(file,mode,num): with open(file,mode=mode,encoding='utf-8') as wf: wf.write(num) if __name__ == '__main__': for i in range(50): p = Process(target=write_to_file,args=('a.txt','a',str(i))) p.start()
加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改; 沒錯,速度是慢了,犧牲了速度而保證了數據安全; 進程之間數據隔離,可是共享一套文件系統, 於是能夠經過文件來實現進程直接的通訊,但問題是必須本身加鎖處理
- 模擬搶票程序
#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別;
- 沒有加鎖的搶票程序:因爲沒有加鎖,因此存在多個進程同時去讀寫文件的狀況,致使一張票被多我的搶到;
from multiprocessing import Process,Lock import json import time import random def work(dbfile,name,lock): with open(dbfile,encoding='utf-8') as f: dic=json.loads(f.read()) if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.randint(1,3)) # 模擬網絡延遲 with open(dbfile,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print('\033[43m%s 搶票成功\033[0m' %name) else: print('\033[45m%s 搶票失敗\033[0m' %name) if __name__ == '__main__': lock=Lock() # 進程間不共享數據,因此須要把lock當作參數傳進入 p_l=[] for i in range(100): p=Process(target=work,args=('a.txt','用戶%s' %i,lock)) p_l.append(p) p.start() for p in p_l: p.join() print('主進程') ---結果--- 用戶6 搶票成功 用戶19 搶票失敗 用戶0 搶票失敗 用戶3 搶票成功 用戶45 搶票成功 用戶38 搶票失敗 用戶10 搶票成功 用戶51 搶票失敗 用戶5 搶票失敗 用戶58 搶票失敗 用戶8 搶票失敗 用戶54 搶票失敗 用戶87 搶票失敗 用戶71 搶票失敗 用戶94 搶票失敗 用戶66 搶票失敗 用戶93 搶票失敗 用戶1 搶票成功 用戶61 搶票失敗 用戶31 搶票失敗 用戶70 搶票失敗 用戶13 搶票失敗 用戶77 搶票失敗 用戶92 搶票失敗 用戶7 搶票失敗 用戶34 搶票失敗 用戶44 搶票失敗 用戶23 搶票失敗 用戶29 搶票失敗 用戶33 搶票失敗 用戶41 搶票失敗 用戶82 搶票失敗 用戶86 搶票失敗 用戶39 搶票失敗 用戶43 搶票失敗 用戶90 搶票失敗 用戶17 搶票失敗 用戶28 搶票失敗 用戶14 搶票失敗 用戶67 搶票失敗 用戶48 搶票失敗 用戶37 搶票失敗 用戶24 搶票失敗 用戶63 搶票失敗 用戶46 搶票失敗 用戶25 搶票失敗 用戶74 搶票失敗 用戶47 搶票失敗 用戶80 搶票失敗 用戶57 搶票失敗 用戶11 搶票失敗 用戶30 搶票失敗 用戶96 搶票失敗 用戶73 搶票失敗 用戶91 搶票失敗 用戶22 搶票失敗 用戶20 搶票失敗 用戶89 搶票失敗 用戶83 搶票失敗 用戶98 搶票失敗 用戶53 搶票失敗 用戶88 搶票失敗 用戶79 搶票失敗 用戶78 搶票失敗 用戶49 搶票失敗 用戶64 搶票失敗 用戶95 搶票失敗 用戶18 搶票失敗 用戶97 搶票失敗 用戶59 搶票失敗 用戶72 搶票失敗 用戶42 搶票失敗 用戶21 搶票失敗 用戶32 搶票失敗 用戶4 搶票失敗 用戶27 搶票失敗 用戶65 搶票失敗 用戶62 搶票失敗 用戶99 搶票失敗 用戶55 搶票失敗 用戶81 搶票失敗 用戶15 搶票失敗 用戶40 搶票失敗 用戶69 搶票失敗 用戶85 搶票失敗 用戶16 搶票失敗 用戶50 搶票失敗 用戶26 搶票失敗 用戶60 搶票失敗 用戶75 搶票失敗 用戶35 搶票失敗 用戶68 搶票失敗 用戶36 搶票失敗 用戶52 搶票失敗 用戶84 搶票失敗 用戶76 搶票失敗 用戶12 搶票成功 用戶2 搶票成功 用戶9 搶票成功 用戶56 搶票成功 主進程
- 加鎖的狀況:犧牲執行速度,保護了數據安全性,只有一個用戶能搶到票;
from multiprocessing import Process,Lock import json import time import random def work(dbfile,name,lock): # lock.acquire() with lock: with open(dbfile,encoding='utf-8') as f: dic=json.loads(f.read()) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) #模擬網絡延遲 with open(dbfile,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print('\033[43m%s 搶票成功\033[0m' %name) else: print('\033[45m%s 搶票失敗\033[0m' %name) # lock.release() if __name__ == '__main__': start_time = time.time() lock=Lock() # 進程間不共享數據,因此須要把lock當作參數傳進入 p_l=[] for i in range(100): p=Process(target=work,args=('a.txt','用戶%s' % i,lock)) p_l.append(p) p.start() for p in p_l: p.join() stop_time = time.time() print('Run time: %s' % (stop_time-start_time)) ---結果--- 用戶19 搶票成功 用戶1 搶票失敗 用戶0 搶票失敗 用戶3 搶票失敗 用戶7 搶票失敗 用戶13 搶票失敗 用戶5 搶票失敗 用戶55 搶票失敗 用戶51 搶票失敗 用戶43 搶票失敗 用戶39 搶票失敗 用戶59 搶票失敗 用戶63 搶票失敗 用戶62 搶票失敗 用戶2 搶票失敗 用戶50 搶票失敗 用戶47 搶票失敗 用戶23 搶票失敗 用戶14 搶票失敗 用戶9 搶票失敗 用戶18 搶票失敗 用戶75 搶票失敗 用戶21 搶票失敗 用戶27 搶票失敗 用戶54 搶票失敗 用戶11 搶票失敗 用戶61 搶票失敗 用戶15 搶票失敗 用戶31 搶票失敗 用戶38 搶票失敗 用戶25 搶票失敗 用戶35 搶票失敗 用戶6 搶票失敗 用戶30 搶票失敗 用戶34 搶票失敗 用戶42 搶票失敗 用戶36 搶票失敗 用戶67 搶票失敗 用戶26 搶票失敗 用戶46 搶票失敗 用戶17 搶票失敗 用戶49 搶票失敗 用戶71 搶票失敗 用戶22 搶票失敗 用戶45 搶票失敗 用戶10 搶票失敗 用戶53 搶票失敗 用戶65 搶票失敗 用戶29 搶票失敗 用戶69 搶票失敗 用戶73 搶票失敗 用戶33 搶票失敗 用戶52 搶票失敗 用戶58 搶票失敗 用戶37 搶票失敗 用戶41 搶票失敗 用戶24 搶票失敗 用戶40 搶票失敗 用戶48 搶票失敗 用戶81 搶票失敗 用戶57 搶票失敗 用戶32 搶票失敗 用戶83 搶票失敗 用戶79 搶票失敗 用戶77 搶票失敗 用戶20 搶票失敗 用戶95 搶票失敗 用戶89 搶票失敗 用戶93 搶票失敗 用戶91 搶票失敗 用戶44 搶票失敗 用戶99 搶票失敗 用戶97 搶票失敗 用戶90 搶票失敗 用戶87 搶票失敗 用戶85 搶票失敗 用戶74 搶票失敗 用戶66 搶票失敗 用戶16 搶票失敗 用戶86 搶票失敗 用戶82 搶票失敗 用戶98 搶票失敗 用戶70 搶票失敗 用戶78 搶票失敗 用戶72 搶票失敗 用戶84 搶票失敗 用戶28 搶票失敗 用戶94 搶票失敗 用戶4 搶票失敗 用戶60 搶票失敗 用戶8 搶票失敗 用戶96 搶票失敗 用戶80 搶票失敗 用戶92 搶票失敗 用戶76 搶票失敗 用戶88 搶票失敗 用戶12 搶票失敗 用戶64 搶票失敗 用戶56 搶票失敗 用戶68 搶票失敗 Run time: 4.325000047683716
- 需對文件加鎖:保護共享數據;
- 見上面搶票的例子;
進程彼此之間互相隔離,要實現進程間通訊(IPC), multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的;
- 1.隊列(推薦使用),隊列Queue是管道+鎖實現的;先進先出;
# Queue類 Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞; # 屬性 maxsize是隊列中容許最大項數,省略則無大小限制; # 經常使用方法 q.put() 用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。 若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。 若是超時,會拋出Queue.Full異常。 若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。 q.get() 能夠從隊列讀取而且刪除一個元素。get方法也有兩個可選參數:blocked和timeout。 若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。 若是blocked爲False,有兩種狀況存在:若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常. q.get_nowait(): 同 q.get(False) q.put_nowait(): 同 q.put(False) q.empty() 調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目; q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走; q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣;
def put(self, obj, block=True, timeout=None): # 隊列滿的時候,q.put()會阻塞,直到隊列不滿 pass def get(self, block=True, timeout=None): # 隊列空的時候,q,get()會阻塞,直到隊列有新元素 pass
- 示例1
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put('liu') q.put(('a','b','c',)) print(q.full()) print(q.qsize()) print(q.get()) print(q.get()) print(q.get()) print(q.empty()) --- True 3 1 liu ('a', 'b', 'c') True
- 示例2:簡單的生產者消費者模型
from multiprocessing import Queue, Process import random, os, time def getter(name, queue): print('Son process name: %s, pid: %s, ppid: %s' % (name, os.getpid(), os.getppid())) while True: try: value = queue.get(block=True, timeout=3) # block爲True,就是若是隊列中無數據了。 # |—————— 若timeout默認是None,那麼會一直等待下去。 # |—————— 若timeout設置了時間,那麼會等待timeout秒後纔會拋出Queue.Empty異常 # block 爲False,若是隊列中無數據,就拋出Queue.Empty異常 print("Process getter get: %f" % value) except Exception as e: print('getter捕捉到異常:%s' % e) break def putter(name, queue): print('Son process name: %s, pid: %s, ppid: %s' % (name, os.getpid(), os.getppid())) for i in range(0, 10): time.sleep(1) value = random.randint(1,10) queue.put(value) # 放入數據 put(obj[, block[, timeout]]) # 若block爲True,如隊列是滿的: # |—————— 若timeout是默認None,那麼就會一直等下去 # |—————— 若timeout設置了等待時間,那麼會等待timeout秒後,若是仍是滿的,那麼就拋出Queue.Full. # 若block是False,若是隊列滿了,直接拋出Queue.Full print("Process putter put: %f" % value) if __name__ == '__main__': queue = Queue() getter_process = Process(target=getter, args=("Getter", queue)) putter_process = Process(target=putter, args=("Putter", queue)) getter_process.start() putter_process.start() getter_process.join() putter_process.join() print('Main process, pid: %s' % os.getpid()) ---結果--- Son process name: Getter, pid: 3088, ppid: 5656 Son process name: Putter, pid: 2712, ppid: 5656 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 3.000000 Process getter get: 3.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 1.000000 Process getter get: 1.000000 Process putter put: 7.000000 Process getter get: 7.000000 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 6.000000 Process getter get: 6.000000 getter捕捉到異常: Main process, pid: 5656
- 2.管道(不推薦)
tail -f access.log |grep '404'
# Pipe函數 def Pipe(duplex=True): return Connection(), Connection() Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe. 在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象; 強調一點:必須在產生Process對象以前產生管道; # 參數 dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
新建一個Pipe(duplex)的時候,若是duplex爲True,那麼建立的管道是雙向的;若是duplex爲False,那麼建立的管道是單向的。 # 經常使用方法 conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError; conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
- 管道Pipe創建的過程(Pipe的讀寫效率要高於Queue。)
進程間的Pipe基於fork機制創建。 當主進程建立Pipe的時候,Pipe的兩個Connections鏈接的的都是主進程。 當主進程建立子進程後,Connections也被拷貝了一份。此時有了4個Connections。 此後,關閉主進程的一個Out Connection,關閉一個子進程的一個In Connection。那麼就創建好了一個輸入在主進程,輸出在子進程的管道。
- 示例1:基於管道實現的進程間通訊
from multiprocessing import Pipe, Process def son_process(pipe): _out_pipe, _in_pipe = pipe # 關閉fork過來的輸入端 _in_pipe.close() while True: try: msg = _out_pipe.recv() print('子進程經過管道得到消息:%s' % msg) except Exception as e: print('子進程中捕獲到異常:%s' % e) # 當out_pipe接受不到輸出的時候且輸入被關閉的時候,會拋出EORFError,能夠捕獲而且退出子進程 break if __name__ == '__main__': out_pipe, in_pipe = Pipe(True) son_p = Process(target=son_process, args=((out_pipe, in_pipe),)) son_p.start() # 等pipe被fork 後,關閉主進程的輸出端 # 這樣,建立的Pipe一端鏈接着主進程的輸入,一端鏈接着子進程的輸出口 out_pipe.close() for x in range(20): in_pipe.send(x) in_pipe.close() son_p.join() print("主進程也結束了") ---結果--- 子進程經過管道得到消息:0 子進程經過管道得到消息:1 子進程經過管道得到消息:2 子進程經過管道得到消息:3 子進程經過管道得到消息:4 子進程經過管道得到消息:5 子進程經過管道得到消息:6 子進程經過管道得到消息:7 子進程經過管道得到消息:8 子進程經過管道得到消息:9 子進程經過管道得到消息:10 子進程經過管道得到消息:11 子進程經過管道得到消息:12 子進程經過管道得到消息:13 子進程經過管道得到消息:14 子進程經過管道得到消息:15 子進程經過管道得到消息:16 子進程經過管道得到消息:17 子進程經過管道得到消息:18 子進程經過管道得到消息:19 子進程中捕獲到異常: 主進程也結束了
- 示例2:基於管道實現的進程間通訊
from multiprocessing import Process,Pipe import time def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) time.sleep(1) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主進程') ---結果--- c1 收到包子:0 c1 收到包子:1 c1 收到包子:2 c1 收到包子:3 c1 收到包子:4 c1 收到包子:5 c1 收到包子:6 c1 收到包子:7 c1 收到包子:8 c1 收到包子:9 主進程
- 示例3:基於管道的雙向通訊
from multiprocessing import Process,Pipe def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() print('%s 經過管道收到了:%s 和 %s' % (name,x,y)) except EOFError: server.close() break res = x + y server.send(res) print('%s 迴應消息:%s' % (name, res)) if __name__ == '__main__': server,client=Pipe() s1=Process(target=adder,args=((server,client),'s1')) s1.start() server.close() client.send((10,20)) print(client.recv()) client.close() s1.join() print('主進程') ---結果--- s1 經過管道收到了:10 和 20 30 s1 迴應消息:30 主進程
- 3.擴展:生產者消費者模型
- 1.主進程做爲生產者,一個子進程做爲消費者
# 生產者消費者模型 from multiprocessing import Process,Queue import time import random def consumer(q,name): while True: time.sleep(random.randint(1,3)) try: res=q.get(block=True,timeout=5) print('\033[41m消費者%s拿到了%s\033[0m' %(name,res)) except Exception as e: print('消費者捕獲異常:%s' % e) break def producer(seq,q,name): for item in seq: time.sleep(random.randint(1,3)) q.put(item) print('\033[45m生產者%s生產了%s\033[0m' %(name,item)) if __name__ == '__main__': q=Queue() c=Process(target=consumer,args=(q,'standby'),) c.start() seq=['包子%s' % i for i in range(10)] # 列表生成式 producer(seq,q,'廚師') print('生產者已經生產完畢') ---結果--- 生產者廚師生產了包子0 生產者廚師生產了包子1 消費者standby拿到了包子0 生產者廚師生產了包子2 消費者standby拿到了包子1 消費者standby拿到了包子2 生產者廚師生產了包子3 生產者廚師生產了包子4 消費者standby拿到了包子3 生產者廚師生產了包子5 消費者standby拿到了包子4 生產者廚師生產了包子6 消費者standby拿到了包子5 生產者廚師生產了包子7 消費者standby拿到了包子6 消費者standby拿到了包子7 生產者廚師生產了包子8 消費者standby拿到了包子8 生產者廚師生產了包子9 生產者已經生產完畢 消費者standby拿到了包子9 消費者捕獲異常:
- 2.開兩個子進程,一個做爲生產者,另外一個做爲消費者
from multiprocessing import Process,Queue import time import random def consumer(q,name): while True: time.sleep(random.randint(1,3)) res=q.get() if res is None:break print('\033[41m消費者%s拿到了%s\033[0m' %(name,res)) def producer(seq,q,name): for item in seq: time.sleep(random.randint(1,3)) q.put(item) print('\033[45m生產者%s生產了%s\033[0m' %(name,item)) q.put(None) if __name__ == '__main__': q=Queue() c=Process(target=consumer,args=(q,'standby'),) c.start() seq=['包子%s' %i for i in range(10)] p=Process(target=producer,args=(seq,q,'廚師')) p.start() c.join() print('主進程') ---結果--- 生產者廚師生產了包子0 消費者standby拿到了包子0 生產者廚師生產了包子1 消費者standby拿到了包子1 生產者廚師生產了包子2 生產者廚師生產了包子3 消費者standby拿到了包子2 生產者廚師生產了包子4 消費者standby拿到了包子3 生產者廚師生產了包子5 消費者standby拿到了包子4 生產者廚師生產了包子6 消費者standby拿到了包子5 生產者廚師生產了包子7 消費者standby拿到了包子6 生產者廚師生產了包子8 消費者standby拿到了包子7 生產者廚師生產了包子9 消費者standby拿到了包子8 消費者standby拿到了包子9 主進程
- 3.JoinableQueue
建立隊列的另一個類:JoinableQueue([maxsize]); 這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。 通知進程是使用共享的信號和條件變量來實現的。 #參數介紹: maxsize是隊列中容許最大項數,省略則無大小限制; #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法以外還具備: q.task_done(): 使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常; q.join(): 生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止;
- JoinableQueue示例
from multiprocessing import Process,JoinableQueue import time import random def consumer(q,name): while True: time.sleep(random.randint(1,3)) res=q.get() q.task_done() print('\033[41m消費者%s拿到了%s\033[0m' %(name,res)) def producer(seq,q,name): for item in seq: time.sleep(random.randint(1,3)) q.put(item) print('\033[45m生產者%s生產了%s\033[0m' %(name,item)) print('生產者生產完畢') q.join() print('消費者都消費完了') if __name__ == '__main__': q=JoinableQueue() c=Process(target=consumer,args=(q,'standby'),) c.daemon=True #設置守護進程,主進程結束c就結束 c.start() seq=['包子%s' %i for i in range(10)] p=Process(target=producer,args=(seq,q,'廚師')) p.start() p.join() #主進程等待p結束,p等待c把數據都取完,c一旦取完數據,p.join就是再也不阻塞,進 # 而主進程結束,主進程結束會回收守護進程c,並且c此時也沒有存在的必要了 print('主進程') ---結果--- 生產者廚師生產了包子0 消費者standby拿到了包子0 生產者廚師生產了包子1 消費者standby拿到了包子1 生產者廚師生產了包子2 消費者standby拿到了包子2 生產者廚師生產了包子3 消費者standby拿到了包子3 生產者廚師生產了包子4 消費者standby拿到了包子4 生產者廚師生產了包子5 消費者standby拿到了包子5 生產者廚師生產了包子6 消費者standby拿到了包子6 生產者廚師生產了包子7 消費者standby拿到了包子7 生產者廚師生產了包子8 消費者standby拿到了包子8 生產者廚師生產了包子9 生產者生產完畢 消費者standby拿到了包子9 消費者都消費完了 主進程
- 示例1:不加鎖的狀況
# Manager實現共享內存,沒加鎖的例子 # 這種狀況會出現同時有多個進程在寫同一個內存中所共享的數據,致使最後數據不對; from multiprocessing import Manager,Process def func(dic): dic['count'] -= 1 if __name__ == '__main__': m = Manager() dic = m.dict({'count':100}) obj_l = [] for i in range(100): p = Process(target=func,args=(dic,)) p.start() obj_l.append(p) for p in obj_l: p.join() print(dic) ---結果--- {'count': 2}
- 示例2:第一種加鎖的寫法
# Manager實現共享內存,加鎖實現的例子 # 第一種寫法: from multiprocessing import Manager,Process,Lock def func(dic,lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': lock = Lock() m = Manager() dic = m.dict({'count':100}) obj_l = [] for i in range(100): p = Process(target=func,args=(dic,lock)) p.start() obj_l.append(p) for p in obj_l: p.join() print(dic) ---結果--- {'count': 0}
- 示例3.第二種加鎖的寫法
from multiprocessing import Manager,Process,Lock def func(dic,lock): with lock: dic['count'] -= 1 if __name__ == '__main__': lock = Lock() # m = Manager() with Manager() as m: dic = m.dict({'count':100}) obj_l = [] for i in range(100): p = Process(target=func,args=(dic,lock)) p.start() obj_l.append(p) for p in obj_l: p.join() print(dic) ---結果--- {'count': 0}
Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [1], providing both client and server functionality. While it leverages a Python C extension for low level cryptography (Cryptography), Paramiko itself is a pure Python interface around SSH networking concepts.
- paramiko是用python語言寫的一個模塊,遵循SSH2協議,支持以加密和認證的方式,進行遠程服務器的鏈接。
1 __all__ = [ 2 'Transport', 3 'SSHClient', 4 'MissingHostKeyPolicy', 5 'AutoAddPolicy', 6 'RejectPolicy', 7 'WarningPolicy', 8 'SecurityOptions', 9 'SubsystemHandler', 10 'Channel', 11 'PKey', 12 'RSAKey', 13 'DSSKey', 14 'Message', 15 'SSHException', 16 'AuthenticationException', 17 'PasswordRequiredException', 18 'BadAuthenticationType', 19 'ChannelException', 20 'BadHostKeyException', 21 'ProxyCommand', 22 'ProxyCommandFailure', 23 'SFTP', 24 'SFTPFile', 25 'SFTPHandle', 26 'SFTPClient', 27 'SFTPServer', 28 'SFTPError', 29 'SFTPAttributes', 30 'SFTPServerInterface', 31 'ServerInterface', 32 'BufferedFile', 33 'Agent', 34 'AgentKey', 35 'HostKeys', 36 'SSHConfig', 37 'util', 38 'io_sleep', 39 ]
- 實例1:密碼驗證登陸主機,遠程執行命令
import paramiko IP = '10.0.0.9' PORT = 22 USER = 'root' PASSWORD = '123456.' ssh_conn = paramiko.SSHClient() ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh_conn.connect(IP,PORT,USER,PASSWORD,timeout=3) except Exception as e: print('鏈接失敗:%s' % e) while True: cmd = input('Input cmd, q/Q to exit.>>>\t').strip() if 'Q' == cmd.upper(): print('Bye...') break stdin,stdout,stderr = ssh_conn.exec_command(cmd) # print(stdin.read()) res = stderr.read().decode('utf-8') if not res: res = stdout.read().decode('utf-8') print(res) ssh_conn.close()
- 實例2:密碼驗證登陸主機,執行ftp上傳下載操做
# 下載到本地 import paramiko t = paramiko.Transport(('10.0.0.9',22)) t.connect(username='root',password='123456.') sftp = paramiko.SFTPClient.from_transport(t) sftp.get(r'/data/pass.txt','1.txt') t.close() # 上傳到遠端服務器 import paramiko t = paramiko.Transport(('10.0.0.9',22)) t.connect(username='root',password='123456.') sftp = paramiko.SFTPClient.from_transport(t) sftp.put(r'D:\soft\work\Python_17\day09\paramiko_demo.py','/data/paramiko_demo.py') # sftp.get(r'/data/pass.txt','1.txt') t.close()
- 實例3:祕鑰驗證登陸遠程主機,執行命令
參見:Pool多進程示例
參考:http://www.dabeaz.com/python/UnderstandingGIL.pdf
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
- GIL並非Python的特性,Python徹底能夠不依賴於GIL;
GIL使得同一時刻統一進程中只有一個線程被執行; 進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點;
seq=['數字:%s' %i for i in range(10)] print(type(seq)) print(seq) ---結果--- <class 'list'> ['數字:0', '數字:1', '數字:2', '數字:3', '數字:4', '數字:5', '數字:6', '數字:7', '數字:8', '數字:9']
題目:簡單主機批量管理工具
需求:
1 # Config for configparser test. 2 3 ;[DEFAULT] 4 ;Admin = 'liulixin@hitedu.org' 5 ;Description = 'Traffic Server' 6 7 [QLS_GROUP] 8 Master = True 9 Slave1 = True 10 Slave2 = False 11 Slave3 = False 12 13 [Master] 14 ip = 10.0.0.9 15 port = 22 16 user = root 17 password = 123456. 18 enable = True 19 20 [Slave1] 21 ip = 10.0.0.8 22 port = 55336 23 user = root 24 password = slave1. 25 enable = True 26 27 [Slave2] 28 ip = 10.0.0.7 29 port = 3333 30 user = root 31 password = slave2. 32 enable = False 33 34 [Slave3] 35 ip = 10.0.0.6 36 port = 3307 37 user = root 38 password = slave3. 39 enable = True
程序代碼:
#!/usr/bin/python # -*- coding:utf-8 -*- import os import paramiko import configparser from multiprocessing import Pool file_path = os.path.abspath(__file__) config = configparser.ConfigParser() config.read('my.cnf',encoding='utf-8') info = ''' Command Syntax like this: 批量執行命令:batch_run -h h1,h2 -g web_clusters,db_servers -cmd "df -h" 批量拷貝文件:batch_scp -h h1,h2 -g web_clusters,db_servers -action put test.py /tmp/ 退出:q/Q ''' def ssh_cmd(ip,port,user,password,cmd): ssh_conn = paramiko.SSHClient() ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh_conn.connect(ip,int(port),user,password,timeout=3) except Exception as e: print('鏈接失敗:%s' % e) stdin, stdout, stderr = ssh_conn.exec_command(cmd) ret = [] ret.append(ip) res = stderr.read().decode('utf-8') if not res: res = stdout.read().decode('utf-8') ret.append(res) return ret def ssh_cmd_callback(ret): print("This is callback func of %s" % ret[0]) def scp_put(ip,port,user,password,file_to_put,remote_dir): try: trans = paramiko.Transport((ip,int(port))) trans.connect(username=user, password=password) sftp = paramiko.SFTPClient.from_transport(trans) sftp.put(localpath=os.path.join(os.path.dirname(file_path),file_to_put),\ remotepath=r'%s/%s' % (remote_dir,file_to_put)) trans.close() except Exception as e: print('scp_put err: %s' % e) def scp_get(ip,port,user,password,file_to_get, local_file_name): try: trans = paramiko.Transport((ip, int(port))) trans.connect(username=user, password=password) sftp = paramiko.SFTPClient.from_transport(trans) print(file_to_get,local_file_name) sftp.get(file_to_get,local_file_name) trans.close() except Exception as e: print('scp_get err: %s' % e) def output(res_list): for res in res_list: # print(res.get()) print(res.get()[1]) def get_target_ip_dict(cmd_list): target_ip_dict = {} if '-h' in cmd_list: sectionos_list = config.sections() sectionos_list.remove('QLS_GROUP') h_index = cmd_list.index('-h') input_host_str = cmd_list[h_index + 1] input_host_list = input_host_str.split(',') for host in input_host_list: if host.capitalize() in sectionos_list: enable = config.get(host.capitalize(), 'enable') if 'False' == enable: print('The %s is offline now, continue...' % host.capitalize()) continue item_dict = { 'ip': None, 'port': None, 'user': None, 'password': None, } item_dict['ip'] = config.get(host.capitalize(), 'ip') item_dict['port'] = config.get(host.capitalize(), 'port') item_dict['user'] = config.get(host.capitalize(), 'user') item_dict['password'] = config.get(host.capitalize(), 'password') if host.capitalize() not in target_ip_dict: target_ip_dict[host.capitalize()] = item_dict else: print('No server: %s exist.' % host) if '-g' in cmd_list: sectionos_list = config.sections() g_index = cmd_list.index('-g') input_group_str = cmd_list[g_index + 1] input_group_list = input_group_str.split(',') for group in input_group_list: if group.upper() not in sectionos_list: print('No group: %s exist.' % group) else: available_tag_list = [] for tag, value in config.items(group.upper()): if 'True' == value: available_tag_list.append(tag.capitalize()) for tag in available_tag_list: if tag.capitalize() not in target_ip_dict: item_dict = { 'ip': None, 'port': None, 'user': None, 'password': None, } item_dict['ip'] = config.get(tag.capitalize(), 'ip') item_dict['port'] = config.get(tag.capitalize(), 'port') item_dict['user'] = config.get(tag.capitalize(), 'user') item_dict['password'] = config.get(tag.capitalize(), 'password') target_ip_dict[tag.capitalize()] = item_dict return target_ip_dict def batch_run(cmd_list): target_ip_dict = get_target_ip_dict(cmd_list) cmd_index = cmd_list.index('-cmd') cmd_to_exec = ' '.join(cmd_list[cmd_index+1:]) my_pool = Pool(len(target_ip_dict)) res_list = [] for host in target_ip_dict: res = my_pool.apply_async(func=ssh_cmd,args=(target_ip_dict[host]['ip'],\ target_ip_dict[host]['port'],\ target_ip_dict[host]['user'],\ target_ip_dict[host]['password'],\ cmd_to_exec.strip('"')), callback=ssh_cmd_callback) res_list.append(res) my_pool.close() my_pool.join() output(res_list) def batch_scp(cmd_list): target_ip_dict = get_target_ip_dict(cmd_list) action_index = cmd_list.index('-action') if 'PUT' != cmd_list[action_index+1].upper() and 'GET' != cmd_list[action_index+1].upper(): print("Scp option invaild, just support put and get option.") return # -action put test.py /tmp/ if 'PUT' == cmd_list[action_index+1].upper(): file_to_put = cmd_list[action_index+2] remote_dir = cmd_list[-1] if os.path.exists(file_to_put): print('Start to put %s' % file_to_put) my_pool = Pool(len(target_ip_dict)) res_list = [] for host in target_ip_dict: res = my_pool.apply_async(func=scp_put, args=(target_ip_dict[host]['ip'],\ target_ip_dict[host]['port'],\ target_ip_dict[host]['user'],\ target_ip_dict[host]['password'],\ file_to_put,remote_dir)) res_list.append(res) my_pool.close() my_pool.join() # output(res_list) print('End to put %s' % file_to_put) else: print('%s not exist.' % file_to_put) return # -action get /path/test.py local_file_name if 'GET' == cmd_list[action_index + 1].upper(): file_to_get = cmd_list[action_index + 2] local_file_name = cmd_list[-1] print('Start to get %s' % file_to_get) my_pool = Pool(len(target_ip_dict)) res_list = [] for host in target_ip_dict: res = my_pool.apply_async(func=scp_get, args=(target_ip_dict[host]['ip'],\ target_ip_dict[host]['port'],\ target_ip_dict[host]['user'],\ target_ip_dict[host]['password'],\ file_to_get, local_file_name)) res_list.append(res) my_pool.close() my_pool.join() print('End to get %s' % file_to_get) def bye(): print('Bye...') exit(0) cmd_option = { 'batch_run':batch_run, 'batch_scp':batch_scp, } if __name__ == '__main__': while True: print(info) cmd_input = input('請輸入命令>>>\t').strip() if 'Q' == cmd_input.upper(): bye() cmd_list = cmd_input.split() if cmd_list[0] not in cmd_option: print('輸入無效') elif '-h' != cmd_list[1] and '-g' != cmd_list[1]: print(type(cmd_list[1]),cmd_list[1]) print('目標主機/主機組無效') elif '-cmd' not in cmd_list and '-action' not in cmd_list: print('輸入的操做命令不符合語法規則') else: cmd_option[cmd_list[0]](cmd_list)