在進入多進程的學習以前, 必定須要先了解一個應用程序是如何開啓一個進程的, 以及操做系統對進程是如何進行分配資源的, 進程、線程、進程池、進程三態、同步、異步、併發、並行、串行的概念也要很是的明確, 下面將介紹 Python 併發編程之多進程html
ps : 值得注意的是 : 與線程不一樣,進程沒有任何共享狀態,多個進程的內存空間相互物理隔離, 進程修改的數據,改動僅限於該進程內python
multiprocessing 模塊提供了 Process 類,該類可用來在 Windows 平臺上建立新進程編程
使用 Process 類建立實例化對象,其本質是調用該類的構造方法建立新進程json
Process([group [, target [, name [, args [, kwargs]]]]]) # 其實是調用了下面的構造方法 def __init__(self,group=None,target=None,name=None,args=(),kwargs={})
值得注意的是 :數組
參數的指定須要使用關鍵字的方式安全
args 指定的值是爲 target 指定的函數的位置參數, 而且是一個元組形式, 一個值必須帶逗號bash
參數名 | 說明 |
---|---|
group | 該參數未進行實現,不須要傳參 |
target | 爲新建進程指定執行任務,也就是指定一個函數 |
name | 爲新建進程設置名稱 |
args | 爲 target 參數指定的參數傳遞非關鍵字參數 |
kwargs | 爲 target 參數指定的參數傳遞關鍵字參數 |
方法 | 做用 |
---|---|
run( ) | 第 2 種建立進程的方式須要用到,繼承類中須要對方法進行重寫,該方法中包含的是新進程要執行的代碼 |
start( ) | 和啓動子線程同樣,新建立的進程也須要手動啓動,該方法的功能就是啓動新建立的線程 |
join([timeout]) | 主線程等待子進程終止(強調:是主線程處於等的狀態,而p是處於運行的狀態),timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程 |
is_alive( ) | 判斷當前進程是否還活着 |
terminate( ) | 中斷該進程 |
屬性 | 做用 |
---|---|
name | 能夠爲該進程重命名,也能夠得到該進程的名稱。 |
daemon | 和守護線程相似,經過設置該屬性爲 True,可將新建進程設置爲「守護進程」 |
pid | 返回進程的 ID 號。大多數操做系統都會爲每一個進程配備惟一的 ID 號 |
exitcode | 進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) |
authkey | 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網 |
在 Windows 中 Process( ) 必須放在 if __name__ == '__main__':
之下網絡
ps : fork 是 OS提供的方法 os.fork()
, 該方法能夠在當前程序中再建立出一個進程, 可是在 Windows 平臺上無效, 只在 Linux, UNIX, Mac OSX上有效數據結構
from multiprocessing import Process import time,os def test(n): print(f"父進程{os.getppid()},紫禁城{os.getpid()}") time.sleep(n) print(f"父進程{os.getppid()},紫禁城{os.getpid()}") if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() # 作發起系統調用的活 print(f"當前執行文件{os.getpid()}") ''' 當前執行文件16860 父進程16860,紫禁城6404 父進程16860,紫禁城6404 '''
from multiprocessing import Process import time,os class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f"父進程{os.getppid()},紫禁城{self.pid}") time.sleep(self.n) print(f"父進程{os.getppid()},紫禁城{os.getpid()}") if __name__ == '__main__': p = MyProcess(2) p.start() print(f"當前執行文件{os.getpid()}") ''' 當前執行文件8136 父進程8136,紫禁城1280 父進程8136,紫禁城1280 '''
from multiprocessing import Process import time x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 發送系統調用 time.sleep(1) # 等待子進程運行完 print(x) # 222 (仍是原來的)
子進程 test 函數中聲明全局變量 x, 並修改 x 的值, 等待子進程運行完畢, 最後打印 x , 發現 x 的值並無改變多線程
from multiprocessing import Process x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 發送系統調用 p.join() # 等待子進程運行完(以前咱們使用sleep並不能精確的知道子進程結束運行的時間) print(x) # 222 (仍是原來的)
timeout
是可選的超時間, 等多久就不等了from multiprocessing import Process x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 發送系統調用 p.join(0.001) # 等待 0.001 秒就不等了
start()
只是發起系統調用, 並非運行子進程, 當 start()
執行完後緊接着就執行後面的代碼start()
發起調用以後, 是通知操做系統建立一個子進程, 操做系統須要申請一個內存空間, 將父進程的數據複製一份到子進程的內存空間中做爲初始化用 (Linux是將父進程的數據原本來本的複製一份, 而Windows 稍有些不一樣), 而後子進程才運行起來import time,os def test(n): time.sleep(n) print(f"父進程{os.getppid()} 子進程{os.getpid()}") if __name__ == '__main__': p1 = Process(target=test,args=(3,)) p2 = Process(target=test,args=(2,)) p3 = Process(target=test,args=(1,)) p1.start() # 用時 3 秒 p2.start() # 用時 2 秒 p3.start() # 用時 1 秒 start_time = time.time() p1.join() p2.join() p3.join() # 三個進程都在併發的運行, 主進程一共運行3秒多 stop_time = time.time() print(f'主進程{os.getpid()} 用時{stop_time-start_time}') ''' 父進程10888 子進程6792 父進程10888 子進程13368 父進程10888 子進程14800 主進程10888 用時3.131737470626831 '''
from multiprocessing import Process import time def test(): time.sleep(1) if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() p.terminate() # 只是發起系統調用, 通知操做系統關閉子進程 print(p.is_alive()) # True
由上面可知
terminate()
只是發起系統調用, 並非當即關閉子進程, 操做系統關閉子進程回收資源也要一小會, 咱們可使用sleep簡單延時
from multiprocessing import Process import time def test(): time.sleep(1) if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() p.terminate() # 只是發起系統調用, 通知操做系統關閉子進程 time.sleep(0.1) # 稍微延時一點 print(p.is_alive()) # False
from multiprocessing import Process import time,os class MyProcess(Process): def __init__(self,n,name): super().__init__() self.n = n self.name = name def run(self) -> None: time.sleep(self.n) print(f"子進程pid:{self.pid}") # 子進程pid:14156 print(f"子進程模塊名:{__name__}") # 子進程模塊名:__mp_main__ print(f"子進程名:{self.name}") # 子進程名:aaaa if __name__ == '__main__': p = MyProcess(1,"aaaa") p.start() p.join() print(f"打印子進程pid:{p.pid}") # 打印子進程pid:14156 print(f"打印主進程pid:{os.getpid()}") # 打印主進程pid:16340 print(f"子進程名:{p.name}") # 子進程名:aaaa print(f"主進程模塊名:{__name__}") # 主進程模塊名:__main__
__name__
: Python中每一個模塊都有本身的名字,__name__
是一個系統變量, 是模塊的標識符, 值是模塊的名稱, 而且在自身模塊中:__name__
的值等於__mian__
ps : 任何正常結束的子進程都會進入到殭屍狀態, 而被強制終止的進程的全部信息將會被清除
這些軟件在開啓子進程時, 父進程內部會及時調用"wait" / "waitpid" 通知操做系統來回收殭屍進程
功底深厚,知道父進程要對子進程負責 會在父進程內部考慮到調用 "wait" / "waitpid" 通知操做系統回收殭屍進程 可是發起系統調用時間可能慢了一點 因而咱們就可使用 "ps aux | grep [z]+" 命令查看到殭屍進程
技術半吊子,只知道開子進程,父進程也不結束,並在那一直開子進程,不知道什麼是殭屍進程 系統調用 "wait" / "waitpid" 也沒有據說過 因而計算機會堆積許多的殭屍進程,佔用着大量的"pid",(每啓動一個進程就會分配一個"pid號") 計算機進入一個奇怪的現象: 內存夠用,硬盤充足,CPU空閒,但新的程序沒法啓動 這就是由於"PID"不夠用了
咱們能夠手動發信號給父進程: "# kill -CHLD [父進程的PID]" 通知父進程快點向操做系統發起系統調用 "wait" / "waitpid" 來清理變成殭屍的兒子們
這種狀況子下,咱們只能將父進程終結,由於你發給它的信號不會獲得迴應 父進程被殺死,"殭屍進程"將會變成"殭屍孤兒進程" 但凡是"孤兒進程"都會被Linux系統中"PID"爲"1"的頂級進程"systemd"回收 "systemd"會發起系統調用 "wait" / "waitpid" 來通知操做系統清理殭屍進程 # Centos7 的頂級進程爲 systemd # Centos6 的頂級進程爲 init
本來 multiprocessing 模塊在你發起系統調用 start()
開啓子進程的時候會自動檢測當前狀態下是否存在殭屍進程, 並將其回收, join()
調用也是同樣, 咱們能夠查看這兩個調用的源碼進行查看 :
# coding:utf-8 from multiprocessing import Process import os,time def task(): print("子進程:%s"%os.getpid()) time.sleep(4) # 子進程 4 秒後結束變成殭屍進程 if __name__ == "__main__": for i in range(400): print("父進程:%s"%os.getpid()) p = Process(target=task) p.start() time.sleep(100000) # 讓父進程停在原地什麼也不作
使用 top 命令查看系統狀態信息, 能夠發現已經出現了 400 個殭屍進程
咱們能夠經過 kill 剛運行的 py 文件將這些殭屍進程變成孤兒進程, 從而被 systemd 接管, systemd 再發起系統調用將其清除
由主進程建立, 並會隨着主進程的結束而結束
進程之間是相互獨立的, 守護進程會在主進程代碼執行結束後就終止
守護進程內沒法再次開啓子進程, 不然會拋出異常 : AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Process import os,time class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f'子進程:{os.getpid()}開始') time.sleep(2) print(f"子進程:{os.getpid()}結束") if __name__ == '__main__': p = MyProcess(2) p.daemon = True # 須要在 strat() 以前設置 p.start() print(f"主進程:{os.getpid()}結束") # 在當前主進程的代碼已經運行完畢, 守護進程就會終止, 甚至守護進程還沒來的急啓動 '''輸出 主進程:16924結束 '''
咱們使用 sleep 讓主進程簡單延時一下好讓子進程啓動起來
from multiprocessing import Process import os,time class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f'子進程:{os.getpid()}開始') time.sleep(2) print(f"子進程:{os.getpid()}結束") if __name__ == '__main__': p = MyProcess(2) p.daemon = True p.start() time.sleep(1) # 延時一秒, 足夠操做系統將子進程開起來 print(f"主進程:{os.getpid()}結束") '''輸出 子進程:8620開始 主進程:10480結束 '''
再次強調, 守護進程是在主進程的代碼執行完畢終止
from multiprocessing import Process import os,time def Foo(): print(f"Foo:{os.getpid()}-->111") time.sleep(1) print(f"Foo--->222") def Bar(): print(f"Bar:{os.getpid()}-->333") time.sleep(2) print(f"Bar--->444") if __name__ == '__main__': p1 = Process(target=Foo) p2 = Process(target=Bar) p1.daemon = True # 將 p1 設置守護進程 p1.start() p2.start() print("------>end") # 當運行到這一行的時候主進程代碼已經運行完了, 那麼守護進程也已經終止了, 與主進程在等着 p2 運行無關, 這時操做系統還沒來的急啓動 p1 這個子進程 '''輸出 ------>end Bar:18124-->333 Bar--->444 '''
上面咱們實現了進程的併發, 進程之間的數據是不共享的, 可是他們能夠共享同一個文件(硬盤空間), 或者是同一個打印空間, 然而在共享的同時也帶來了問題 : 進程的運行不是同時進行的, 它們沒有前後順序, 一旦開啓也不受咱們的限制, 當多個進程使用同一份數據資源時, 就會引起數據安全或者數據混亂問題
咱們打個簡單的比方, 公司裏的一臺打印機, 每一個人均可以使用, 但同事只能有一我的在使用, 否則就會形成打印錯亂; 又好比合租房的衛生間, 合住的同伴均可以使用衛生間, 但每次只能一我的進去, 進去以後門就鎖上了(至關於加鎖 Lock( ).acquire
( )), 出來以後開門, 其餘人又可使用衛生間了(至關於解鎖Lock( ).release( )
)
🍓餘票文件 "aaa.json" {"count": 1} # 剩一張票 🍓模擬多我的搶票 # coding:utf-8 from multiprocessing import Process import os,time,json def check(): # 先查票 time.sleep(1) # 模擬網絡延遲 with open("aaa.json")as f: dic = json.load(f) print(f"剩餘票數 : {dic['count']}") def get(): # 查完以後開始搶 time.sleep(1) # 模擬網絡延遲 with open("aaa.json")as f: dic = json.load(f) if dic["count"] >0: dic["count"] -= 1 time.sleep(1) # 模擬網絡延遲 with open("aaa.json","w")as f2: # 搶完以後修改數據並提交到服務端 json.dump(dic,f2) print(f"用戶 : {os.getpid()} 搶票成功") else: print(f"用戶 : {os.getpid()} 搶票失敗") def run(): check() time.sleep(1) # 模擬網絡延遲 get() if __name__ == "__main__": for i in range(4): p = Process(target=run) p.start() '''輸出 剩餘票數 : 1 剩餘票數 : 1 剩餘票數 : 1 剩餘票數 : 1 用戶 : 13116 搶票成功 用戶 : 2364 搶票成功 用戶 : 1796 搶票成功 用戶 : 6228 搶票成功 '''
打印的結果發現只有一張票, 可是四我的都搶成功了, 這就很是不合理,形成了數據混亂
# coding:utf-8 from multiprocessing import Process,Lock import os,time,json def check(): # 先查票 time.sleep(1) # 模擬網絡延遲 with open("aaa.json")as f: dic = json.load(f) print(f"剩餘票數 : {dic['count']}") def get(): # 查完以後開始搶 time.sleep(1) # 模擬網絡延遲 with open("aaa.json")as f: dic = json.load(f) if dic["count"] >0: dic["count"] -= 1 time.sleep(1) # 模擬網絡延遲 with open("aaa.json","w")as f2: # 搶完以後修改數據並提交到服務端 json.dump(dic,f2) print(f"用戶 : {os.getpid()} 搶票成功") else: print(f"用戶 : {os.getpid()} 搶票失敗") def run(lock): check() time.sleep(1) # 模擬網絡延遲 lock.acquire() # 在搶票環節加鎖 get() lock.release() # 搶完後解鎖 if __name__ == "__main__": lock = Lock() for i in range(4): p = Process(target=run,args=(lock,)) p.start() '''輸出 剩餘票數 : 1 剩餘票數 : 1 剩餘票數 : 1 剩餘票數 : 1 用戶 : 432 搶票成功 用戶 : 2636 搶票失敗 用戶 : 7772 搶票失敗 用戶 : 1272 搶票失敗 '''
加鎖以後, 一張票只有一我的能搶成功, 其實就是讓搶票這個局部環節變成了串行, 誰搶到了就誰用, 犧牲了效率, 提高了數據安全性
一、共享的數據基於文件, 文件又屬於硬盤, 效率就比較低 二、須要本身加鎖和解鎖操做, 這是一件很是危險的操做, 若是忘記解鎖程序就停在原地
🍑需求 一、多個進程共享同一塊內存數據, 實現高效率 二、找到一個能幫咱們處理好鎖的問題的機制 : multiprocessing模塊爲咱們提供了IPC通訊機制:管道和隊列 🍑介質 一、管道和隊列, 基於內存中的空間存放數據 二、隊列是基於管道和鎖實現的, 可讓咱們從複雜的鎖問題中解脫出來
ps : 咱們應該儘可能避免使用共享數據, (好比一個文件的傳遞應該將文件保存到硬盤, 在管道中放的應該是一個路徑, 而不該該是一個完整的文件), 儘量使用消息傳遞和隊列, 避免處理複雜 的同步和鎖問題, 並且在進程數目增多時, 每每能夠得到更好的可擴展性
進程間通訊機制簡稱 IPC (Inter Process Communication)
進程間彼此隔離, 要實現 IPC, multiprocessing 模塊爲咱們提供了隊列和管道這兩種形式
🍑導入模塊 from multiprocessing import Queue 🍑建立一個隊列對象 q = Queue([maxsize]) # 多進程可使用Queue進行數據傳遞 🍑參數介紹 maxsize # 是隊列中容許最大項, 省略則無大小限制
方法 | 功能 |
---|---|
q.put( ) | 向隊列中傳入數據,可選參數 : blocked(鎖定狀態)和timeout(超時時間)。若是blocked爲True(默認值), 而且timeout爲正值, 該方法會阻塞timeout指定的時間, 直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常 |
q.get( ) | 從隊列讀取走一個元素, 有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常 |
q.get_nowait( ) | 同q.get(blocked=False) |
q.put_nowait( ) | 同q.put(blocked=False) |
q.empty( ) | 調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目 |
q.full( ) | 調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走 |
q.qsize( ) | 返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣 |
方法 | 做用 |
---|---|
q.cancel_join_thread( ) | 不會在進程退出時自動鏈接後臺線程, 能夠防止join_thread()方法阻塞 |
q.close( ) | 關閉隊列, 防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤 |
q.join_thread( ) | 鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲 |
from multiprocessing import Queue q = Queue(3) # 建立一個隊列,設置最大項爲3 q.put({"name":"ii"}) # 放入一個字典 q.put([1,2,3,4,5]) # 放入一個列表 q.put("shawn") # 放入一個字符串 try: # q.put(1777,block=True,timeout=3) q.put(1777,block=False) # 放入一個整形,並設置隊列已滿立馬拋異常 except Exception: print("隊列已滿") print(q.get()) # 取一個值 print(q.get()) # 2 print(q.get()) # 3 try: # print(q.get(block=True,timeout=3)) print(q.get(block=False)) # 取一個值,隊列爲空立馬拋出異常 except Exception: print("隊列已空") '''輸出 隊列已滿 {'namwe': 'ahsns'} [1, 2, 3, 4, 5] shawn 隊列已空 '''
在併發編程中, 生產者消費者模式經過一個容器來解決生產者和消費者之間的強耦合性, 二者之間再也不是直接通訊, 而是經過堵塞隊列來進行通訊, 生產者(生產速度快)沒必要再等待消費者是否處理完數據, 消費者直接從隊列中取, 該隊列就至關於一個緩衝區, 平衡了生產者和消費者的工做能力, 從而提升了程序總體的數據處理速度
經過隊列 : 生產者------>隊列------->消費者
from multiprocessing import Process, Queue import time, random def producer(q, name, food): for i in range(3): res = f"{food}{i}" time.sleep(random.randint(1, 3)) # 模擬生產者數據產出時間 q.put(res) # 將產生的數據放入到隊列中 print(f"\033[1;35m{name}:生產了:{res}\033[0m") def consumer(q, name): while True: res = q.get() # 取出數據 if res == None: break # 判斷是否None, None表明隊列取完了,結束 time.sleep(random.randint(1, 3)) # 模擬消費者處理數據時間 print(f"\033[1;36m{name}吃了{res}\033[0m") if __name__ == "__main__": q = Queue() # 建立隊列 # 開啓三個生產者進程 p1 = Process(target=producer, args=(q, "shawn", "香腸")) p2 = Process(target=producer, args=(q, "派大星", "熱狗")) p3 = Process(target=producer, args=(q, "海綿寶寶", "雞")) # 開啓兩個消費者進程 c1 = Process(target=consumer, args=(q, "章魚哥")) c2 = Process(target=consumer, args=(q, "蟹老闆")) p1.start() p2.start() p3.start() c1.start() c2.start() # 等待生產者所有生產完畢結束進程 p1.join() p2.join() p3.join() # 主進程再想隊列裏面放入兩個None,當消費者拿到後表明取完了 q.put(None) q.put(None) print("痞老闆:主") '''輸出 shawn:生產了:香腸0 派大星:生產了:熱狗0 章魚哥吃了香腸0 蟹老闆吃了熱狗0 派大星:生產了:熱狗1 shawn:生產了:香腸1 海綿寶寶:生產了:雞0 章魚哥吃了熱狗1 海綿寶寶:生產了:雞1 派大星:生產了:熱狗2 章魚哥吃了雞0 蟹老闆吃了香腸1 shawn:生產了:香腸2 海綿寶寶:生產了:雞2 痞老闆:主 蟹老闆吃了熱狗2 章魚哥吃了雞1 蟹老闆吃了香腸2 章魚哥吃了雞2 Process finished with exit code 0 '''
q = JoinableQueue([maxsize])
: 與 Queue 的對象同樣, 但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的
方法 | 做用 |
---|---|
q.task_done( ) | 使用者使用此方法發出信號,表示q.get( )的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常 |
q.join( ) | 生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止 |
from multiprocessing import Process, JoinableQueue import time, random def producer(q, name, food): for i in range(3): res = f"{food}{i}" q.put(res) time.sleep(random.randint(1, 3)) print(f"\033[1;35m{name}:生產了:{res}\033[0m") q.join() # 等待每一個生產者本身放入的數據被消費者取完才結束該進程 def consumer(q, name): while True: res = q.get() if res == None: break time.sleep(random.randint(1, 3)) print(f"\033[1;36m{name}吃了{res}\033[0m") q.task_done() # 消費者每次取走一個數據都發送一個task_done信號,生產者那邊的計數相應減1 if __name__ == "__main__": q = JoinableQueue() # 建立一個對象 # 建立三個生產者 p1 = Process(target=producer, args=(q, "shawn", "香腸")) p2 = Process(target=producer, args=(q, "派大星", "熱狗")) p3 = Process(target=producer, args=(q, "海綿寶寶", "雞")) # 建立兩個消費者 c1 = Process(target=consumer, args=(q, "章魚哥")) c2 = Process(target=consumer, args=(q, "蟹老闆")) # 將兩個消費者設置成守護進程, 主進程代碼結束,這兩個消費者進程相應結束 c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() # 等待三個生產者進程結束 p1.join() p2.join() p3.join() 🔰#原理分析 : 生產者生產數據, 假設一個生產者生產3個數據帶隊列,每一個相應的計數爲3 🔰#消費者從隊列中取走數據的時候發送task_done信號給生產者,生產者的計數3-1,剩下兩個 🔰#消費者繼續取數據併發送信號,當生產者的計數爲0,表明隊列已經取完了,這時q.join()就再也不進行堵塞,生產者進程結束 🔰#而此時的消費者也已經沒有做用了,將消費者進程設置成守護進程,主進程等待生產者進程結束就結束,消費者進程天然被帶走 '''輸出 shawn:生產了:香腸0 海綿寶寶:生產了:雞0 章魚哥吃了香腸0 派大星:生產了:熱狗0 蟹老闆吃了熱狗0 shawn:生產了:香腸1 海綿寶寶:生產了:雞1 章魚哥吃了雞0 蟹老闆吃了香腸1 shawn:生產了:香腸2 派大星:生產了:熱狗1 章魚哥吃了雞1 蟹老闆吃了熱狗1 海綿寶寶:生產了:雞2 派大星:生產了:熱狗2 章魚哥吃了香腸2 蟹老闆吃了雞2 章魚哥吃了熱狗2 Process finished with exit code 0 '''
互斥鎖同時只容許一個線程修改數據, 而 Semaphore 容許同時有必定數量的進程更改數據, 就像理髮店, 好比只有3個託尼老師, 那最多隻容許3我的同時理髮, 後面的人只能等到有人理完了才能開始, 若是指定信號量爲3, 那麼來一我的得到一把鎖, 計數加1, 當計數等於3時, 後面的人均須要等待 , 一旦釋放, 就有人能夠得到一把鎖
from multiprocessing import Semaphore,Process import time,random def haircut(sem,name): start_time = time.time() sem.acquire() # 加鎖 print(f"{name}開始理髮") time.sleep(random.randint(2,3)) # 模擬理髮時間 print(f"{name}理髮加等待用時%.2f"%(time.time()-start_time)) sem.release() # 解鎖 if __name__ == '__main__': sem = Semaphore(3) # 最大進程數爲3 user_list = [] for i in range(8): p = Process(target=haircut,args=(sem,f"明星{i}")) p.start() user_list.append(p) for obj in user_list: obj.join() print("關門") '''輸出 明星0開始理髮 明星1開始理髮 明星2開始理髮 明星0理髮加等待用時3.00 明星3開始理髮 明星1理髮加等待用時3.00 明星4開始理髮 明星2理髮加等待用時3.00 明星5開始理髮 明星3理髮加等待用時4.93 明星6開始理髮 明星4理髮加等待用時4.87 明星7開始理髮 明星5理髮加等待用時5.82 明星7理髮加等待用時6.69 明星6理髮加等待用時7.74 關門 Process finished with exit code 0 '''
進程死鎖、遞歸鎖與線程死鎖、遞歸鎖同樣, 將統一放在線程一塊兒講, 請參見多線程
參見多線程
😮忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。 😒那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼? 😓首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。 😟第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。 😥所以咱們不能無限制的根據任務去開啓或者結束進程。那麼咱們要怎麼作呢?
😺定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務
😸等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務
😹若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。
😻也就是說,進池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行
😼這樣不會增長操做系統的調度難度,還節省了開關進程的時間,也必定程度上可以實現併發效果。
👉管理進程負責建立資源進程,把工做交給空閒資源進程處理,回收已經處理完工做的資源進程。
😱管理進程如何有效的管理資源進程,分配任務給資源進程? 👉經過IPC,信號,信號量,消息隊列,管道等進行交互。
咱們能夠經過維護一個進程池來控制進程的數目, 好比使用httpd的進程模式, 能夠規定最大進程數和最小進程數, multiprocessing模塊Pool類能夠提供指定數量的進程供用戶調用
Pool([numprocess],[initializer],[initargs])
參數 | 做用 |
---|---|
numprocess | 要建立的進程數,若是省略,將默認使用cpu_count()的值 |
initializer | 每一個工做進程啓動時要執行的可調用對象,默認爲None |
initargs | 傳給initializer的參數組 |
方法 | 做用 |
---|---|
p.apply(func,args,kwargs) | (同步調用)在進程池工做的進程中執行func函數,後面是參數,而後返回結果,若是想要傳入不一樣的參數併發的執行func, 就須要以不一樣的線程去調用p.apply()函數或者使用p.apply_async() |
p.apply_async(func,args,kwargs) | (異步調用)在進程池工做的進程中執行func函數,後面是參數,而後返回結果, 結果是AsyncResult類的實例, 可使用回調函數callback, 將前面funct返回的結果單作參數傳給回調函數 |
p.close( ) | 關閉進程池,防止進一步操做, 若是全部操做持續掛起,它們將在工做進程終止前完成 |
P.jion() | 等待全部工做進程退出。此方法只能在close()或teminate()以後調用,不然報錯 |
如下方法運用於 pply_async()
和 map_async()
的返回值, 返回值是**AsyncResul **實例的對象, 也就是該對象的方法
方法 | 做用 |
---|---|
obj.get( ) | 返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起 |
obj.ready( ) | 若是調用完成,返回True |
obj.successful( ) | 若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 |
obj.wait([timeout]) | 等待結果變爲可用, 參數是超時時間 |
obj.terminate( ) | 當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是對象被垃圾回收, 將自動調用這個方法 |
from multiprocessing import Pool import time,os,random def test(n): print(f"子進程:{os.getpid()}") time.sleep(2) return n*random.randint(2,9) if __name__ == '__main__': n = os.cpu_count() # 本機CPU個數,個人是4,進程池容量個數自定義,默認CPU核數 p = Pool(processes=n) # 設置進程池進程個數,從無到有,而且之後一直只有這四個進程在執行任務 li = [] start_time = time.time() for i in range(10): res = p.apply(test,args=(2,)) # 建立十個個任務, 使用同步調用的方式 li.append(res) p.close() # 先關閉進程池, 不會再有新的進程加入到pool中, 防止進一步的操做(同步調用能夠不加此方法) p.join() # 必須在close調用以後執行, 不然報錯, 執行後等待全部子進程結束(同步調用能夠不加此方法) print(li) # 同步調用, 獲得的就是最終結果,(異步調用獲得的是對象, 須要使用get方法取值) print(f'使用時間:{time.time()-start_time}') '''輸出 子進程:7768 子進程:16276 子進程:17544 子進程:15680 子進程:7768 子進程:16276 子進程:17544 子進程:15680 子進程:7768 子進程:16276 [4, 18, 14, 14, 12, 14, 16, 14, 6, 10] 使用時間:20.226498126983643 '''
從上面的輸出結果能夠看到,進程一直是那四個 : 776九、1627六、1754四、15680, 而且異步提交須要等待上一個任務結束拿到結果才能進行下一個任務, 因此用時 20 秒多一點
from multiprocessing import Pool import time,os,random def test(n): print(f"子進程:{os.getpid()}") time.sleep(2) return n*n*random.randint(2,9) if __name__ == '__main__': n = os.cpu_count() # 本機CPU個數,個人是4,進程池容量個數自定義,默認CPU核數 p = Pool(processes=n) # 設置進程池大小, 從無到有, 並以後只有這四個進程執行任務 li = [] start_time = time.time() for i in range(10): res = p.apply_async(test,args=(2,)) # 開啓十個任務, 使用異步調用的方式 li.append(res) p.close() # 關閉進程池, 不會再有新的進程加入到pool中, 防止進一步的操做 p.join() # join必須在close函數以後進行, 不然報錯, 執行後等待全部子進程結束 print(li) # 返回的是AsyncResul的對象[<multiprocessing.pool.ApplyResult object at 0x000002318511B408>,....] print([i.get() for i in li]) # 使用get方法來獲取異步調用的值(同步調用沒有該方法),並放入列表中打印 print(f"使用時間:{time.time()-start_time}") '''輸出 子進程:8636 子進程:10828 子進程:7432 子進程:13976 子進程:8636 子進程:10828 子進程:7432 子進程:13976 子進程:8636 子進程:10828 [<multiprocessing.pool.ApplyResult object at 0x000001623059B308>,...省略] [16, 24, 24, 24, 16, 28, 36, 28, 8, 32] 使用時間:6.301024436950684 '''
從上面結果也能看出自始至終都只有四個進程在工做 : 863六、1082八、743二、13976,異步調用方式若是任務進行時遇到阻塞操做將立馬接收其它異步操做中的結果, 若是進程池滿了, 則只能等待任務進行完畢拿到結果, 拿到的結果是 AsyncResul 的對象, 須要使用 get 方法取值, 用時 6 秒多一點
from socket import * from multiprocessing import Pool import os s = socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 重用IP和端口 s.bind(("127.0.0.1",8055)) s.listen(5) def connection(conn): print(f"當前進程:{os.getpid()}") while 1: try: date = conn.recv(1024) if len(date) == 0:break conn.send("阿巴阿巴".encode("utf-8")) except Exception: break if __name__ == '__main__': p = Pool(2) # 不指定,默認本機CPU核數 print("connection....") while 1: conn,addr = s.accept() print(f"已連上{addr}") p.apply_async(connection,args=(conn,))
from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8055)) while 1: msg = input("內容>>").strip() if len(msg) == 0:continue c.send(msg.encode("utf-8")) date = c.recv(1024) print(f"服務端的回覆:{date.decode('utf-8')}")
四個客戶端一個服務端 :
啓動五臺機器, 讓四臺客戶端發送信息
前兩臺能發收消息, 後兩臺阻塞原地
服務端顯示兩個進程啓動成功 : 892八、17584, 剩餘兩個阻塞
咱們將前面兩個客戶端進程關閉, 看看進程號是否變化
關閉前兩個客戶端進程以後, 後兩個客戶端進程立馬啓動起來了, 而且發現PID仍是原來的兩個
將第一個函數的指針(也就是內存地址,Python中淡化了指針的概念)做爲參數傳給另外一個函數處理, 這第一個函數就稱爲回調函數
def foo(n): print(f"foo輸出{n}") def Bar(i,func): func(i) for i in range(3): Bar(i,foo) '''輸出 foo輸出0 foo輸出1 foo輸出2 '''
當進程池中一個任務處理完以後, 它去通知主進程本身結束了, 讓主進程處理本身的結果, 因而主進程去調用另外一個函數去處理該結果, 咱們能夠將耗時間或者阻塞的任務放入進程池, 在主進程中指定回調函數, 並由主進程負責執行, 這樣主進程在執行回調函數的時候就省去了I/O的過程, 直接拿到的就是任務的結果
from multiprocessing import Pool import os def get(n): print(f"get--->{os.getpid()}") return n # 返回任務執行的結果 def set(num): # 拿到回調函數的處理結果--->num print(f"set--->{os.getpid()} : {num**2}") if __name__ == '__main__': p = Pool(3) nums = [2,3,4,1] li = [] for i in nums: # 異步調用,並使用callback指定回調函數 res = p.apply_async(get,args=(i,),callback=set) li.append(res) p.close() # 關閉進程池 p.join() # 等待子進程結束 print([ii.get() for ii in li]) # 使用get方法拿到結果 '''輸出 get--->8388 get--->8388 set--->8768 : 4 get--->8388 set--->8768 : 9 get--->8388 set--->8768 : 16 set--->8768 : 1 [2, 3, 4, 1] '''
from multiprocessing import Pool import requests,os def get_htm(url): print(f"進程:{os.getpid()}開始獲取:{url}網頁") response = requests.get(url) if response.status_code == 200: # 若是是200,則獲取成功 return {'url':url,'text':response.text} else: return {'url':url,'text':''} # 有些網頁獲取不到,設置空 def parse_htm(htm_dic): print(f'進程:{os.getpid()}正在處理:{htm_dic["url"]}的text') parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}" with open("./db.txt","a")as f: # 將URL和對應網頁源碼大小保存到文件 f.write(f"{parse_data}\n") if __name__ == '__main__': urls=[ 'https://zhuanlan.zhihu.com', 'https://www.cnblogs.com', 'https://www.python.org', 'https://blog.csdn.net', 'http://www.china.com.cn', ] p = Pool(3) # 設置進程池最大進程數爲3 li = [] for url in urls: # 異步調用並指定回調函數 res = p.apply_async(get_htm,args=(url,),callback=parse_htm) li.append(res) p.close() # 關閉進程池 p.join() # 等待子進程結束 print([i.get() for i in li]) # 使用get方法獲取結果 '''輸出 進程:11484開始獲取:https://zhuanlan.zhihu.com網頁 進程:17344開始獲取:https://www.cnblogs.com網頁 進程:2688開始獲取:https://www.python.org網頁 進程:11484開始獲取:https://blog.csdn.net網頁 進程:3928正在處理:https://zhuanlan.zhihu.com的text 進程:17344開始獲取:http://www.china.com.cn網頁 進程:3928正在處理:https://www.cnblogs.com的text 進程:3928正在處理:https://blog.csdn.net的text 進程:3928正在處理:http://www.china.com.cn的text 進程:3928正在處理:https://www.python.org的text [{'url': 'https://zhuanlan.zhihu.com', 'text': ''},...一堆網頁源碼的bytes(省略)] '''
from multiprocessing import Pool import re import requests def get_htm(url,format1): response = requests.get(url) if response.status_code == 200: return (response.text,format1) else: return ('',format1) def parse_htm(res): text,format1 = res data_list = re.findall(format1,text) for data in data_list: with open("福布斯排行.txt","a",encoding="utf-8")as f: f.write(f"排名:{data[0]},名字:{data[1]},身價:{data[2]},公司:{data[3]},國家:{data[4]}\n") if __name__ == '__main__': url1 = "https://www.phb123.com/renwu/fuhao/shishi.html" # 使用正則匹配關鍵字 format1 = re.compile(r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"', re.S) url_list = [url1] for i in range(2, 16): # 總共15頁排行,將連接都加進列表裏 url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html") p = Pool() li = [] for url in url_list: res = p.apply_async(get_htm,args=(url,format1),callback=parse_htm) li.append(res) p.close() p.join() print("保存完成")
ps : 若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數