一、程序中不免出現錯誤,而錯誤分紅兩種:python
a.語法錯誤:linux
b.邏輯錯誤(邏輯錯誤)nginx
二、異常定義:異常就是程序運行時發生錯誤的信號。程序員
在python中,錯誤觸發的異常,是以異常追蹤信息、 異常類型、異常值三部分組成,以下:web
執行的結果爲:shell
#異常追蹤信息編程
Traceback:
Traceback (most recent call last):
File "D:/python/day29/aa.py", line 5, in <module>
nuligejson
# 異常類型:異常值
NameError: name 'nulige' is not definedwindows
三、異常的種類:數組
a.經常使用分類
b.更多分類
四、異常處理定義:
python解釋器檢測到錯誤,觸發異常(也容許程序員本身觸發異常)
程序員編寫特定的代碼,專門用來捕捉這個異常(這段代碼與程序邏輯無關,與異常處理有關)
若是捕捉成功則進入另一個處理分支,執行你爲其定製的邏輯,使程序不會崩潰,這就是異常處理
五、異常處理的應用場景:
python解析器去執行程序,檢測到了一個錯誤時,觸發異常,異常觸發後且沒被處理的狀況下,程序就在當前異常處終止,後面的代碼不會運行,誰會去用一個運行着忽然就崩潰的軟件。因此你必須提供一種異常處理機制來加強你程序的健壯性與容錯性 。
六、異常處理的使用
首先須知,異常是由程序的錯誤引發的,語法上的錯誤跟異常處理無關,必須在程序運行前就修正
a.使用if判斷方式:
if判斷式的異常處理只能針對某一段代碼,對於不一樣的代碼段的相同類型的錯誤你須要寫重複的if來進行處理。
在你的程序中頻繁的寫與程序自己無關,與異常處理有關的if,可讀性極其的差
這是能夠解決異常的,只是存在上述兩個的問題,因此,千萬不要妄下定論if不能用來異常處理。
b.python爲每一種異常定製了一個類型,而後提供了一種特定的語法結構用來進行異常處理
1)基本語法:
try: #異常捕捉
pass # 被檢測的代碼塊
except <異常類型> as <e>:
pass # 一旦try檢測到異常,就執行這個位置的邏輯
<except ...: > # 異常處理的多分枝
<else:>
pass # 報錯執行邏輯
finally:
pass # 最後無論報不報錯都執行的邏輯(爲狀態邏輯)
2)異常類只能用來處理指定的異常狀況,若是非指定異常則沒法處理。
3)多分支
4)萬能異常:python萬能異常,Exception,他能夠捕獲任意異常
5)主動觸發異常
6)自定義異常
7)斷言
七、try...except的方式與if的比較:
try..except這種異常處理機制就是取代if那種方式,讓你的程序在不犧牲可讀性的前提下加強健壯性和容錯性。異常處理中爲每個異常定製了異常類型(python中統一了類與類型,類型即類),對於同一種異常,一個except就能夠捕捉到,能夠同時處理多段代碼的異常(無需‘寫多個if判斷式’)減小了代碼,加強了可讀性。
使用try..except的方式
a.把錯誤處理和真正的工做分開來
b.代碼更易組織,更清晰,複雜的工做任務更容易實現;
c.毫無疑問,更安全了,不至於因爲一些小的疏忽而使程序意外崩潰了;
八、使用異常處理注意事項。
try...except應該儘可能少用,由於它自己就是你附加給你的程序的一種異常處理的邏輯,與你的主要的工做是沒有關係的這種東西加的多了,會致使你的代碼可讀性變差。 異常處理只有在有些異常沒法預知的狀況下,才應該加上try...except,其餘的邏輯錯誤應該儘可能修正。
九、總結:
1)try..except的方式,只是python提供給你一種特定的語法結構去作這件事,對於不一樣代碼的同一種異常,python爲你定製了一中類型,一個expect就能夠捕捉到。
2)try...except應該儘可能少用,由於它自己就是你附加給你的程序的一種異常處理的邏輯,與你的主要的工做是沒有關係的這種東西加的多了,會致使你的代碼可讀性變差。異常處理只有在有些異常沒法預知的狀況下,才應該加上try...except,其餘的邏輯錯誤應該儘可能修正。
一、操做系統哦你的概念:
操做系統位於底層硬件與應用軟件之間的一層。工做方式:向下管理硬件,向上提供接口。
操做系統進行進程切換:1.出現IO操做;2.固定時間。
固定時間很短,人感覺不到。每個應用層運行起來的程序都是進程。
二、進程的概念:
程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。須要強調的是:同一個程序執行兩次,那也是兩個進程。
進程:資源管理單位(容器)。
線程:最小執行單位,管理線程的是進程。
進程定義:
進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序
用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外
部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。
三、進程與線程的關係:
在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程。
多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,控制該進程的地址空間。
進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。
進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。或者說
進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位。
線程則是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。
四、並行和併發:
不管是並行仍是併發,在用戶看來都是'同時'運行的,而一個cpu同一時刻只能執行一個任務。
並行:同時運行,只有具有多個cpu才能實現並行。
併發:是僞並行,即看起來是同時運行,單個cpu+多道技術。
ps.多道技術:
內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,
這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,
即僞併發,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)。
五、同步與異步
同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;
異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。
舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。
六、進程的建立
但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。
而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)
4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)
不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:
1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)
2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。
關於建立的子進程,UNIX和windows
1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間,任何一個進程的在其地址空間中的修改都不會影響到另一個進程。
2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
七、進程的終止
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自願,程序員主動拋出異常,例如raise)
3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等)
4. 被其餘進程殺死(非自願,如kill -9)
八、進程的層次結構
不管UNIX仍是windows,進程只有一個父進程,不一樣的是:
1. 在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。
2. 在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了。
九、進程的狀態
tail -f access.log |grep '404'
執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。
進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行
其實在兩種狀況下會致使一個進程在邏輯上不能運行,
1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做
2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU
於是一個進程由三種狀態
十、進程併發的實現
進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)
該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣
一、multiprocessing模塊介紹:
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
二、Process類的介紹:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
強調:
須要使用關鍵字的方式來指定參數,args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1)group參數未使用,值始終爲None
2)target表示調用對象,即子進程要執行的任務
3)args表示調用對象的位置參數元組,args=(1,2,'egon',)
4)kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5)name爲子進程的名稱
方法介紹:
1)p.start():啓動進程,並調用該子進程中的p.run()
2)p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
3)p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
4)p.is_alive():若是p仍然運行,返回True
5)p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹:
1)p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2)p.name:進程的名稱
3)p.pid:進程的pid
4)p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
5)p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
三、Process類的使用
1.建立並開啓子進程的兩種方式
ps.注意:在windows中Process()必須放到# if __name__ == '__main__':下
因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
a.方法一:函數方法調用
1 #開進程的方法一: 2 import time 3 import random 4 from multiprocessing import Process 5 def piao(name): 6 print('%s piaoing' %name) 7 time.sleep(random.randrange(1,5)) 8 print('%s piao end' %name) 9 10 11 12 p1=Process(target=piao,args=('egon',)) #必須加,號 13 p2=Process(target=piao,args=('alex',)) 14 p3=Process(target=piao,args=('wupeqi',)) 15 p4=Process(target=piao,args=('yuanhao',)) 16 17 p1.start() 18 p2.start() 19 p3.start() 20 p4.start() 21 print('主線程')
b. 方法二:類定義式調用
1 #開進程的方法二: 2 import time 3 import random 4 from multiprocessing import Process 5 6 7 class Piao(Process): 8 def __init__(self,name): 9 super().__init__() 10 self.name=name 11 def run(self): 12 print('%s piaoing' %self.name) 13 14 time.sleep(random.randrange(1,5)) 15 print('%s piao end' %self.name) 16 17 p1=Piao('p1') 18 p2=Piao('p2') 19 p3=Piao('p3') 20 p4=Piao('p4') 21 22 p1.start() #start會自動調用run 23 p2.start() 24 p3.start() 25 p4.start() 26 print('主線程')
c. socket結合process使用
1 from socket import * 2 from multiprocessing import Process 3 4 server=socket(AF_INET,SOCK_STREAM) 5 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 6 server.bind(('127.0.0.1',8080)) 7 server.listen(5) 8 9 def talk(conn,client_addr): 10 while True: 11 try: 12 msg=conn.recv(1024) 13 if not msg:break 14 conn.send(msg.upper()) 15 except Exception: 16 break 17 18 if __name__ == '__main__': #windows下start進程必定要寫到這下面 19 while True: 20 conn,client_addr=server.accept() 21 p=Process(target=talk,args=(conn,client_addr)) 22 p.start()
1 from socket import * 2 3 client=socket(AF_INET,SOCK_STREAM) 4 client.connect(('127.0.0.1',8080)) 5 6 7 while True: 8 msg=input('>>: ').strip() 9 if not msg:continue 10 11 client.send(msg.encode('utf-8')) 12 msg=client.recv(1024) 13 print(msg.decode('utf-8'))
ps.每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。
解決方法:進程池
2.Process對象的其餘方法或屬性
a.進程對象的其餘方法一:terminate,is_alive
terminate()# 關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
is_alive() # 檢查子進程是否存活True/False
1 #進程對象的其餘方法一:terminate,is_alive 2 from multiprocessing import Process 3 import time 4 import random 5 6 class Piao(Process): 7 def __init__(self,name): 8 self.name=name 9 super().__init__() 10 11 def run(self): 12 print('%s is piaoing' %self.name) 13 time.sleep(random.randrange(1,5)) 14 print('%s is piao end' %self.name) 15 16 17 p1=Piao('egon1') 18 p1.start() 19 20 p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 21 print(p1.is_alive()) #結果爲True 22 23 print('開始') 24 print(p1.is_alive()) #結果爲False
b.進程對象的其餘方法二:p.daemon=True,p.join
daemon() #默認爲False,能夠爲True 必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程死,p跟着一塊兒死
join()#等待子進程中止,在執行主進程,等0.0001秒就再也不等了
1 #進程對象的其餘方法二:p.daemon=True,p.join 2 from multiprocessing import Process 3 import time 4 import random 5 6 class Piao(Process): 7 def __init__(self,name): 8 self.name=name 9 super().__init__() 10 def run(self): 11 print('%s is piaoing' %self.name) 12 time.sleep(random.randrange(1,3)) 13 print('%s is piao end' %self.name) 14 15 16 p=Piao('egon') 17 p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程死,p跟着一塊兒死 18 p.start() 19 p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 20 print('開始')
1 from multiprocessing import Process 2 3 import time 4 import random 5 def piao(name): 6 print('%s is piaoing' %name) 7 time.sleep(random.randint(1,3)) 8 print('%s is piao end' %name) 9 10 p1=Process(target=piao,args=('egon',)) 11 p2=Process(target=piao,args=('alex',)) 12 p3=Process(target=piao,args=('yuanhao',)) 13 p4=Process(target=piao,args=('wupeiqi',)) 14 15 p1.start() 16 p2.start() 17 p3.start() 18 p4.start() 19 20 #注意:進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了 21 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 22 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等p1.join 23 #結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過 24 # 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 25 p1.join() 26 p2.join() 27 p3.join() 28 p4.join() 29 30 print('主線程') 31 32 33 #上述啓動進程與join進程能夠簡寫爲 34 # p_l=[p1,p2,p3,p4] 35 # 36 # for p in p_l: 37 # p.start() 38 # 39 # for p in p_l: 40 # p.join()
c.進程對象的其餘屬性:name,pid
1 #進程對象的其餘屬性:name,pid 2 from multiprocessing import Process 3 import time 4 import random 5 class Piao(Process): 6 def __init__(self,name): 7 # self.name=name 8 # super().__init__() #Process的__init__方法會執行self.name=Piao-1, 9 # #因此加到這裏,會覆蓋咱們的self.name=name 10 11 #爲咱們開啓的進程設置名字的作法 12 super().__init__() 13 self.name=name 14 15 def run(self): 16 print('%s is piaoing' %self.name) 17 time.sleep(random.randrange(1,3)) 18 print('%s is piao end' %self.name) 19 20 p=Piao('p1') 21 p.start() 22 print('開始') 23 print(p.pid) #查看pid
d.進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的。
1 #多進程共享一個打印終端(用python2測試看兩個進程同時往一個終端打印,出現打印到一行的錯誤) 2 from multiprocessing import Process 3 import time 4 class Logger(Process): 5 def __init__(self): 6 super(Logger,self).__init__() 7 def run(self): 8 print(self.name) 9 10 11 for i in range(1000000): 12 l=Logger() 13 l.start()
1 #多進程共享一套文件系統 2 from multiprocessing import Process 3 import time,random 4 5 def work(f,msg): 6 f.write(msg) 7 f.flush() 8 9 10 f=open('a.txt','w') #在windows上沒法把f當作參數傳入,能夠傳入一個文件名,而後在work內用a+的方式打開文件,進行寫入測試 11 for i in range(5): 12 p=Process(target=work,args=(f,str(i))) 13 p.start()
四、隊列:進程間通訊(IPC)之一
進程彼此之間互相隔離,要實現進程間通訊,即IPC,multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
1.方法條用隊列使用
調用方法:
Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
參數說明:
maxsize是隊列中容許最大項數,省略則無大小限制。
方法介紹:
a.主要方法
q.put 方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get 方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
q.get_nowait(): 同q.get(False)
q.put_nowait(): 同q.put(False)
q.empty(): 調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full(): 調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize(): 返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
b.其餘方法
q.cancel_join_thread(): 不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close(): 關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.join_thread(): 鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
1)使用示例:
1 ''' 2 multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 3 都是基於消息傳遞實現的,可是隊列接口 4 ''' 5 6 from multiprocessing import Process,Queue 7 import time 8 q=Queue(3) 9 10 11 #put ,get ,put_nowait,get_nowait,full,empty 12 q.put(3) 13 q.put(3) 14 q.put(3) 15 print(q.full()) #滿了 16 17 print(q.get()) 18 print(q.get()) 19 print(q.get()) 20 print(q.empty()) #空了
2)生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
1 from multiprocessing import Process,Queue 2 import time,random,os 3 4 def consumer(q): 5 while True: 6 time.sleep(random.randint(1,3)) 7 res=q.get() 8 print('\033[45m消費者拿到了:%s\033[0m' %res) 9 10 def producer(seq,q): 11 for item in seq: 12 time.sleep(random.randint(1,3)) 13 print('\033[46m生產者生產了:%s\033[0m' %item) 14 15 q.put(item) 16 17 if __name__ == '__main__': 18 q=Queue() 19 20 seq=('包子%s' %i for i in range(10)) 21 c=Process(target=consumer,args=(q,)) 22 c.start() 23 producer(seq,q) 24 25 print('主線程')
1 from multiprocessing import Process,Queue 2 import time,random,os 3 4 5 def consumer(q): 6 while True: 7 time.sleep(random.randint(1,3)) 8 res=q.get() 9 if res is None:break 10 print('\033[45m消費者拿到了:%s\033[0m' %res) 11 12 def producer(seq,q): 13 for item in seq: 14 time.sleep(random.randint(1,3)) 15 print('\033[46m生產者生產了:%s\033[0m' %item) 16 17 q.put(item) 18 q.put(None) 19 20 if __name__ == '__main__': 21 q=Queue() 22 23 c=Process(target=consumer,args=(q,)) 24 c.start() 25 26 producer(('包子%s' %i for i in range(10)),q) 27 28 c.join() #主進程等待p結束,p等待c把數據都取完,c一旦取完數據,p.join就是再也不阻塞,進 29 # 而主進程結束,主進程結束會回收守護進程c,並且c此時也沒有存在的必要了 30 print('主線程')
2.類條用的隊列使用方法
調用方法:
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
參數說明:
maxsize是隊列中容許最大項數,省略則無大小限制。
方法介紹:
JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
1 from multiprocessing import Process,JoinableQueue 2 import time,random 3 def consumer(q): 4 while True: 5 # time.sleep(random.randint(1,2)) 6 res=q.get() 7 print('消費者拿到了 %s' %res) 8 q.task_done() 9 10 11 def producer(seq,q): 12 for item in seq: 13 # time.sleep(random.randrange(1,2)) 14 q.put(item) 15 print('生產者作好了 %s' %item) 16 q.join() 17 18 if __name__ == '__main__': 19 q=JoinableQueue() 20 seq=('包子%s' %i for i in range(10)) 21 22 p=Process(target=consumer,args=(q,)) 23 p.daemon=True #設置爲守護進程,在主線程中止時p也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素 24 p.start() 25 26 producer(seq,q) 27 28 print('主線程')
ps.其餘示例:
一個生產者+多個消費者
1 from multiprocessing import Process,JoinableQueue 2 import time,random 3 def consumer(name,q): 4 while True: 5 time.sleep(random.randint(1,2)) 6 res=q.get() 7 print('\033[45m%s拿到了 %s\033[0m' %(name,res)) 8 q.task_done() 9 10 11 def producer(seq,q): 12 for item in seq: 13 time.sleep(random.randrange(1,2)) 14 q.put(item) 15 print('\033[46m生產者作好了 %s\033[0m' %item) 16 q.join() 17 18 if __name__ == '__main__': 19 q=JoinableQueue() 20 seq=('包子%s' %i for i in range(10)) 21 22 p1=Process(target=consumer,args=('消費者1',q,)) 23 p2=Process(target=consumer,args=('消費者2',q,)) 24 p3=Process(target=consumer,args=('消費者3',q,)) 25 p1.daemon=True 26 p2.daemon=True 27 p3.daemon=True 28 p1.start() 29 p2.start() 30 p3.start() 31 32 producer(seq,q) 33 34 print('主線程')
子進程當生產者
1 from multiprocessing import Process,JoinableQueue 2 import time,random 3 def consumer(name,q): 4 while True: 5 # time.sleep(random.randint(1,2)) 6 res=q.get() 7 print('\033[45m%s拿到了 %s\033[0m' %(name,res)) 8 q.task_done() 9 10 11 def producer(seq,q): 12 for item in seq: 13 # time.sleep(random.randrange(1,2)) 14 q.put(item) 15 print('\033[46m生產者作好了 %s\033[0m' %item) 16 q.join() 17 18 if __name__ == '__main__': 19 q=JoinableQueue() 20 seq=['包子%s' %i for i in range(10)] #在windows下沒法傳入生成器,咱們能夠用列表解析測試 21 22 p1=Process(target=consumer,args=('消費者1',q,)) 23 p2=Process(target=consumer,args=('消費者2',q,)) 24 p3=Process(target=consumer,args=('消費者3',q,)) 25 p1.daemon=True 26 p2.daemon=True 27 p3.daemon=True 28 p1.start() 29 p2.start() 30 p3.start() 31 32 # producer(seq,q) #也能夠是下面三行的形式,開啓一個新的子進程當生產者,不用主線程當生產者 33 p4=Process(target=producer,args=(seq,q)) 34 p4.start() 35 p4.join() 36 print('主線程')
五、管道:進程間通訊(IPC)之二
管道也能夠說是隊列的另一種形式,下面咱們就開始介紹基於管道實現金城之間的消息傳遞
調用方法:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
參數說明:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
方法介紹:
a.主要方法:
conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
b. 其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
1)基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 25 c1=Process(target=consumer,args=((left,right),'c1')) 26 c1.start() 27 28 29 seq=(i for i in range(10)) 30 producer(seq,(left,right)) 31 32 right.close() 33 left.close() 34 35 c1.join() 36 print('主進程')
ps.注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
2) 管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序,以下
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def adder(p,name): 5 server,client=p 6 client.close() 7 while True: 8 try: 9 x,y=server.recv() 10 except EOFError: 11 server.close() 12 break 13 res=x+y 14 server.send(res) 15 print('server done') 16 if __name__ == '__main__': 17 server,client=Pipe() 18 19 c1=Process(target=adder,args=((server,client),'c1')) 20 c1.start() 21 22 server.close() 23 24 client.send((10,20)) 25 print(client.recv()) 26 client.close() 27 28 c1.join() 29 print('主進程')
ps.注意:send()和recv()方法使用pickle模塊對對象進行序列化。
六、共享數據:進程間通訊(IPC)之三
展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合
經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,
還能夠擴展到分佈式系統中
進程間通訊應該儘可能避免使用本節所講的共享數據的方式
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此
1 from multiprocessing import Process,Manager 2 import os 3 4 def foo(name,d,l): 5 l.append(os.getpid()) 6 d[name]=os.getpid() 7 if __name__ == '__main__': 8 with Manager() as manager: 9 d=manager.dict({'name':'egon'}) 10 l=manager.list(['init',]) 11 12 p_l=[] 13 for i in range(10): 14 p=Process(target=foo,args=('p%s' %i,d,l)) 15 p.start() 16 p_l.append(p) 17 18 for p in p_l: 19 p.join() #必須有join否則會報錯 20 21 print(d) 22 print(l)
七、進程同步(鎖):
模擬搶票(Lock互斥鎖)
1 #文件db的內容爲:{"count":1} 2 #注意必定要用雙引號,否則json沒法識別 3 from multiprocessing import Process,Lock 4 import json 5 import time 6 import random 7 import os 8 9 def work(filename,lock): #買票 10 # lock.acquire() 11 with lock: 12 with open(filename,encoding='utf-8') as f: 13 dic=json.loads(f.read()) 14 # print('剩餘票數: %s' % dic['count']) 15 if dic['count'] > 0: 16 dic['count']-=1 17 time.sleep(random.randint(1,3)) #模擬網絡延遲 18 with open(filename,'w',encoding='utf-8') as f: 19 f.write(json.dumps(dic)) 20 print('%s 購票成功' %os.getpid()) 21 else: 22 print('%s 購票失敗' %os.getpid()) 23 # lock.release() 24 25 if __name__ == '__main__': 26 lock=Lock() 27 p_l=[] 28 for i in range(100): 29 p=Process(target=work,args=('db',lock)) 30 p_l.append(p) 31 p.start() 32 for p in p_l: 33 p.join() 34 35 print('主線程')
互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。一旦釋放,就有人能夠得到一把鎖信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
八、信號量:
1 from multiprocessing import Process,Semaphore 2 import time,random 3 4 def go_wc(sem,user): 5 sem.acquire() 6 print('%s 佔到一個茅坑' %user) 7 time.sleep(random.randint(0,3)) #模擬每一個人拉屎速度不同,0表明有的人蹲下就起來了 8 sem.release() 9 10 if __name__ == '__main__': 11 sem=Semaphore(5) 12 p_l=[] 13 for i in range(13): 14 p=Process(target=go_wc,args=(sem,'user%s' %i,)) 15 p.start() 16 p_l.append(p) 17 18 for i in p_l: 19 i.join() 20 print('============》')
九、事件
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True
1 #_*_coding:utf-8_*_ 2 #!/usr/bin/env python 3 4 from multiprocessing import Process,Event 5 import time,random 6 7 def car(e,n): 8 while True: 9 if not e.is_set(): #Flase 10 print('\033[31m紅燈亮\033[0m,car%s等着' %n) 11 e.wait() 12 print('\033[32m車%s 看見綠燈亮了\033[0m' %n) 13 time.sleep(random.randint(3,6)) 14 if not e.is_set(): 15 continue 16 print('走你,car', n) 17 break 18 19 def police_car(e,n): 20 while True: 21 if not e.is_set(): 22 print('\033[31m紅燈亮\033[0m,car%s等着' % n) 23 e.wait(1) 24 print('燈的是%s,警車走了,car %s' %(e.is_set(),n)) 25 break 26 27 def traffic_lights(e,inverval): 28 while True: 29 time.sleep(inverval) 30 if e.is_set(): 31 e.clear() #e.is_set() ---->False 32 else: 33 e.set() 34 35 if __name__ == '__main__': 36 e=Event() 37 # for i in range(10): 38 # p=Process(target=car,args=(e,i,)) 39 # p.start() 40 41 for i in range(5): 42 p = Process(target=police_car, args=(e, i,)) 43 p.start() 44 t=Process(target=traffic_lights,args=(e,10)) 45 t.start() 46 47 print('============》')
十、進程池
開多進程的目的是爲了併發,若是有多核,一般有幾個核就開幾個進程,進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行),但很明顯須要併發執行的任務要遠大於核數,這時咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...
當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
並且對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。
使用方法:
Pool([numprocess [,initializer [, initargs]]]):建立進程池
參數介紹:
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
方法介紹:
a.主要方法:
p.apply(func [, args [, kwargs]]): 在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]): 在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
p.close(): 關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
p.terminate(): 當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
p.jion(): 等待全部工做進程退出。此方法只能在close()或teminate()以後調用
b.其餘方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
1)使用進程池維護固定數目的進程
1 #Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) 2 #開啓6個客戶端,會發現2個客戶端處於等待狀態 3 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 4 from socket import * 5 from multiprocessing import Pool 6 import os 7 8 server=socket(AF_INET,SOCK_STREAM) 9 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 10 server.bind(('127.0.0.1',8080)) 11 server.listen(5) 12 13 def talk(conn,client_addr): 14 print('進程pid: %s' %os.getpid()) 15 while True: 16 try: 17 msg=conn.recv(1024) 18 if not msg:break 19 conn.send(msg.upper()) 20 except Exception: 21 break 22 23 if __name__ == '__main__': 24 p=Pool() 25 while True: 26 conn,client_addr=server.accept() 27 p.apply_async(talk,args=(conn,client_addr)) 28 # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
1 from socket import * 2 3 client=socket(AF_INET,SOCK_STREAM) 4 client.connect(('127.0.0.1',8080)) 5 6 7 while True: 8 msg=input('>>: ').strip() 9 if not msg:continue 10 11 client.send(msg.encode('utf-8')) 12 msg=client.recv(1024) 13 print(msg.decode('utf-8'))
2)異步(非阻塞,apply_async)
1 #coding: utf-8 2 from multiprocessing import Process,Pool 3 import time 4 5 def func(msg): 6 print( "msg:", msg) 7 time.sleep(1) 8 return 'hahaha' 9 10 if __name__ == "__main__": 11 pool = Pool(processes = 3) 12 res_l=[] 13 for i in range(10): 14 msg = "hello %d" %(i) 15 res=pool.apply_async(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 16 res_l.append(res) 17 print("==============================>") 18 pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 19 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 20 print("Sub-process(es) done.") 21 for i in res_l: 22 print(res.get())
3)同步(阻塞,apply)
1 #coding: utf-8 2 from multiprocessing import Process,Pool 3 import time 4 5 def func(msg): 6 print( "msg:", msg) 7 time.sleep(0.1) 8 return 'hahaha' 9 10 if __name__ == "__main__": 11 pool = Pool(processes = 3) 12 res_l=[] 13 for i in range(10): 14 msg = "hello %d" %(i) 15 res=pool.apply(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 16 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個 17 print("==============================>") 18 pool.close() 19 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 20 print("Sub-process(es) done.") 21 print(res_l) 22 for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法 23 print(res)
4)多進程池
1 #coding: utf-8 2 import multiprocessing 3 import os, time, random 4 5 def Lee(): 6 print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()獲取當前的進程的ID 7 start = time.time() 8 time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數 9 end = time.time() 10 print('Task Lee, runs %0.2f seconds.' %(end - start)) 11 12 def Marlon(): 13 print("\nRun task Marlon-%s" %(os.getpid())) 14 start = time.time() 15 time.sleep(random.random() * 40) 16 end=time.time() 17 print('Task Marlon runs %0.2f seconds.' %(end - start)) 18 19 def Allen(): 20 print("\nRun task Allen-%s" %(os.getpid())) 21 start = time.time() 22 time.sleep(random.random() * 30) 23 end = time.time() 24 print('Task Allen runs %0.2f seconds.' %(end - start)) 25 26 def Frank(): 27 print("\nRun task Frank-%s" %(os.getpid())) 28 start = time.time() 29 time.sleep(random.random() * 20) 30 end = time.time() 31 print('Task Frank runs %0.2f seconds.' %(end - start)) 32 33 def Egon(): 34 print("\nRun task Egon-%s" %(os.getpid())) 35 start = time.time() 36 time.sleep(random.random() * 20) 37 end = time.time() 38 print('Task Egon runs %0.2f seconds.' %(end - start)) 39 40 def Lily(): 41 print("\nRun task Lily-%s" %(os.getpid())) 42 start = time.time() 43 time.sleep(random.random() * 20) 44 end = time.time() 45 print('Task Lily runs %0.2f seconds.' %(end - start)) 46 47 if __name__=='__main__': 48 function_list= [Lee, Marlon, Allen, Frank, Egon, Lily] 49 print("parent process %s" %(os.getpid())) 50 51 pool=multiprocessing.Pool(4) 52 for func in function_list: 53 pool.apply_async(func) #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中 54 55 print('Waiting for all subprocesses done...') 56 pool.close() 57 pool.join() #調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束 58 print('All subprocesses done.')
十一、回調函數
1 from multiprocessing import Pool 2 import time,random 3 4 def get_page(url): 5 time.sleep(random.randint(1,3)) 6 print('下載頁面: %s' %url) 7 return {'url':url} #模擬下載後的結果 8 9 def parse_page(page_content): 10 time.sleep(1) 11 print('解析頁面: %s' %page_content) 12 13 14 if __name__ == '__main__': 15 urls=[ 16 'http://maoyan.com/board/7', 17 'http://maoyan.com/board/1', 18 'http://maoyan.com/board/2' 19 ] 20 p=Pool() 21 res_l=[] 22 for url in urls: 23 res=p.apply_async(get_page,args=(url,),callback=parse_page) 24 res_l.append(res) 25 26 for i in res_l: 27 i.get()
爬蟲案例:
1 from multiprocessing import Pool 2 import time,random 3 import requests 4 import re 5 6 def get_page(url,pattern): 7 response=requests.get(url) 8 if response.status_code == 200: 9 return (response.text,pattern) 10 11 def parse_page(info): 12 page_content,pattern=info 13 res=re.findall(pattern,page_content) 14 for item in res: 15 dic={ 16 'index':item[0], 17 'title':item[1], 18 'actor':item[2].strip()[3:], 19 'time':item[3][5:], 20 'score':item[4]+item[5] 21 22 } 23 print(dic) 24 if __name__ == '__main__': 25 pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) 26 27 url_dic={ 28 'http://maoyan.com/board/7':pattern1, 29 } 30 31 p=Pool() 32 res_l=[] 33 for url,pattern in url_dic.items(): 34 res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) 35 res_l.append(res) 36 37 for i in res_l: 38 i.get() 39 40 # res=requests.get('http://maoyan.com/board/7') 41 # print(re.findall(pattern,res.text))