1、背景知識html
顧明思義,進程即正在執行的一個過程,進程是對正在雲的程序的一個抽象。python
進程的概念起源與操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一,操做系統的其餘全部內容都是圍繞進程的概念展開的。linux
ps:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力,將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。nginx
必備的理論基礎:git
1 #一 操做系統的做用: 2 1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口 3 2:管理、調度進程,而且將多個進程對硬件的競爭變得有序 4 5 #二 多道技術: 6 1.產生背景:針對單核,實現併發 7 ps: 8 如今的主機通常是多核,那麼每一個核都會利用多道技術 9 有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個 10 cpu中的任意一個,具體由操做系統調度算法決定。 11 12 2.空間上的複用:如內存中同時有多道程序 13 3.時間上的複用:複用一個cpu的時間片 14 強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣 15 才能保證下次切換回來時,能基於上次切走的位置繼續運行
2、python併發編程之進程github
一、進程:正在進行的一個過程或者說一個任務,二負責執行任務則是cpu。web
舉例(單核+多道,實現多個進程的併發執行):
egon在一個時間段內有不少任務要作:python備課的任務,寫書的任務,交女友的任務,王者榮耀上分的任務,
但egon同一時刻只能作一個任務(cpu同一時間只能幹一個活),如何才能玩出多個任務併發執行的效果?
egon備一會課,再去跟李傑的女友聊聊天,再去打一會王者榮耀....這就保證了每一個任務都在進行中.
二、進程與程序的區別算法
程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。shell
舉例:
想象一位有一手好廚藝的計算機科學家egon正在爲他的女兒元昊烘製生日蛋糕。
他有作生日蛋糕的食譜,
廚房裏有所需的原料:麪粉、雞蛋、韭菜,蒜泥等。
在這個比喻中:
作蛋糕的食譜就是程序(即用適當形式描述的算法)
計算機科學家就是處理器(cpu)
而作蛋糕的各類原料就是輸入數據。
進程就是廚師閱讀食譜、取來各類原料以及烘製蛋糕等一系列動做的總和。
如今假設計算機科學家egon的兒子alex哭着跑了進來,說:XXXXXXXXXXXXXX。
科學家egon想了想,處理兒子alex蟄傷的任務比給女兒元昊作蛋糕的任務更重要,因而
計算機科學家就記錄下他照着食譜作到哪兒了(保存進程的當前狀態),而後拿出一本急救手冊,按照其中的指示處理蟄傷。這裏,咱們看處處理機從一個進程(作蛋糕)切換到另外一個高優先級的進程(實施醫療救治),每一個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完以後,這位計算機科學家又回來作蛋糕,從他
離開時的那一步繼續作下去。
須要強調的是:同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放一個視頻。數據庫
三、併發與並行
不管是並行仍是併發,在用戶看來都是‘同時’運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行衣蛾任務。
併發:是僞並行,即看起來是同時運行,單個cpu+多道技術就能夠實現併發,(並行也屬於併發)
並行:同時運行,只有具有多個cpu才能實現並行
單核下,能夠利用多道技術,多個核,每一個核也均可以利用多道技術(多道技術就針對單核而言的)
有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,一旦任務1遇到i/o就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術,而一旦任務1的i/o結束了,操做系統會從新調用它(須要進程的調度,分配給那個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行
全部現代計算機常常會在同一時間作不少件事,一個用戶的pc(不管是單cpu仍是多cpu),均可以同時運行多個任務(一個任務能夠理解爲一個程序)
啓動一個進程來殺毒(360軟件)
啓動一個進程來看電影(暴風影音)
啓動一個進程來聊天(騰訊QQ)
全部的這些進程都需被管理,因而一個支持多進程的多道程序系統是相當重要的
多道技術概念回顧:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,即僞併發,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)
四、同步與異步
同步執行:一個進程在執行某個任務時,另一個進程必須等待其執行完畢,才能繼續執行。
異步執行:一個進程在執行某個任務時,另一個進程無需等待其執行完畢,就能夠繼續執行,當有消息返回時,系統會通知進行處理,這樣能夠提升執行效率。
舉個例子,打電話時就是同步通訊,發短信時就是異步通訊。
五、進程的建立
但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。 而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程 1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印) 2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等) 3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音) 4. 一個批處理做業的初始化(只在大型機的批處理系統中應用) 不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的: 1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程) 2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。 關於建立的子進程,UNIX和windows 1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。 2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
六、進程的終止
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess) 2. 出錯退出(自願,python a.py中a.py不存在) 3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...) 4. 被其餘進程殺死(非自願,如kill -9)
七、進程的層次結構
不管UNIX仍是windows,進程只有一個父進程,不一樣的是: 1. 在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。 2. 在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了
八、進程的狀態
tail -f access.log |grep '404' 執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。 進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行 其實在兩種狀況下會致使一個進程在邏輯上不能運行, 1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做 2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。 於是一個進程由三種狀態
九、進程併發的實現
進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)
該表存放了進程狀態的重要信息:程序計數器,堆棧指針,內存分配情況,全部打開文件的狀態,賬號和調度信息,以及其餘在進程由運行狀態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。
3、python併發編程之進程
一、multiprocessing模塊介紹
python中多線程沒法利用多核優點,若是想要充分地使用多核cpu的資源(os.cpu_count()),在python中大部分狀況須要使用多進程,python提供了multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
multiprocessing模塊的功能衆多:支持子進程,通訊和共享數據,執行不一樣形式的同步,提供了process、Queue、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限與該進程內。
二、process類的介紹
建立進程的類:
1 Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 2 3 強調: 4 1. 須要使用關鍵字的方式來指定參數 5 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1 group參數未使用,值始終爲None 2 3 target表示調用對象,即子進程要執行的任務 4 5 args表示調用對象的位置參數元組,args=(1,2,'egon',) 6 7 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 8 9 name爲子進程的名稱
方法介紹:
1 p.start():啓動進程,並調用該子進程中的p.run() 2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 3 4 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 5 p.is_alive():若是p仍然運行,返回True 6 7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹:
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 2 3 p.name:進程的名稱 4 5 p.pid:進程的pid 6 7 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 8 9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
4、process類的使用
注意:在windows中process()必須放到 # if __name__ == '__main__':下
1 Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 2 If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 3 This is the reason for hiding calls to Process() inside 4 5 if __name__ == "__main__" 6 since statements inside this if-statement will not get called upon import. 7 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 8 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 9 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。 10 11 詳細解釋
建立並開啓子進程的兩種方式
1 #開進程的方法一: 2 import time 3 import random 4 from multiprocessing import Process 5 def piao(name): 6 print('%s piaoing' %name) 7 time.sleep(random.randrange(1,5)) 8 print('%s piao end' %name) 9 10 11 12 p1=Process(target=piao,args=('egon',)) #必須加,號 13 p2=Process(target=piao,args=('alex',)) 14 p3=Process(target=piao,args=('wupeqi',)) 15 p4=Process(target=piao,args=('yuanhao',)) 16 17 p1.start() 18 p2.start() 19 p3.start() 20 p4.start() 21 print('主線程') 22 23 方法一
1 #開進程的方法二: 2 import time 3 import random 4 from multiprocessing import Process 5 6 7 class Piao(Process): 8 def __init__(self,name): 9 super().__init__() 10 self.name=name 11 def run(self): 12 print('%s piaoing' %self.name) 13 14 time.sleep(random.randrange(1,5)) 15 print('%s piao end' %self.name) 16 17 p1=Piao('egon') 18 p2=Piao('alex') 19 p3=Piao('wupeiqi') 20 p4=Piao('yuanhao') 21 22 p1.start() #start會自動調用run 23 p2.start() 24 p3.start() 25 p4.start() 26 print('主線程') 27 28 方法二
練習1:socket通訊變成併發的形式
1 from socket import * 2 from multiprocessing import Process 3 4 server=socket(AF_INET,SOCK_STREAM) 5 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 6 server.bind(('127.0.0.1',8080)) 7 server.listen(5) 8 9 def talk(conn,client_addr): 10 while True: 11 try: 12 msg=conn.recv(1024) 13 if not msg:break 14 conn.send(msg.upper()) 15 except Exception: 16 break 17 18 if __name__ == '__main__': #windows下start進程必定要寫到這下面 19 while True: 20 conn,client_addr=server.accept() 21 p=Process(target=talk,args=(conn,client_addr)) 22 p.start() 23 24 server端
1 from socket import * 2 3 client=socket(AF_INET,SOCK_STREAM) 4 client.connect(('127.0.0.1',8080)) 5 6 7 while True: 8 msg=input('>>: ').strip() 9 if not msg:continue 10 11 client.send(msg.encode('utf-8')) 12 msg=client.recv(1024) 13 print(msg.decode('utf-8')) 14 15 from socket import * 16 17 client=socket(AF_INET,SOCK_STREAM) 18 client.connect(('127.0.0.1',8080)) 19 20 21 while True: 22 msg=input('>>: ').strip() 23 if not msg:continue 24 25 client.send(msg.encode('utf-8')) 26 msg=client.recv(1024) 27 print(msg.decode('utf-8')) 28 29 多個client端
每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。
解決方法:進程池
process對象的join方法
1 from multiprocessing import Process 2 import time 3 import random 4 5 class Piao(Process): 6 def __init__(self,name): 7 self.name=name 8 super().__init__() 9 def run(self): 10 print('%s is piaoing' %self.name) 11 time.sleep(random.randrange(1,3)) 12 print('%s is piao end' %self.name) 13 14 15 p=Piao('egon') 16 p.start() 17 p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 18 print('開始') 19 20 join:主進程等,等待子進程結束
1 from multiprocessing import Process 2 import time 3 import random 4 def piao(name): 5 print('%s is piaoing' %name) 6 time.sleep(random.randint(1,3)) 7 print('%s is piao end' %name) 8 9 p1=Process(target=piao,args=('egon',)) 10 p2=Process(target=piao,args=('alex',)) 11 p3=Process(target=piao,args=('yuanhao',)) 12 p4=Process(target=piao,args=('wupeiqi',)) 13 14 p1.start() 15 p2.start() 16 p3.start() 17 p4.start() 18 19 #有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎? 20 #固然不是了,必須明確:p.join()是讓誰等? 21 #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p, 22 23 #詳細解析以下: 24 #進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了 25 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 26 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待 27 # 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 28 p1.join() 29 p2.join() 30 p3.join() 31 p4.join() 32 33 print('主線程') 34 35 36 #上述啓動進程與join進程能夠簡寫爲 37 # p_l=[p1,p2,p3,p4] 38 # 39 # for p in p_l: 40 # p.start() 41 # 42 # for p in p_l: 43 # p.join() 44 45 有了join,程序不就是串行了嗎???
process對象的其餘方法或屬性
1 #進程對象的其餘方法一:terminate,is_alive 2 from multiprocessing import Process 3 import time 4 import random 5 6 class Piao(Process): 7 def __init__(self,name): 8 self.name=name 9 super().__init__() 10 11 def run(self): 12 print('%s is piaoing' %self.name) 13 time.sleep(random.randrange(1,5)) 14 print('%s is piao end' %self.name) 15 16 17 p1=Piao('egon1') 18 p1.start() 19 20 p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 21 print(p1.is_alive()) #結果爲True 22 23 print('開始') 24 print(p1.is_alive()) #結果爲False 25 26 terminate與is_alive
1 from multiprocessing import Process 2 import time 3 import random 4 class Piao(Process): 5 def __init__(self,name): 6 # self.name=name 7 # super().__init__() #Process的__init__方法會執行self.name=Piao-1, 8 # #因此加到這裏,會覆蓋咱們的self.name=name 9 10 #爲咱們開啓的進程設置名字的作法 11 super().__init__() 12 self.name=name 13 14 def run(self): 15 print('%s is piaoing' %self.name) 16 time.sleep(random.randrange(1,3)) 17 print('%s is piao end' %self.name) 18 19 p=Piao('egon') 20 p.start() 21 print('開始') 22 print(p.pid) #查看pid 23 24 name與pid
5、守護進程
主進程建立守護進程
主進程建立守護進程
其一:守護進程會在主進程代碼執行後結束就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError:daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。
1 from multiprocessing import Process 2 import time 3 import random 4 5 class Piao(Process): 6 def __init__(self,name): 7 self.name=name 8 super().__init__() 9 def run(self): 10 print('%s is piaoing' %self.name) 11 time.sleep(random.randrange(1,3)) 12 print('%s is piao end' %self.name) 13 14 15 p=Piao('egon') 16 p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行 17 p.start() 18 print('主')
1 #主進程代碼運行完畢,守護進程就會結束 2 from multiprocessing import Process 3 from threading import Thread 4 import time 5 def foo(): 6 print(123) 7 time.sleep(1) 8 print("end123") 9 10 def bar(): 11 print(456) 12 time.sleep(3) 13 print("end456") 14 15 16 p1=Process(target=foo) 17 p2=Process(target=bar) 18 19 p1.daemon=True 20 p1.start() 21 p2.start() 22 print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止 23 24 迷惑人的例子
6、進程同步(鎖)
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
part1:多個進程共享同一打印終端
1 #併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 2 from multiprocessing import Process 3 import os,time 4 def work(): 5 print('%s is running' %os.getpid()) 6 time.sleep(2) 7 print('%s is done' %os.getpid()) 8 9 if __name__ == '__main__': 10 for i in range(3): 11 p=Process(target=work) 12 p.start() 13 14 併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
1 #由併發變成了串行,犧牲了運行效率,但避免了競爭 2 from multiprocessing import Process,Lock 3 import os,time 4 def work(lock): 5 lock.acquire() 6 print('%s is running' %os.getpid()) 7 time.sleep(2) 8 print('%s is done' %os.getpid()) 9 lock.release() 10 if __name__ == '__main__': 11 lock=Lock() 12 for i in range(3): 13 p=Process(target=work,args=(lock,)) 14 p.start() 15 16 加鎖:由併發變成了串行,犧牲了運行效率,但避免了競爭
part2:多個進程共享同一文件
文件當數據庫,模擬搶票
1 #文件db的內容爲:{"count":1} 2 #注意必定要用雙引號,否則json沒法識別 3 from multiprocessing import Process,Lock 4 import time,json,random 5 def search(): 6 dic=json.load(open('db.txt')) 7 print('\033[43m剩餘票數%s\033[0m' %dic['count']) 8 9 def get(): 10 dic=json.load(open('db.txt')) 11 time.sleep(0.1) #模擬讀數據的網絡延遲 12 if dic['count'] >0: 13 dic['count']-=1 14 time.sleep(0.2) #模擬寫數據的網絡延遲 15 json.dump(dic,open('db.txt','w')) 16 print('\033[43m購票成功\033[0m') 17 18 def task(lock): 19 search() 20 get() 21 if __name__ == '__main__': 22 lock=Lock() 23 for i in range(100): #模擬併發100個客戶端搶票 24 p=Process(target=task,args=(lock,)) 25 p.start() 26 27 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
1 #文件db的內容爲:{"count":1} 2 #注意必定要用雙引號,否則json沒法識別 3 from multiprocessing import Process,Lock 4 import time,json,random 5 def search(): 6 dic=json.load(open('db.txt')) 7 print('\033[43m剩餘票數%s\033[0m' %dic['count']) 8 9 def get(): 10 dic=json.load(open('db.txt')) 11 time.sleep(0.1) #模擬讀數據的網絡延遲 12 if dic['count'] >0: 13 dic['count']-=1 14 time.sleep(0.2) #模擬寫數據的網絡延遲 15 json.dump(dic,open('db.txt','w')) 16 print('\033[43m購票成功\033[0m') 17 18 def task(lock): 19 search() 20 lock.acquire() 21 get() 22 lock.release() 23 if __name__ == '__main__': 24 lock=Lock() 25 for i in range(100): #模擬併發100個客戶端搶票 26 p=Process(target=task,args=(lock,)) 27 p.start() 28 29 加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全
總結;
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
一、效率低
二、須要本身加鎖處理
爲此multiprocessing模塊爲咱們提供了基於消息的IPC通訊機制:隊列和管道。
一、隊列和管道都是將數據存放與內存中。
二、隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來。
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
7、隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
建立隊列的類(底層就是以管道和鎖定的方式實現):
1 Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中容許最大項數,省略則無大小限制。
方法介紹:
主要方法:
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。 2 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常. 3 4 q.get_nowait():同q.get(False) 5 q.put_nowait():同q.put(False) 6 7 q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。 8 q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。 9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
其餘方法:
1 q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞 2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 3 q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
應用:
1 ''' 2 multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 3 都是基於消息傳遞實現的,可是隊列接口 4 ''' 5 6 from multiprocessing import Process,Queue 7 import time 8 q=Queue(3) 9 10 11 #put ,get ,put_nowait,get_nowait,full,empty 12 q.put(3) 13 q.put(3) 14 q.put(3) 15 print(q.full()) #滿了 16 17 print(q.get()) 18 print(q.get()) 19 print(q.get()) 20 print(q.empty()) #空了
8、生產者消費者模型
生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決大多數併發問題。該模式經過生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據,一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者,爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
1 from multiprocessing import Process,Queue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 9 def producer(q): 10 for i in range(10): 11 time.sleep(random.randint(1,3)) 12 res='包子%s' %i 13 q.put(res) 14 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 15 16 if __name__ == '__main__': 17 q=Queue() 18 #生產者們:即廚師們 19 p1=Process(target=producer,args=(q,)) 20 21 #消費者們:即吃貨們 22 c1=Process(target=consumer,args=(q,)) 23 24 #開始 25 p1.start() 26 c1.start() 27 print('主')
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q後,則一直處於死循環中且卡在q.get()這一步,
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號就能夠break出死循環。
1 from multiprocessing import Process,Queue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 if res is None:break #收到結束信號則結束 7 time.sleep(random.randint(1,3)) 8 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 9 10 def producer(q): 11 for i in range(10): 12 time.sleep(random.randint(1,3)) 13 res='包子%s' %i 14 q.put(res) 15 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 q.put(None) #發送結束信號 17 if __name__ == '__main__': 18 q=Queue() 19 #生產者們:即廚師們 20 p1=Process(target=producer,args=(q,)) 21 22 #消費者們:即吃貨們 23 c1=Process(target=consumer,args=(q,)) 24 25 #開始 26 p1.start() 27 c1.start() 28 print('主') 29 30 生產者在生產完畢後發送結束信號None
注意:結束信號None,不必定要由生產者發,主進程一樣能夠發,但主進程須要等生產者結束後才應該發送該信號。
1 from multiprocessing import Process,Queue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 if res is None:break #收到結束信號則結束 7 time.sleep(random.randint(1,3)) 8 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 9 10 def producer(q): 11 for i in range(2): 12 time.sleep(random.randint(1,3)) 13 res='包子%s' %i 14 q.put(res) 15 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 17 if __name__ == '__main__': 18 q=Queue() 19 #生產者們:即廚師們 20 p1=Process(target=producer,args=(q,)) 21 22 #消費者們:即吃貨們 23 c1=Process(target=consumer,args=(q,)) 24 25 #開始 26 p1.start() 27 c1.start() 28 29 p1.join() 30 q.put(None) #發送結束信號 31 print('主') 32 33 主進程在生產者生產完畢後發送結束信號None
但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決。
1 from multiprocessing import Process,Queue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 if res is None:break #收到結束信號則結束 7 time.sleep(random.randint(1,3)) 8 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 9 10 def producer(name,q): 11 for i in range(2): 12 time.sleep(random.randint(1,3)) 13 res='%s%s' %(name,i) 14 q.put(res) 15 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 16 17 18 19 if __name__ == '__main__': 20 q=Queue() 21 #生產者們:即廚師們 22 p1=Process(target=producer,args=('包子',q)) 23 p2=Process(target=producer,args=('骨頭',q)) 24 p3=Process(target=producer,args=('泔水',q)) 25 26 #消費者們:即吃貨們 27 c1=Process(target=consumer,args=(q,)) 28 c2=Process(target=consumer,args=(q,)) 29 30 #開始 31 p1.start() 32 p2.start() 33 p3.start() 34 c1.start() 35 36 p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號 37 p2.join() 38 p3.join() 39 q.put(None) #有幾個生產者就應該發送幾回結束信號None 40 q.put(None) #發送結束信號 41 q.put(None) #發送結束信號 42 print('主') 43 44 有幾個生產者就須要發送幾回結束信號:至關low
其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制
1 #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 2 3 #參數介紹: 4 maxsize是隊列中容許最大項數,省略則無大小限制。 5 #方法介紹: 6 JoinableQueue的實例p除了與Queue對象相同的方法以外還具備: 7 q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常 8 q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
1 from multiprocessing import Process,JoinableQueue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 9 q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 10 11 def producer(name,q): 12 for i in range(10): 13 time.sleep(random.randint(1,3)) 14 res='%s%s' %(name,i) 15 q.put(res) 16 print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) 17 q.join() 18 19 20 if __name__ == '__main__': 21 q=JoinableQueue() 22 #生產者們:即廚師們 23 p1=Process(target=producer,args=('包子',q)) 24 p2=Process(target=producer,args=('骨頭',q)) 25 p3=Process(target=producer,args=('泔水',q)) 26 27 #消費者們:即吃貨們 28 c1=Process(target=consumer,args=(q,)) 29 c2=Process(target=consumer,args=(q,)) 30 c1.daemon=True 31 c2.daemon=True 32 33 #開始 34 p_l=[p1,p2,p3,c1,c2] 35 for p in p_l: 36 p.start() 37 38 p1.join() 39 p2.join() 40 p3.join() 41 print('主') 42 43 #主進程等--->p1,p2,p3等---->c1,c2 44 #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 45 #於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程
9、管道
進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可)
1 #建立管道的類: 2 Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 3 #參數介紹: 4 dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。 5 #主要方法: 6 conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 7 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 8 #其餘方法: 9 conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 10 conn1.fileno():返回鏈接使用的整數文件描述符 11 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 12 13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 14 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 15 16 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。 17 18 介紹
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 25 c1=Process(target=consumer,args=((left,right),'c1')) 26 c1.start() 27 28 29 seq=(i for i in range(10)) 30 producer(seq,(left,right)) 31 32 right.close() 33 left.close() 34 35 c1.join() 36 print('主進程') 37 38 基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端,若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起,管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常,所以在生產者中管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def adder(p,name): 5 server,client=p 6 client.close() 7 while True: 8 try: 9 x,y=server.recv() 10 except EOFError: 11 server.close() 12 break 13 res=x+y 14 server.send(res) 15 print('server done') 16 if __name__ == '__main__': 17 server,client=Pipe() 18 19 c1=Process(target=adder,args=((server,client),'c1')) 20 c1.start() 21 22 server.close() 23 24 client.send((10,20)) 25 print(client.recv()) 26 client.close() 27 28 c1.join() 29 print('主進程') 30 #注意:send()和recv()方法使用pickle模塊對對象進行序列化。 31 32 管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序
10、共享數據
展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合
經過消息隊列交換數據,這樣極大地減小使用鎖定好其餘同步手段的需求。
還能夠擴展到分佈式系統中
進程間通訊應該儘可能避免使用共享數據的方式
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
1 from multiprocessing import Manager,Process,Lock 2 import os 3 def work(d,lock): 4 # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 5 d['count']-=1 6 7 if __name__ == '__main__': 8 lock=Lock() 9 with Manager() as m: 10 dic=m.dict({'count':100}) 11 p_l=[] 12 for i in range(100): 13 p=Process(target=work,args=(dic,lock)) 14 p_l.append(p) 15 p.start() 16 for p in p_l: 17 p.join() 18 print(dic) 19 #{'count': 94} 20 21 進程之間操做共享的數據
11、進程池
在利用python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間,多進程是實現併發的手段之一,須要注意的問題是:
一、很明顯須要併發執行的任務一般要遠大於核數
二、一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
三、進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數數目的進程也沒法作到並行)
例如當被操做對象數目不大時,能夠直接利用multiprocessing中process動態生成多個進程,司機十幾個還好,但若是是上百個,上千個......手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數。
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這個三個進程去執行全部任務,不會開啓其餘進程。
Pool([numprocess [,initializer [, initargs]]]):建立進程池
參數介紹:
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值 2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None 3 initargs:是要傳給initializer的參數組
方法介紹:
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
其餘方法:
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 3 obj.ready():若是調用完成,返回True 4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 5 obj.wait([timeout]):等待結果變爲可用。 6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
應用:
1 from multiprocessing import Pool 2 import os,time 3 def work(n): 4 print('%s run' %os.getpid()) 5 time.sleep(3) 6 return n**2 7 8 if __name__ == '__main__': 9 p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 10 res_l=[] 11 for i in range(10): 12 res=p.apply(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res 13 res_l.append(res) 14 print(res_l) 15 16 apply同步執行:阻塞式
1 from multiprocessing import Pool 2 import os,time 3 def work(n): 4 print('%s run' %os.getpid()) 5 time.sleep(3) 6 return n**2 7 8 if __name__ == '__main__': 9 p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 10 res_l=[] 11 for i in range(10): 12 res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res 13 res_l.append(res) 14 15 #異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 16 p.close() 17 p.join() 18 for res in res_l: 19 print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get 20 21 apply_async異步執行:非阻塞
1 #一:使用進程池(非阻塞,apply_async) 2 #coding: utf-8 3 from multiprocessing import Process,Pool 4 import time 5 6 def func(msg): 7 print( "msg:", msg) 8 time.sleep(1) 9 return msg 10 11 if __name__ == "__main__": 12 pool = Pool(processes = 3) 13 res_l=[] 14 for i in range(10): 15 msg = "hello %d" %(i) 16 res=pool.apply_async(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 17 res_l.append(res) 18 print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了 19 20 pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 21 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 22 23 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果 24 for i in res_l: 25 print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get 26 27 #二:使用進程池(阻塞,apply) 28 #coding: utf-8 29 from multiprocessing import Process,Pool 30 import time 31 32 def func(msg): 33 print( "msg:", msg) 34 time.sleep(0.1) 35 return msg 36 37 if __name__ == "__main__": 38 pool = Pool(processes = 3) 39 res_l=[] 40 for i in range(10): 41 msg = "hello %d" %(i) 42 res=pool.apply(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 43 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個 44 print("==============================>") 45 pool.close() 46 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 47 48 print(res_l) #看到的就是最終的結果組成的列表 49 for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法 50 print(i) 51 52 詳解:apply_async與apply
練習2:使用進程池維護固定數目的進程
1 #Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) 2 #開啓6個客戶端,會發現2個客戶端處於等待狀態 3 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 4 from socket import * 5 from multiprocessing import Pool 6 import os 7 8 server=socket(AF_INET,SOCK_STREAM) 9 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 10 server.bind(('127.0.0.1',8080)) 11 server.listen(5) 12 13 def talk(conn,client_addr): 14 print('進程pid: %s' %os.getpid()) 15 while True: 16 try: 17 msg=conn.recv(1024) 18 if not msg:break 19 conn.send(msg.upper()) 20 except Exception: 21 break 22 23 if __name__ == '__main__': 24 p=Pool() 25 while True: 26 conn,client_addr=server.accept() 27 p.apply_async(talk,args=(conn,client_addr)) 28 # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問 29 30 server端
1 from socket import * 2 3 client=socket(AF_INET,SOCK_STREAM) 4 client.connect(('127.0.0.1',8080)) 5 6 7 while True: 8 msg=input('>>: ').strip() 9 if not msg:continue 10 11 client.send(msg.encode('utf-8')) 12 msg=client.recv(1024) 13 print(msg.decode('utf-8')) 14 15 客戶端
發現:併發開啓多個客戶端,服務端同一時間只有3個不一樣的pid,幹掉一個客戶端,另一個客戶端纔會進來,被3個進程之一處理。
12、進程池之回調函數
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了,你能夠處理個人結果了,主進程則調用一個函數去處理該結果,該函數即回調函數。
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O過程,直接拿到的是任務的結果。
1 from multiprocessing import Pool 2 import requests 3 import json 4 import os 5 6 def get_page(url): 7 print('<進程%s> get %s' %(os.getpid(),url)) 8 respone=requests.get(url) 9 if respone.status_code == 200: 10 return {'url':url,'text':respone.text} 11 12 def pasrse_page(res): 13 print('<進程%s> parse %s' %(os.getpid(),res['url'])) 14 parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) 15 with open('db.txt','a') as f: 16 f.write(parse_res) 17 18 19 if __name__ == '__main__': 20 urls=[ 21 'http://www.baidu.com', 22 'http://www.python.org', 23 'http://www.openstack.org', 24 'http://help.github.com/', 25 'http://www.sina.com.cn/' 26 ] 27 28 p=Pool(3) 29 res_l=[] 30 for url in urls: 31 res=p.apply_async(get_page,args=(url,),callback=pasrse_page) 32 res_l.append(res) 33 34 p.close() 35 p.join() 36 print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了 37 38 ''' 39 打印結果: 40 <進程3388> get https://www.baidu.com 41 <進程3389> get https://www.python.org 42 <進程3390> get https://www.openstack.org 43 <進程3388> get https://help.github.com/ 44 <進程3387> parse https://www.baidu.com 45 <進程3389> get http://www.sina.com.cn/ 46 <進程3387> parse https://www.python.org 47 <進程3387> parse https://help.github.com/ 48 <進程3387> parse http://www.sina.com.cn/ 49 <進程3387> parse https://www.openstack.org 50 [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] 51 '''
1 from multiprocessing import Pool 2 import time,random 3 import requests 4 import re 5 6 def get_page(url,pattern): 7 response=requests.get(url) 8 if response.status_code == 200: 9 return (response.text,pattern) 10 11 def parse_page(info): 12 page_content,pattern=info 13 res=re.findall(pattern,page_content) 14 for item in res: 15 dic={ 16 'index':item[0], 17 'title':item[1], 18 'actor':item[2].strip()[3:], 19 'time':item[3][5:], 20 'score':item[4]+item[5] 21 22 } 23 print(dic) 24 if __name__ == '__main__': 25 pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) 26 27 url_dic={ 28 'http://maoyan.com/board/7':pattern1, 29 } 30 31 p=Pool() 32 res_l=[] 33 for url,pattern in url_dic.items(): 34 res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) 35 res_l.append(res) 36 37 for i in res_l: 38 i.get() 39 40 # res=requests.get('http://maoyan.com/board/7') 41 # print(re.findall(pattern,res.text)) 42 43 爬蟲案例
若是在主進程中等待進程池中全部都執行完畢後,再統一處理結果,則無需回調函數
1 from multiprocessing import Pool 2 import time,random,os 3 4 def work(n): 5 time.sleep(1) 6 return n**2 7 if __name__ == '__main__': 8 p=Pool() 9 10 res_l=[] 11 for i in range(10): 12 res=p.apply_async(work,args=(i,)) 13 res_l.append(res) 14 15 p.close() 16 p.join() #等待進程池中全部進程執行完畢 17 18 nums=[] 19 for res in res_l: 20 nums.append(res.get()) #拿到全部結果 21 print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
十3、paramiko模塊
一、介紹:
paramiko是一個用於作遠程控制的模塊,使用該模塊能夠對遠程服務器進行命令或文件操做,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。
二、下載安裝
pip3 install paramiko #在python3中
1 pycrypto,因爲 paramiko 模塊內部依賴pycrypto,因此先下載安裝pycrypto #在python2中 2 pip3 install pycrypto 3 pip3 install paramiko 4 注:若是在安裝pycrypto2.0.1時發生以下錯誤 5 command 'gcc' failed with exit status 1... 6 多是缺乏python-dev安裝包致使 7 若是gcc沒有安裝,請事先安裝gcc 8 9 在python2中
三、使用
sshclient
用於鏈接遠程服務器並執行基本命令
基於用戶名密碼鏈接:
1 import paramiko 2 3 # 建立SSH對象 4 ssh = paramiko.SSHClient() 5 # 容許鏈接不在know_hosts文件中的主機 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 鏈接服務器 8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx') 9 10 # 執行命令 11 stdin, stdout, stderr = ssh.exec_command('df') 12 # 獲取命令結果 13 result = stdout.read() 14 print(result.decode('utf-8')) 15 # 關閉鏈接 16 ssh.close()
1 import paramiko 2 3 transport = paramiko.Transport(('120.92.84.249', 22)) 4 transport.connect(username='root', password='xxx') 5 6 ssh = paramiko.SSHClient() 7 ssh._transport = transport 8 9 stdin, stdout, stderr = ssh.exec_command('df') 10 res=stdout.read() 11 print(res.decode('utf-8')) 12 13 transport.close() 14 15 SSHClient 封裝 Transport
基於公鑰密鑰鏈接:
客戶端文件名:id_rsa
服務端必須有文件名:authorized_keys(在用ssh-keygen時,必須製做一個zuthorized_keys,能夠用ssh-copy-id來製做)
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') 4 5 # 建立SSH對象 6 ssh = paramiko.SSHClient() 7 # 容許鏈接不在know_hosts文件中的主機 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 鏈接服務器 10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key) 11 12 # 執行命令 13 stdin, stdout, stderr = ssh.exec_command('df') 14 # 獲取命令結果 15 result = stdout.read() 16 print(result.decode('utf-8')) 17 # 關閉鏈接 18 ssh.close()
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') 4 5 transport = paramiko.Transport(('120.92.84.249', 22)) 6 transport.connect(username='root', pkey=private_key) 7 8 ssh = paramiko.SSHClient() 9 ssh._transport = transport 10 11 stdin, stdout, stderr = ssh.exec_command('df') 12 result=stdout.read() 13 print(result.decode('utf-8')) 14 15 transport.close() 16 17 SSHClient 封裝 Transport
1 import paramiko 2 from io import StringIO 3 4 key_str = """-----BEGIN RSA PRIVATE KEY----- 5 MIIEoQIBAAKCAQEAsJmFLrSeCumJvga0Gl5O5wVOVwMIy2MpqIyQPi5J87dg89a4 6 Da9fczJog7qoSbRwHFOQoCHNphSlp5KPhGsF6RJewkIw9H1UKV4dCOyl/4HOAkAD 7 rKrsEDmrJ9JlzF2GTTZSnTgVQWcvBS2RKB4eM2R9aJ11xV6X2Hk4YDLTExIWeabb 8 h2TUKw0iyjI8pRuYLKkF2X16u9TBwfOTroGYgiNFHQvhsQppbEbI49NF2XkCkFMi 9 8/7tLjf95InE/VUUq56JqfzyHwdpHou+waXbwtvGgXN3sz+KkuEv6R2qDz06upZV 10 FCZRRpDhzoR8Uh/UEzTGZb8z7FB6EJXUiXJikQIBIwKCAQBBmBuGYFf1bK+BGG7H 11 9ySe81ecqVsJtx4aCFLVRGScWg4RbQKIvXs5an6XU/VdNGQnx0RYvBkvDvuzRRC8 12 J8Bd4kB0CfTtGJuaVigKoQp02HEWx1HSa17+tlWD0c4KFBvwywi+DYQ83S64x8gz 13 eOalX9bPFenqORPUD8R7gJeKvPVc6ZTPeorpuH7u9xayP0Eop8qKxZza9Xh3foVj 14 Qo4IxoYnDN57CIRX5PFSlDDggpmr8FtRF4nAxmFq8LhSp05ivzX/Ku1SNHdaMWZO 15 7va8tISXdLI5m0EGzoVoBvohIbwlxI6kfmamrh6Eas2Jnsc4CLzMsR4jBWt0LHLv 16 /SLnAoGBANaEUf/Jptab9G/xD9W2tw/636i3gLpTPY9KPtCcAxqStNeT6RAWZ5HF 17 lKJg+NKpu3pI45ldAwvts0i+aCZk2xakEWIZWqCmXm31JSPDQTaMGe7H0vOmUaxx 18 ncdpBVdvhMbfFUgei15iKfuafgrKaS9oIkntXEgrC+3wBOI0Gbx3AoGBANLAGxAF 19 TK7ydr+Q1+6/ujs6e8WsXt8HZMa/1khCVSbrf1MgACvZPSSSrDpVwaDTSjlRI4AL 20 bb0l0RFU+/0caMiHilscuJdz9Fdd9Ux4pjROZa3TF5CFhvP7PsZAoxOo+yqJg4zr 21 996GG/aAv4M8lQJ2rDFk/Dgn5y/AaAun1oM3AoGAGIQmoOPYjY4qkHNSRE9lYOl4 22 pZFQilKn8x5tlC8WTC4GCgJGhX7nQ9wQ/J1eQ/YkDfmznH+ok6YjHkGlgLsRuXHW 23 GdcDCwuzBUCWh76LHC1EytUCKnloa3qy8jfjWnMlHgrd3FtDILrC+C7p1Vj2FAvm 24 qVz0moiTpioPL8twp9MCgYEAin49q3EyZFYwxwdpU7/SJuvq750oZq0WVriUINsi 25 A6IR14oOvbqkhb94fhsY12ZGt/N9uosq22H+anms6CicoQicv4fnBHDFI3hCHE9I 26 pgeh50GTJHUA6Xk34V2s/kp5KpThazv6qCw+QubkQExh660SEdSlvoCfPKMCi1EJ 27 TukCgYAZKY1NZ2bjJyyO/dfNvMQ+etUL/9esi+40GUGyJ7SZcazrN9z+DO0yL39g 28 7FT9NMIc2dsmNJQMaGBCDl0AjO1O3b/wqlrNvNBGkanxn2Htn5ajfo+LBU7yHAcV 29 7w4X5HLarXiE1mj0LXFKJhdvFqU53KUQJXBqR6lsMqzsdPwLMJg== 30 -----END RSA PRIVATE KEY-----""" 31 32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) 33 transport = paramiko.Transport(('120.92.84.249', 22)) 34 transport.connect(username='root', pkey=private_key) 35 36 ssh = paramiko.SSHClient() 37 ssh._transport = transport 38 39 stdin, stdout, stderr = ssh.exec_command('df') 40 result = stdout.read() 41 print(result.decode('utf-8')) 42 transport.close() 43 44 print(result) 45 46 基於私鑰字符串進行鏈接
sftpclient
用於鏈接遠程服務器並執行上傳下載
基於用戶名密碼上傳下載
1 import paramiko 2 3 transport = paramiko.Transport(('120.92.84.249',22)) 4 transport.connect(username='root',password='xxx') 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 將location.py 上傳至服務器 /tmp/test.py 8 sftp.put('/tmp/id_rsa', '/etc/test.rsa') 9 # 將remove_path 下載到本地 local_path 10 sftp.get('remove_path', 'local_path') 11 12 transport.close()
基於公鑰密鑰上傳下載
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') 4 5 transport = paramiko.Transport(('120.92.84.249', 22)) 6 transport.connect(username='root', pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 將location.py 上傳至服務器 /tmp/test.py 10 sftp.put('/tmp/id_rsa', '/tmp/a.txt') 11 # 將remove_path 下載到本地 local_path 12 sftp.get('remove_path', 'local_path') 13 14 transport.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import paramiko 4 import uuid 5 6 class Haproxy(object): 7 8 def __init__(self): 9 self.host = '172.16.103.191' 10 self.port = 22 11 self.username = 'root' 12 self.pwd = '123' 13 self.__k = None 14 15 def create_file(self): 16 file_name = str(uuid.uuid4()) 17 with open(file_name,'w') as f: 18 f.write('sb') 19 return file_name 20 21 def run(self): 22 self.connect() 23 self.upload() 24 self.rename() 25 self.close() 26 27 def connect(self): 28 transport = paramiko.Transport((self.host,self.port)) 29 transport.connect(username=self.username,password=self.pwd) 30 self.__transport = transport 31 32 def close(self): 33 34 self.__transport.close() 35 36 def upload(self): 37 # 鏈接,上傳 38 file_name = self.create_file() 39 40 sftp = paramiko.SFTPClient.from_transport(self.__transport) 41 # 將location.py 上傳至服務器 /tmp/test.py 42 sftp.put(file_name, '/home/root/tttttttttttt.py') 43 44 def rename(self): 45 46 ssh = paramiko.SSHClient() 47 ssh._transport = self.__transport 48 # 執行命令 49 stdin, stdout, stderr = ssh.exec_command('mv /home/root/tttttttttttt.py /home/root/ooooooooo.py') 50 # 獲取命令結果 51 result = stdout.read() 52 53 54 ha = Haproxy() 55 ha.run() 56 57 Demo