1、multiprocessing 模塊介紹編程
Python 中的多線程沒法利用多核優點,若是想要充分地使用多核 CPU 的資源(os.cpu_count()查看),在 Python 中大部分狀況須要使用多進程。Python提供了 multiprocessing。json
multiprocessing 模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊 threading 的編程接口相似。安全
multiprocessing 模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。網絡
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。數據結構
2、Process 類的介紹多線程
一、建立進程的類併發
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,可用來開啓一個子進程 強調: 1. 須要使用關鍵字的方式來指定參數 2. args 指定的爲傳給 target 函數的位置參數,是一個元組形式,必須有逗號
二、參數介紹app
group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'qiu',) kwargs表示調用對象的字典,kwargs={'name':'qiu','age':18} name爲子進程的名稱
三、方法介紹dom
p.start():啓動進程,並調用該子進程中的 p.run()
p.run():進程啓動時運行的方法,正是它去調用 target 指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate(): 強制終止進程 p,不會進行任何清理操做,若是 p 建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。
若是 p 還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive(): 若是 p 仍然運行,返回 True
p.join([timeout]): 主線程等待 p 終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,
須要強調的是,p.join只能join住 start 開啓的進程,而不能 join 住 run 開啓的進程
四、屬性介紹異步
p.daemon:默認值爲 False,若是設爲 True,表明 p 爲後臺運行的守護進程,當 p 的父進程終止時,p 也隨之終止,而且設定爲 True 後,p 不能建立本身的新進程,必須在 p.start() 以前設置 p.name: 進程的名稱 p.pid:進程的pid p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) p.authkey: 進程的身份驗證鍵,默認是由 os.urandom() 隨機生成的 32 字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
3、Process類的使用
注意:在 Windows 中 Process() 必須放到 if __name__ == '__main__': 下
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()的內部調用,使用if __name__ == 「__main __」,這個if語句 中的語句將不會在導入時被調用。
from multiprocessing import Process import time def task(name): print("%s is running" %name) time.sleep(3) print("%s is done" %name) if __name__ == '__main__': p = Process(target=task, args=("qiu",)) # p = Process(target=task, kwargs={"name": "qiu"})
# p.start()只是向操做系統發送了一個開啓子進程的信號, 操做系統才能開啓子進程,
# 涉及到申請內存空間, 要將父進程的數據拷貝到子進程, 要將CPU調到子進程裏運行子進程的代碼
# 纔會有 is running的顯示, 這都是一系列的硬件操做
# 因此print("主")這行代碼運行速度要快一些
p.start() print("主")
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is running" %self.name) time.sleep(3) print("%s is done" %self.name) if __name__ == '__main__': p = MyProcess("qiu") p.start() print("主")
4、join方法
在主進程運行過程當中若是想要併發的執行其餘任務,咱們能夠開啓子進程,此時主進程的任務和子進程的任務分爲兩種狀況:
一種狀況是:在主進程的任務與子進程的任務彼此獨立的狀況下,主進程的任務先執行完畢後,主進程還須要等待子進程執行完畢,而後統一回收資源
還有一種狀況是:若是主進程的任務在執行到某一個階段時,須要等待子進程執行完畢後才能繼續執行,就須要一種機制可以讓主進程監測子進程是否運行完畢,在子進程執行完畢後才繼續執行,不然一直在原地阻塞,這就是 join 方法的做用。
from multiprocessing import Process import time def task(name, n): print("%s is running" %name) time.sleep(n) print("%s is done" %name) if __name__ == '__main__': p1 = Process(target=task, args=("Process 1", 1)) p2 = Process(target=task, args=("Process 2", 2)) p3 = Process(target=task, args=("Process 3", 3)) start = time.time() p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print("主進程", time.time() - start)
人會有疑問,既然 join 是等待進程結束,那麼我像下面 join 下去,進程不就變成串行了的嗎?
固然不是了,必須明確 join 是讓誰等:進程只要 start 就會在開始運行了,因此 p1 到 p3.start() 時,系統中已經有三個併發的進程了,而 p1.join() 是在等 p1 結束,p1 只要不結束主線程就會一直卡在原地,這也是問題的關鍵。join 是讓主線程等,而 p1-p3 仍然是併發執行的,p1.join() 的時候,其他 p2,p3 仍然在運行,等 p1.join() 結束,可能 p2,p3 早已經結束了,這樣 p2.join(),p3.join() 直接經過檢測,無需等待。因此 3 個 join 花費的總時間仍然是耗費時間最長的那個進程運行的時間,因此這裏即使交換 join 的順序,執行的時間仍然是 3 秒多一點,多出來的那零點幾秒是開啓進程以及進程切換的時間。
from multiprocessing import Process import time def task(name, n): print("%s is running" %name) time.sleep(n) print("%s is done" %name) if __name__ == '__main__': p1 = Process(target=task, args=("Process 1", 1)) p2 = Process(target=task, args=("Process 2", 2)) p3 = Process(target=task, args=("Process 3", 3)) start = time.time() p1.start() p2.start() p3.start() p3.join() p1.join() p2.join() print("主進程", time.time() - start)
join 是讓主進程在原地等待,等待子進程運行完畢,不會影響子進程的執行
上面的代碼可使用 for 循環簡寫
from multiprocessing import Process import time def task(name, n): print("%s is running" %name) time.sleep(n) print("%s is done" %name) if __name__ == '__main__': start = time.time() p_l = [] for i in range(1, 4): p = Process(target=task, args=("Process %s" %i, i)) p_l.append(p) p.start() for p in p_l: p.join() print("主進程", time.time() - start)
進程間的內存空間互相隔離
from multiprocessing import Process n = 100
def task(): global n n = 0 if __name__ == '__main__': p = Process(target=task) p.start() p.join() print("主進程內的:", n)
5、殭屍進程與孤兒進程
殭屍進程:一個進程使用 fork 建立子進程,若是子進程退出,而父進程並無調用 wait 或 waitpid 獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之爲僵死進程
咱們知道在 Unix/Linux 中,正常狀況下子進程是經過父進程建立的,子進程在建立新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠沒法預測子進程到底何時結束,若是子進程一結束就馬上回收其所有資源,那麼在父進程內將沒法獲取子進程的狀態信息。所以,Unix 提供了一種機制能夠保證父進程能夠在任意時刻獲取子進程結束時的狀態信息:
一、在每一個進程退出的時候,內核釋放該進程全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號、退出狀態、運行時間等)
二、直到父進程經過 wait/waitpid 來取時才釋放。但這樣就致使了問題,若是進程不調用 wait/waitpid 的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程。此即爲殭屍進程的危害,應當避免。
任何一個子進程(init除外)在 exit() 以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在 exit() 以後,父進程沒有來得及處理,這時用 ps 命令就能看到子進程的狀態是 「Z」 。若是父進程能及時 處理,可能用 ps 命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。 若是父進程在子進程結束以前退出,則子進程將由 init 接管。init 將會以父進程的身份對殭屍狀態的子進程進行處理。
孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被 init 進程(進程號爲 1)所收養,並由 init 進程對它們完成狀態收集工做。
孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了 init 進程身上,init 進程就好像是一個民政局,專門負責處理孤兒進程的善後工做。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置爲 init,而 init 進程會循環地 wait() 它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命週期的時候,init 進程就會表明黨和政府出面處理它的一切善後工做。所以孤兒進程並不會有什麼危害。
6、守護進程
主進程建立子進程,而後將該進程設置成守護本身的進程,守護進程就比如皇帝身邊的老太監,皇帝一死老太監就跟着殉葬。
關於守護進程須要強調兩點:
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
若是咱們有兩個任務須要併發執行,那麼開一個主進程和一個子進程分別去執行就 ok 了,若是子進程的任務在主進程任務結束後就沒有存在的必要了,那麼該子進程應該在開啓前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止
from multiprocessing import Process import time import random def task(name): print('%s is running' % name) time.sleep(random.randrange(1, 3)) print('%s is done' % name) if __name__ == '__main__': p = Process(target=task, args=('qiu',)) p.daemon = True # 必定要在p.start()前設置, 設置p爲守護進程, 禁止p建立子進程, 而且父進程代碼執行結束, p即終止運行
p.start() print('主') # 只要終端打印出這一行內容, 那麼守護進程p也就跟着結束掉了
7、互斥鎖
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或者打印終端是沒有問題的,可是帶來的是競爭,競爭帶來的結果是錯亂,以下:多個進程模擬多我的執行搶票任務
# db.json文件內容:
{"count": 1} import json, time, random from multiprocessing import Process def search(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) time.sleep(1) # 模擬查看票數的網絡延遲
print("%s查看到餘票爲%s張" %(name, dic["count"])) def get(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: dic["count"] -= 1 time.sleep(random.randint(1, 3)) # 模擬買票的網絡延遲
with open("db.json", "wt", encoding="utf-8") as f: json.dump(dic, f) print("%s購票成功" %name) else: print("%s查看到沒有票了" %name) def task(name): search(name) get(name) if __name__ == '__main__': for i in range(10): p = Process(target=task, args=("路人%s " %i,)) p.start() # 運行結果
路人0 查看到餘票爲1張 路人1 查看到餘票爲1張 路人2 查看到餘票爲1張 路人3 查看到餘票爲1張 路人4 查看到餘票爲1張 路人5 查看到餘票爲1張 路人6 查看到餘票爲1張 路人7 查看到餘票爲1張 路人9 查看到餘票爲1張 路人8 查看到餘票爲1張 路人0 購票成功 路人3 購票成功 路人4 購票成功 路人7 購票成功 路人9 購票成功 路人1 購票成功 路人2 購票成功 路人6 購票成功 路人8 購票成功 路人5 購票成功
能夠看到,它們10我的是併發運行,效率高,但競爭寫同一文件,數據寫入錯亂,只有一張票,卻成功賣給了10我的。以前學到 join,但 join 的操做至關於指定了買票的順序,只能由第一我的買,並不能保證數據的安全性。
因此要進行加鎖處理。而互斥鎖的意思就是互相排斥,若是把多個進程比喻爲多我的,互斥鎖的工做原理就是多我的都要去爭搶同一個資源:衛生間,一我的搶到衛生間後上一把鎖,其餘人都要等着,等到這個完成任務後釋放鎖,其餘人才有可能有一個搶到......因此互斥鎖的原理,就是把部分的併發改爲串行,下降了效率,但保證了數據安全不錯亂
import json, time, random from multiprocessing import Process, Lock def search(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) time.sleep(1) # 模擬查看票數的網絡延遲
print("%s查看到餘票爲%s張" %(name, dic["count"])) def get(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: dic["count"] -= 1 time.sleep(random.randint(1, 3)) # 模擬買票的網絡延遲
with open("db.json", "wt", encoding="utf-8") as f: json.dump(dic, f) print("%s 購票成功" %name) else: print("%s 查看到沒有票了" %name) def task(name, mutex): search(name) # 併發
mutex.acquire() # 加鎖
get(name) # 串行
mutex.release() # 釋放鎖
# with mutex: # 至關於mutex.acquire(),執行完子代碼塊自動執行mutex.release()
# get(name)
if __name__ == '__main__': mutex = Lock() for i in range(10): p = Process(target=task, args=("路人%s " %i, mutex)) p.start() # 運行結果
路人0 查看到餘票爲1張 路人1 查看到餘票爲1張 路人3 查看到餘票爲1張 路人2 查看到餘票爲1張 路人4 查看到餘票爲1張 路人5 查看到餘票爲1張 路人6 查看到餘票爲1張 路人7 查看到餘票爲1張 路人8 查看到餘票爲1張 路人9 查看到餘票爲1張 路人0 購票成功 路人1 查看到沒有票了 路人3 查看到沒有票了 路人2 查看到沒有票了 路人4 查看到沒有票了 路人5 查看到沒有票了 路人6 查看到沒有票了 路人7 查看到沒有票了 路人8 查看到沒有票了 路人9 查看到沒有票了
互斥鎖與 join
使用 join 能夠將併發變成串行,互斥鎖的原理也是將併發變成串行,那咱們直接使用 join 就能夠了啊,爲什麼還要互斥鎖?
import json, time, random from multiprocessing import Process def search(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) time.sleep(1) # 模擬查看票數的網絡延遲
print("%s查看到餘票爲%s張" %(name, dic["count"])) def get(name): with open("db.json", "rt", encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: dic["count"] -= 1 time.sleep(random.randint(1, 3)) # 模擬買票的網絡延遲
with open("db.json", "wt", encoding="utf-8") as f: json.dump(dic, f) print("%s 購票成功" %name) else: print("%s 查看到沒有票了" %name) def task(name): search(name) get(name) if __name__ == '__main__': for i in range(10): p = Process(target=task, args=("路人%s " %i,)) p.start() p.join() # 運行結果
路人0 查看到餘票爲0張 路人0 查看到沒有票了 路人1 查看到餘票爲0張 路人1 查看到沒有票了 路人2 查看到餘票爲0張 路人2 查看到沒有票了 路人3 查看到餘票爲0張 路人3 查看到沒有票了 路人4 查看到餘票爲0張 路人4 查看到沒有票了 路人5 查看到餘票爲0張 路人5 查看到沒有票了 路人6 查看到餘票爲0張 路人6 查看到沒有票了 路人7 查看到餘票爲0張 路人7 查看到沒有票了 路人8 查看到餘票爲0張 路人8 查看到沒有票了 路人9 查看到餘票爲0張 路人9 查看到沒有票了
發現使用 join 將併發改爲串行,確實能保證數據安全,但問題是連查票操做也變成只能一個一我的去查了,很明顯你們查票時應該是併發的去查詢而無需考慮數據準確與否,此時 join 與互斥鎖的區別就顯而易見了,join 是將一個任務總體串行,而互斥鎖的好處則是能夠將一個任務中的某一段代碼串行
總結:
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
一、效率低(共享數據基於文件,而文件是硬盤上的數據)
二、須要本身加鎖處理
所以咱們最好找尋一種解決方案可以兼顧:
一、效率高(多個進程共享一塊內存的數據)
二、幫咱們處理好鎖問題
這就是 mutiprocessing 模塊爲咱們提供的基於消息的 IPC 通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中,而隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,於是隊列纔是進程間通訊的最佳選擇。
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
8、隊列
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing 模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
from multiprocessing import Queue q = Queue(3) # put ,get ,put_nowait,get_nowait,full,empty
q.put(1) q.put(2) q.put(3) print(q.full()) # 滿了 # q.put(4) # 再放就阻塞住了
print(q.get()) print(q.get()) print(q.get()) print(q.empty()) # 空了 # print(q.get()) # 再取就阻塞住了