一 multiprocessing模塊介紹python
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。linux
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。編程
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。json
二 Process類的介紹windows
建立進程的類安全
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) #強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹網絡
group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱
1 p.start():啓動進程,並調用該子進程中的p.run() 2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 3 4 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 5 p.is_alive():若是p仍然運行,返回True 6 7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
3、Process類的使用數據結構
注意:在windows中Process()必須放到# if __name__ == '__main__':下多線程
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
簡言之,是因爲在windows系統下,子進程是經過導入模塊的方式拿到父進程的代碼,若是沒有main會一直開啓子進程,而子進程的申請是須要開闢內存以及申請pid等的併發
建立進程的兩種方式
方式一:調用內置的類
from multiprocessing import Process def func(name): print('%s,running...' % name) print('%s,ending...' % name) if __name__ == '__main__': obj = Process(target=func, args=('子進程一',)) # 若是隻有一個參數,args括號內必定要加逗號,確保以元組的形式傳入 obj.start() # 只是主進程給操做系統發送創建子進程的請求,並不是馬上創建子進程 print('主>>>')
方式二:自定義類
class MyProcess(Process): def run(self): # 必須爲這個名字 print('%s running...' % self.name) print('%s ending...' % self.name) if __name__ == '__main__': obj = MyProcess() obj.start() # 本質上是在調用父類的start方法,而start方法下會觸發run方法 print('主>>>')
進程之間的內存空間是隔離的
from multiprocessing import Process n=100 def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': p=Process(target=work) p.start() print('主進程內: ',n)
進程對象相關的屬性和方法
join()
import time from multiprocessing import Process x = 1 def task(name, num): print('%s is running' % name) global x x = 0 time.sleep(num) print('%s is done' % name) if __name__ == '__main__': obj_list = [] start_time = time.time() for i in range(1, 3): obj = Process(target=task, args=('子進程%s' % i, i)) obj_list.append(obj) obj.start() for obj in obj_list: obj.join() # join()讓父進程原地等待,等子進程運行完畢後,才執行下一行代碼(也擁有wait方法,等子進程運行完畢後,向操做系統發送請求,回收子進程佔用的PID) end_start = time.time() print(x, end_start - start_time) # 能夠看出修改子進程內的變量值並不會影響到父進程--->進程之間內存空間彼此隔離
import time from multiprocessing import Process def task(name): print('%s is running'% name) time.sleep(15) # 給咱們在cmd內查看進程預留時間 if __name__ == '__main__': p = Process(target=task,args=('進程一',)) p.start() print(p.pid) p.join() print(p.pid) # 執行結果 26324 進程一 is running 26324 """ 說join()內也擁有wait()方法,會在子進程結束後向操做系統發送請求,回收子進程的pid,那麼爲何在join()以後仍然能查到子進程的pid? join()確實向操做系統發送了請求,操做系統也確實回收了子進程(經過cmd能夠查看到),可是在子進程被建立時,pid已經成爲父進程的一個屬性(指向子進程的pid),join以後咱們並無刪除這個屬性(只是這個屬性沒有任何意義) """
os.get_pid與os.get_ppid()
import time,os from multiprocessing import Process def task(name): print('%s is running'% name) print(os.getpid()) time.sleep(5) if __name__ == '__main__': p = Process(target=task,args=('進程一',)) p.start() p.join() print(os.getpid(),os.getppid()) time.sleep(100)
咱們能夠用os模塊下的get_pid()查看當前進程的pid,以及get_ppid()查看父進程的pid
經過cmd能夠看出子進程與父進程的pid都是指向python.exe,而父進程的父進程是pycharm
緣由是子進程與父進程執行的都是python代碼,須要經過解釋器執行(將所要執行的代碼做爲參數傳入解釋器內),咱們的進程是在pychram內執行的,若是經過cmd執行那麼父進程就是cmd.exe
咱們經過cmd執行這個py文件,而後用另外一個cmd2去殺死cmd1,發現cmd1並無關閉
緣由是子進程在佔用cmd的終端顯示(cmd1確實被回收了,在cmd2內查不到cmd1的pid號),若是將子進程設置爲後臺運行,就會發如今咱們殺死cmd1時,cmd1窗口就會當即關閉(進程與進程的內存空間彼此是隔離的)
obj.terminate():用於殺死子進程
obj.is_alive(): 判斷一個子進程是否存活
4、殭屍進程、孤兒進程、守護進程
殭屍進程
子進程在結束以後,釋放掉其佔用的絕大部份內存空間以及cpu等資源,可是會留下一個稱爲殭屍進程的數據結構(包含子進程的pid),等待父進程處理。這種狀況下的殭屍進程是無害的(待全部的子進程結束後,父進程會統一貫操做系統發送回收子進程pid的請求,或者使用join(),其內部也擁有wait()方法),可是,若是父進程是一個死循環,不斷的創造子進程,而又不發送回收請求,這就形成了大量的pid被佔用
孤兒進程
在子進程結束以前,父進程就掛掉了,該子進程稱之爲孤兒進程(無害),最後會由全部進程的父進程進行發送回收請求(linux中爲init進程)
守護進程
兩個關鍵詞:守護/進程
進程:其本質也是一種「子進程」
守護:伴隨的意思
即,守護進程會伴隨着父進程代碼的執行結束而死亡(不折不扣,而非成爲殭屍進程)
爲何要用守護進程
當該進程的代碼在父進程的代碼執行完畢後就沒有存在的意義了,則應該將該進程設置爲守護進程(例如在生產者與消費者模型中,生產者是專門負責產生數據的任務,而消費者是負責處理數據的任務,當生產者對象join以後,意味生產者再也不生產數據,也意味着執行父進程的下一行代碼,而消費者處理的數據來自生產者,因此應該將充當消費者的子進程設置爲守護進程)
如何將進程設置爲守護進程
import time from multiprocessing import Process def bar(name): print("%s is running" % name) time.sleep(3) print("%s is done" % name) def foo(name): print("%s is running" % name) time.sleep(3) print("%s is done" % name) if __name__ == '__main__': p1 = Process(target=bar, args=('守護進程',)) p2 = Process(target=foo, args=('子進程',)) p1.daemon = True # 開啓守護進程必定要在父進程發送請求以前 p1.start() p2.start() print('主is done')
在這種狀況下,執行結果可能有三種(取決於計算機的性能)
分析以前,首先要明確的是
1.start()操做只是表明父進程向操做系統發送建立子進程的請求(而非當即產生子進程),至於什麼建立,先建立誰由操做系統決定
2.產生進程是須要開闢內存空間的,爲子進程分配pid,經過導入的方式將父進程的代碼複製到子進程中(還有其它資源),這些都是須要必定的時間
3.守護進程是伴隨着父進程的代碼執行完畢而結束(而非父進程的死亡)
4.print()操做,是先在內存中產生字符串這個對象,而後將其輸出到屏幕上,這也是須要必定的時間
主進程is done 子進程 is running 子進程 is done #這是因爲執行兩個start()以後,當即執行了print()操做,待父進程代碼執行完畢後,守護進程仍然沒有被創造出來
主進程is done 守護進程 is running 子進程 is running 子進程 is done #這是因爲,在父進程執行print()操做的時候,守護進程被建立出來
守護進程 is running 主進程is done 子進程 is running 子進程 is done #這是因爲在父進程向操做系統發送申請子進程的時候,守護進程被建立出來
5、互斥鎖
將多個任務對修改共享數據的操做由併發變爲「串行」
# json文件 {"count": 2} # 執行文件 import os import time import json import random from multiprocessing import Process def check(): with open("a.json","r",encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) print('[%s] 在%s 查看了車票,車票還剩:%s' % (os.getpid(), time.strftime('%Y-%m-%d %X'), data_dic['count'])) def pay(): with open("a.json","r",encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) if data_dic['count'] > 0: data_dic['count'] -= 1 time.sleep(random.random()) with open("a.json","w",encoding="utf-8") as f: json.dump(data_dic,f) print('[%s] 在%s 購買了車票' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) else: print('[%s] 在%s 購票失敗' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) def buy_tickets(): check() pay() if __name__ == '__main__': for i in range(8): p = Process(target=buy_tickets) p.start() #發現,兩張票被8我的購買
# json 文件 {"count": 2} # 執行文件 import os import time import json import random from multiprocessing import Process, Lock def check(): with open("a.json", "r", encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) print('[%s] 在%s 查看了車票,車票還剩:%s' % (os.getpid(), time.strftime('%Y-%m-%d %X'), data_dic['count'])) def pay(): with open("a.json", "r", encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) if data_dic['count'] > 0: data_dic['count'] -= 1 time.sleep(random.random()) with open("a.json", "w", encoding="utf-8") as f: json.dump(data_dic, f) print('[%s] 在%s 購買了車票' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) else: print('[%s] 在%s 購票失敗' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) def buy_tickets(lock): # 若是將鎖加到這裏,就將整個任務串行了 check() lock.acquire() # 該進程拿到鎖,其他沒有鎖的進程等待 pay() lock.release() # 釋放鎖,其他等待進程競爭 ''' 或者使用with上下文管理 with lock: pay() ''' if __name__ == '__main__': mutex = Lock() # 得到鎖對象 for i in range(8): p = Process(target=buy_tickets, args=(mutex,)) # p = Process(target=buy_tickets, kwargs={'lock': mutex}) p.start()
總結
1.多個任務併發的去操做共享數據會形成數據錯亂,使用互斥鎖,雖然使任務對共享數據的操做由併發變爲「串行」,減低了效率,可是提升了數據的安全性
2.此方法的共享數據效率低(數據來自於硬盤) 而且須要本身去處理鎖的問題
因此,爲了解決第二個問題,就要尋求一種方法---->使多個進程共享一個內存空間中的共享數據,該方法能夠替咱們處理好鎖的問題
IPC通訊
管道與隊列
管道與隊列將數據存放於內存中,並且隊列是經過管道+鎖實現的
6、IPC
進程間的通訊
兩種實現方式
管道:pipe
隊列:queue(其實就是pipe+lock)
注意:二者實際上都是內存空間,不要往裏面放入大數據,只能放數據量較小的消息
IPC所解決的問題
1.當多個任務併發的去修改共享數據,就可能會形成數據錯亂,咱們經過加互斥鎖使多個任務對共享數據的操做由併發變爲「串行」,從而保證了共享數據的安全,而當出現須要修改多個共享數據的需求時,咱們就得再次加鎖處理
---->IPC幫咱們解決了須要本身加鎖的問題
2.進程間的內存空間是彼此隔離的,如何完成通訊(數據交互),就須要尋求一種共享的東西,硬盤是共享的,可是讀取硬盤的速度慢
---->IPC實現了一種內存空間上的共享(兩個進程之間經過隊列交流)
隊列的使用
from multiprocessing import Queue q = Queue(3) # 設置隊列存放大小 q.put('你好') q.put({'name':"bob",'age':26}) q.put([1,2,3]) print(q.get()) print(q.get()) print(q.get())
注意
1.隊列內能夠存放的是python任意類型的數據
2.隊列是先進先出
3.當put的數量大於隊列容許放入的數量時,就會發生阻塞(block=True),直到隊列中有消息被取走
4.當隊列中的內容被取空時,get操做也會發生阻塞(block=True),直到有新的數據放入隊列中
隊列的其它參數
q.put(obj=,block=,timeout=)
q.get(block=,timeout=)
put()
obj:表明要放入隊列的數據
block:默認爲True,表明當放入的數據數量大於隊列容許的數量時,就會發生阻塞;False,則直接拋出 queue.Full 異常
timeout:默認爲-1,表明當發生阻塞時,會一直等待(等待隊列中有數據被取走),若是阻塞時間大於自設置的時間,則拋出queue.Full 異常
get()
block:與put()的意思相似,默認值也爲True
timeout:與put()的意思相似,默認值爲-1
7、生產者與消費者模型
三個關鍵詞:消費者/生產者/模型
模型:能夠理解爲用於解決一類問題的統一方法/模板
生產者:比喻的是在程序中負責產生數據的任務
消費者:比喻的是在程序中負責處理數據的任務
實現
生產者 ------> queue < ------ 消費者 (經過隊列進行交互,實現解耦和)
好處
實現了生產者與消費者的解耦和,生產者能夠不斷的生產,消費者能夠不斷的消費,平衡了生產者的生產能力與消費者的消費能力,從而提高了程序的總體運行效率
何時使用該模型
當程序中明顯的出現了兩類任務,一類負責產生數據,一類負責處理數據,就可使用生產者與消費者模型來提高程序的總體效率
使用
import os import time import random from multiprocessing import Process, Queue def produce(q): for i in range(1, 5): res = '包子%s' % i time.sleep(random.uniform(1, 3)) q.put(res) print('廚師:%s生產了%s' % (os.getpid(), res)) def customer(q): while True: res = q.get() time.sleep(random.uniform(1, 3)) print('客戶:%s吃了包子%s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() p1 = Process(target=produce, args=(q,)) c1 = Process(target=customer, args=(q,)) p1.start() c1.start() print('主進程 is done')
執行上訴代碼,發現一個問題,當隊列內的數據取空後(生產者生產完畢),消費者的get操做發生了阻塞,因此要尋求一種方法讓消費者知道生產者生產完畢
import time import random from multiprocessing import Process,Queue def produce(name,food,q): for i in range(1,4): res = '%s%s'%(food,i) time.sleep(random.uniform(1,3)) q.put(res) print('\033[45m廚師%s生產了%s%s\033[0m'%(name,food,i)) def customer(name,q): while True: res = q.get() if res is None: break time.sleep(random.uniform(1,3)) print('\033[46m%s 吃了 %s\033[0m'%(name,res)) if __name__ == '__main__': q = Queue() p1 = Process(target=produce,args=('bob','包子',q)) p2 = Process(target=produce,args=('tom','饅頭',q)) p3 = Process(target=produce,args=('tony','花捲',q)) c1 = Process(target=customer,args=('顧客1',q)) c2 = Process(target=customer,args=('顧客1',q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) print('主進程 is done')
3個join以後意味着生產者生產完畢,由主進程爲隊列添加結束信號(也能夠在生產者內添加),消費者拿到結束信號後結束
可是這種方法須要有幾個消費者就發送幾個信號(消費者個數沒法預知,這種作法也low)
另外一種隊列:JoinableQueue([maxsize]) # maxsize爲容許隊列存放的最大數,默認無限制
方法
1.擁有與Queue對象相同的方法
2.task_done().消費者調用該方法,記錄get()方法的返回項已經被處理
3.join().調用此方法發生阻塞,直到隊列中的內容所有被取乾淨
import time import random from multiprocessing import Process,JoinableQueue def produce(name,food,q): for i in range(1,4): res = '%s%s'%(food,i) time.sleep(random.uniform(1,3)) q.put(res) print('\033[45m廚師%s生產了%s%s\033[0m'%(name,food,i)) def customer(name,q): while True: res = q.get() time.sleep(random.uniform(1,3)) print('\033[46m%s 吃了 %s\033[0m'%(name,res)) q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=produce,args=('bob','包子',q)) p2 = Process(target=produce,args=('tom','饅頭',q)) p3 = Process(target=produce,args=('tony','花捲',q)) c1 = Process(target=customer,args=('顧客1',q)) c2 = Process(target=customer,args=('顧客1',q)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() ''' 讓父進程等待生產者進程執行完畢後再執行 q.join執行結束意味着--->生產者代碼執行完畢 ''' q.join() ''' 等待隊列被取乾淨 執行結束意味着父進程代碼執行結束,生產者執行結束而且隊列被取乾淨--->消費者沒有存在的意義 即,這種狀況下,消費者代碼應該伴隨着父進程代碼的結束而結束(守護進程) '''