一. multiprocess模塊 仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。重點強調:進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內,可是經過一些特殊的方法,能夠實現進程之間數據的共享。 1. Process模塊 Process是穿件進程的模塊, 藉助這個模塊, 能夠實現進程的建立 Process([group [, target [, name [, args [, kwargs]]]]])由該類實例化一個對象, 表示一個子進程中的任務(還沒有啓動) 強調: (1). 須要使用關鍵字的方式來指定參數 (2). args指定的爲傳給target函數的位置參數, 是一個元祖形式, 必須有逗號 (1). 看一個程序實例: from multiprocessing import Process def func(): print(12345) # 當咱們運行當前這個test.py文件的時候, 就產生了進程, 這個進程咱們稱之爲主進程 if __name__ == '__main__': # 將函數註冊到一個進程中, p是一個進程對象, 此時尚未啓動進程, 只是建立了一個進程對象, 而且func是不加括號的的, 由於加上括號就直接運行了 p = Process(target=func, ) # 告訴操做系統, 給我開啓一個進程, func這個函數就被咱們新開的這個進程執行了, 而這個進程是我主程序建立出來的因此稱這個新建立的進程爲主進程的子進程, 而主進程又能夠稱之爲這個新進程的父進程 # 而這個子進程中執行的程序,至關於將如今這個test.py文件中的程序copy到一個你看不到的python文件中去執行了,就至關於當前這個文件,被另一個py文件import過去並執行了。 # start並非直接就去執行了,咱們知道進程有三個狀態,進程會進入進程的三個狀態,就緒,(被調度,也就是時間片切換到它的時候)執行,阻塞,而且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。 p.start() # 這是主進程的程序,上面開啓的子進程的程序是和主進程的程序同時運行的,咱們稱爲異步 print("*"*10) (2). 上面說了,咱們經過主進程建立的子進程是異步執行的,那麼咱們就驗證一下,而且看一會兒進程和主進程(也就是父進程)的ID號(講一下pid和ppid,使用pycharm舉例),來看看是不是父子關係。 import time import os # os.getpid() # 獲取本身的進程號 # os.getppid() # 獲取本身進程的父進程的ID號 from multiprocessing import Process def func(): print("aaaaa") time.sleep(1) print("子進程>>>", os.getpid()) print("該子進程的父進程>>>", os.getppid()) print(12345) if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父進程>>>", os.getpid()) print("父進程的父進程>>>", os.getppid()) # ********** # 首先打印出來了主進程的程序,而後打印的是子進程的,也就是子進程是異步執行的,至關於主進程和子進程同時運行着,若是是同步的話,咱們先執行的是func(),而後再打印主進程最後的10個*號。 # 父進程>>> 9044 # 父進程的父進程>>> 9528 #我運行的test.py文件的父進程號,它是pycharm的進程號 # aaaaa # 子進程>>> 10476 # 該子進程的父進程>>> 9044 #是我主進程的ID號,說明主進程爲它的父進程 # 12345 (3). 看一個問題,說明linux和windows兩個不一樣的操做系統建立進程的不一樣機制致使的不一樣結果: import time import os from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子進程>>',os.getpid()) print('該子進程的父進程>>',os.getppid()) print(12345) print('太白老司機') """若是我在這裏加了一個打印,你會發現運行結果中會出現兩次打印出來的太白老司機,由於咱們在主進程中開了一個子進程,子進程中的程序至關於import的主進程中的程序,那麼import的時候會不會執行你import的那個文件的程序啊,前面學的,是會執行的,因此出現了兩次打印, 實際上是由於windows開起進程的機制決定的,在linux下是不存在這個效果的,由於windows使用的是process方法來開啓進程,他就會拿到主進程中的全部程序,而linux下只是去執行我子進程中註冊的那個函數,不會執行別的程序,這也是爲何在windows執行程序的時候,要加上if __name__ == '__main__':,不然會出現子進程中運行的時候還開啓子進程,那就出現無限循環的建立進程了,就報錯了""" if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父進程>>>", os.getpid()) print("父進程的父進程>>>", os.getppid()) # 太白老司機 # ********** # 父進程>>> 11204 # 父進程的父進程>>> 9528 # 太白老司機 # aaaa # 子進程>> 6644 # 該子進程的父進程>> 11204 # 12345 (4). 一個進程的生命週期:若是子進程的運行時間長,那麼等到子進程執行結束程序才結束,若是主進程的執行時間長,那麼主進程執行結束程序才結束,實際上咱們在子進程中打印的內容是在主進程的執行結果中看不出來的,可是pycharm幫咱們作了優化,由於它會識別到你這是開的子進程,幫你把子進程中打印的內容打印到了顯示臺上。 若是說一個主進程運行完了以後,咱們把pycharm關了,可是子進程尚未執行結束,那麼子進程還存在嗎?這要看你的進程是如何配置的,若是說咱們沒有配置說我主進程結束,子進程要跟着結束,那麼主進程結束的時候,子進程是不會跟着結束的,他會本身執行完,若是我設定的是主進程結束,子進程必須跟着結束,那麼就不會出現單獨的子進程(孤兒進程)了,具體如何設置,看下面的守護進程的講解。好比說,咱們未來啓動項目的時候,可能經過cmd來啓動,那麼我cmd關閉了你的項目就會關閉嗎,不會的,由於你的項目不能中止對外的服務,對吧 (5). Process類中參數的介紹: 參數介紹: ①. group參數未使用,值始終爲None ②. target表示調用對象,即子進程要執行的任務 ③. args表示調用對象的位置參數元組,args=(1,2,'egon',) ④. kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} ⑤. name爲子進程的名稱 給要執行的函數傳參數: def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','來玩啊!'))#這是func須要接收的參數的傳送方式。 p.start() print('父進程執行結束!') #執行結果: 父進程執行結束! 姑娘 來玩啊! (6). Process類中各方法的介紹: ①. p.start():啓動進程,並調用該子進程中的p.run() ②. p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 ③. p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 ④. p.is_alive():若是p仍然運行,返回True ⑤. p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程 jion方法的例子: 讓主進程加上join的地方等待(也就是阻塞住), 等待子程序執行完以後, 在繼續往下執行主進程, 不少時候, 咱們主進程須要子進程的執行結果, 因此必需要等待, join有點像把子程序和主程序拼接起來, 將異步改成同步執行 import time from multiprocessing import Process def func(x, y): print(x) time.sleep(1) print(y) if __name__ == "__main__": p = Process(target=func, args=("姑娘", "來玩啊")) p.start() print("這裏是異步的") p.join() print("父程序執行結束") # 打印結果 這裏是異步的 姑娘 來玩啊 父程序執行結束 用for循環開啓多個線程: import time import os from multiprocessing import Process def func(x, y): print(x) # time.sleep(1) print(y) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=func, args=("姑娘%s"%i, "來玩啊")) p_list.append(p) p.start() # 一、若是加到for循環裏面,那麼全部子進程包括父進程就所有變爲同步了,由於for循環也是主進程的,循環第一次的時候,一個進程去執行了,而後這個進程就join住了,那麼for循環就不會繼續執行了,等着第一個子進程執行結束纔會繼續執行for循環去建立第二個子進程。 # 二、若是我不想這樣的,也就是我想全部的子進程是異步的,而後全部的子進程執行完了再執行主進程 p.join() # 四、這是解決辦法,前提是咱們的子進程所有都已經去執行了,那麼我在一次給全部正在執行的子進程加上join,那麼主進程就須要等着全部子進程執行結束纔會繼續執行本身的程序了,而且保障了全部子進程是異步執行的。 [ap.join() for ap in p_list] # 三、若是這樣寫的話,屢次運行以後,你會發現會出現主進程的程序比一些子進程先執行完,由於咱們p.join()是對最後一個子進程進行了join,也就是說若是這最後一個子進程先於其餘子進程執行完,那麼主進程就會去執行,而此時若是還有一些子進程沒有執行完,而主進程執行完了,那麼就會先打印主進程的內容了,這個cpu調度進程的機制有關係,由於咱們的電腦可能只有4個cpu,個人子進程加上住進程有11個,雖然我for循環是按順序起進程的,可是操做系統必定會按照順序給你執行你的進程嗎,答案是不會的,操做系統會按照本身的算法來分配進程給cpu去執行,這裏也解釋了咱們打印出來的子進程中的內容也是沒有固定順序的緣由,由於打印結果也須要調用cpu,能夠理解成進程在爭搶cpu,若是同窗你想問這是什麼算法,這就要去研究操做系統啦。那咱們的想全部子進程異步執行,而後再執行主進程的這個需求怎麼解決啊 p.join() print("不要錢") 模擬兩個應用場景, 1. 同時對一個文件進行寫操做, 2. 同時建立多個文件 import time import os import re from multiprocessing import Process # 多進程同時對一個文件進行寫操做 # def func(x, y, i): # with open(x, "a", encoding = "utf-8") as f: # print("當前進程%s拿到的文件的光標位置>>%s" % (os.getpid(), f.tell())) # f.write("%s\n"%y) # 多線程同時建立多個文件 def func(x, y): with open(x, "w", encoding="utf-8") as f: f.write(y) if __name__ == '__main__': p_list = [] for i in range(10): # p = Process(target=func, args=("can_do_girl_lists.txt", "姑娘%s"%(i+1), i+1)) p = Process(target=func, args=("girl/can_do_girl_lists_%s.txt"%(i+1), "姑娘%s"%(i+1))) p_list.append(p) p.start() [ap.join() for ap in p_list] # with open("can_do_girl_lists.txt", "r", encoding="utf-8") as f: # data = f.read() # all_num = re.findall("\d+", data) # print(">>>>>", all_num, ".....%s"%(len(all_num))) # print([i for i in os.walk(r"D:/1PY/Day30")]) print("不要錢") (7). Process類中自帶封裝的各屬性的介紹 ①. p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 ②. p.name:進程的名稱 ③. p.pid:進程的pid ④. p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) ⑤. p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可) 2. Process類的使用 注意: 在windows中Process()必須放到 if __name__ == "__main__": 下 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。 進程建立的第二種方法(繼承): (1). 進程建立的第二種方法: import os from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() # 必須執行父類的__init__方法 self.person = person def run(self): print(os.getpid()) print(self.pid) print("%s正在和女主播聊天" % self.person) # def start(self): # self.run() # print("我是%s" % self.person) if __name__ == '__main__': p1 = MyProcess("Jedan") p2 = MyProcess("太白") p3 = MyProcess("alexDSB") p1.start() p2.start() p2.run() p3.start() p1.join() p2.join() p3.join() (2). 進程之間的數據是隔離的 from multiprocessing import Process n = 100 def work(): global n n = 0 print("子進程內:", n) # print(n) if __name__ == '__main__': p = Process(target=work) p.start() p.join() #等待子進程執行完畢,若是數據共享的話,我子進程是否是經過global將n改成0了,可是你看打印結果,主進程在子進程執行結束以後,仍然是n=100,子進程n=0,說明子進程對n的修改沒有在主進程中生效,說明什麼?說明他們之間的數據是隔離的,互相不影響的 print("主進程內:", n) # 子進程內: 0 # 主進程內: 100 (3). 多線程實現多個客戶端通訊 (4). is_alive(), terminate() import time from multiprocessing import Process class Piao(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is 打飛機" % self.name) s = input("!!!") # 在pycharm下子進程中不能input輸入, 會報錯 EOFError: EOF when reading a line, 由於子進程中沒有像咱們主進程這樣的在pycharm下的控制檯能夠輸入東西的地方 time.sleep(2) print("%s is 打飛機結束" % self.name) if __name__ == '__main__': p1 = Piao("太白") p1.start() p1.join() # time.sleep(5) p1.terminate() # 關閉進程, 不會當即關閉, 有個等着操做系統去關閉這個進程的時間, 因此is_alive馬上查看的結果可能仍是存活, 可是稍微等一會就關閉了 print(p1.is_alive()) # 查看子程序是否還存活 print("等會...") time.sleep(1) print(p1.is_alive()) (5). 殭屍進程(有害)和孤兒進程(無害) 殭屍進程: 一個進程使用fork建立子進程, 若是子程序退出, 而父程序沒有調用wait或waitpid獲取子進程的狀態消息, 那麼子進程的進程描述符任然保存在系統中, 這個進程稱之爲殭屍進程 任何一個子進程(init除外)在exit()以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在exit()以後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。若是父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。 若是父進程在子進程結束以前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。 孤兒進程: 一個父進程退出, 而它的一個或多個子進程還在運行, 那麼那些子進程將會成爲孤兒進程, 孤兒進程將會被init進程(進程號爲1)所收養, 並由init進程對他們完成狀態收集工做 殭屍進程的危害場景: 例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。 3. 守護進程 若是主進程結束了, 由主進程建立的子進程必須跟着結束, 這時就須要守護進程 主進程建立守護進程: 其一. 守護進程會在主進程代碼執行結束後就終止 其二. 守護進程內沒法再開啓子進程, 不然會拋出異常AssertionError: daemonic processes are not allowed to have children 注意: 進程之間是相互獨立的, 主進程代碼運行結束, 守護進程隨即終止 import os, time from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() self.name = name deg run(self): print(os.getpid(), self.name) print("%s正在和女主播聊天" % self.name) time.sleep(3) if __name__ == "__mian__": p = MyProcess("太白") p.daemon = True # 必定要在p.start()以前設置p爲守護進程, 禁止p建立子進程, 而且父進程代碼執行結束, p即終止運行 p.start() print("寶") 4. 進程同步(鎖) 利用併發編程能夠更加充分的利用io資源, 但也帶來了新的問題: 進程之間的數據不共享, 可是共享同一套文件系統, 因此訪問同一個文件或者同一個打印終端, 是沒有問題的, 而共享帶來的是競爭, 競爭帶來的是錯亂, 如何控制, 就是枷鎖 (1). 多進程搶佔輸出資源, 致使打印混亂的示例: import os, time, random from multiprocessing import Process def work(n): print("%s:%s is running" % (n, os.getpid())) time.sleep(random.randint(1,3)) print("%s:%s is done" % (n, os.getpid())) if __name__ == '__main__': for i in range(5): p = Process(target = work, args = (i,)) p.start() # 3:6716 is running # 4:10220 is running # 0:5524 is running # 1:3164 is running # 2:10036 is running # 3:6716 is done # 4:10220 is done # 1:3164 is done # 0:5524 is done # 2:10036 is done 兩個問題: 一. 每一個進程中work函數的第一個打印就不是按照咱們for循環的順序來打印的 二. 每一個work都要兩個打印, 但時第一個打印的順序是3-4-0-1-2, 而第二個打印的順序是3-4-1-0-2, 說明咱們一個進程中的程序順序都亂了 第二個問題能夠經過枷鎖來解決, 第一個問題是無法解決的, 由於進程開到了內核, 由操做系統來決定進程的調度, 沒法控制 (2). 加鎖, 由併發改成了串行 import os, time from multiprocessing import Process, Lock def work(n, l): # 加鎖, 保證每一次只有一個進程在執行鎖裏面的程序, 這一段程序對於全部寫上這個鎖的進程, 你們都變成了串行 lock.acquire() print("%s:%s is running" % (n, os.getpid())) time.sleep(1) print("%s:%s is done" % (n, os.getpid())) # 解鎖, 解鎖以後其餘進程才能去執行本身的程序 lock.release() if __name_ == "__main__": lock = Lock() for i in range(5): p = Process(target=work, args=(i, l)) p.start() # 2:4032 is running # 2:4032 is done # 0:8444 is running # 0:8444 is done # 4:2872 is running # 4:2872 is done # 3:7480 is running # 3:7480 is done # 1:5196 is running # 1:5196 is done 結果分析:(本身去屢次運行一下,看看結果,我拿出其中一個結果來看)經過結果咱們能夠看出,多進程剛開始去執行的時候,每次運行,首先打印出來哪一個進程的程序是不固定的,可是咱們解決了上面打印混亂示例代碼的第二個問題,那就是同一個進程中的兩次打印都是先完成的,而後才切換到下一個進程去,打印下一個進程中的兩個打印結果,說明咱們控制住了同一進程中的代碼執行順序,若是涉及到多個進程去操做同一個數據或者文件的時候,就不擔憂數據算錯或者文件中的內容寫入混亂了。 上面這種狀況雖然使用加鎖的形式實現了順序的執行, 可是程序又從新變成串行了, 這樣確實會浪費了時間, 可是卻保證了數據的安全 (3). 模擬搶票 import json, time from multiprocessing import Process, Lock def check(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) print(ticket_dic) print("%s查看了餘票數, 尚有餘票%s張" % (n, ticket_dic["餘票"])) def buy(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) if ticket_dic["餘票"]>0: time.sleep(1) ticket_dic["餘票"] -= 1 json.dump(ticket_dic, open("ticketinfo.json", "w", encoding="utf-8"), ensure_ascii=False) print("%s購票成功" % n) else: print("沒票了") def task(n, lock): # def task(n): check(n) lock.acquire() buy(n) lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task, args=(i, lock)) # p = Process(target=task, args=(i, )) p.start() # {'餘票': 2} # 1查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 2查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 0查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 3查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 4查看了餘票數, 尚有餘票2張 # 1購票成功 # 2購票成功 # 沒票了 # 沒票了 # 沒票了 進程鎖總結: 加鎖能夠保證多個進程修改同一塊數據時, 同一時間只能有一個任務能夠進行修改, 即串行的修改, 沒錯, 速度是慢了, 但保證了數據安全. 雖然能夠用文件共享數據實現進程間通訊, 但問題是: 1效率低(共享數據基於文件, 而文件是硬盤上的數據). 2須要本身加鎖處理 所以咱們最好找一種解決方案可以兼顧: 1效率高(多個進程共享一塊內存的數據) 2幫咱們處理好問題, 這就是multiprocessing模塊爲咱們提供的基於消息的IPC通訊機制: 隊列和管道 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。 IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義爲進程間通訊或者跨進程通訊,是指兩個進程之間進行數據交換的過程。IPC不是某個系統所獨有的,任何一個操做系統都須要有相應的IPC機制, 好比Windows上能夠經過剪貼板、管道和郵槽等來進行進程間通訊,而Linux上能夠經過命名共享內容、信號量等來進行進程間通訊。Android它也有本身的進程間通訊方式,Android建構在Linux基礎上,繼承了一 部分Linux的通訊方式。 5. 隊列 進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出。 Queue([maxsize]) 建立共享的進程隊列。 參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。 底層隊列使用管道和鎖實現。 (1). 方法介紹 q = Queue([maxsize]) 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Full 異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。 (2). 隊列是進程安全的, 同一時間只能一個進程拿到隊列中的數據 例子: 批量生產輸入放入隊列, 再批量的獲取結果 import os, time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + "(put):" + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print("%s%s %s" % (str(os.getpid()), "(get):", info)) if __name__ == '__main__': # windows下, 若是開啓進程較多的話, 程序會崩潰, 爲了防止這個問題, 使用freeze_support()方法來解決 multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) time.sleep(0.2) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) # for p in record1: # p.join() [pp.join() for pp in record1] # for p in record2: # p.join() [pp.join() for pp in record2] # 6248(get): 200(put):Wed Jan 9 23:05:38 2019 # 1568(get): 11692(put):Wed Jan 9 23:05:38 2019 # 9292(get): 6736(put):Wed Jan 9 23:05:38 2019 # 3452(get): 12136(put):Wed Jan 9 23:05:38 2019 # 6676(get): 3400(put):Wed Jan 9 23:05:39 2019 # 372(get): 2904(put):Wed Jan 9 23:05:39 2019 # 1396(get): 6352(put):Wed Jan 9 23:05:39 2019 # 1532(get): 4156(put):Wed Jan 9 23:05:39 2019 # 6868(get): 9528(put):Wed Jan 9 23:05:40 2019 # 10832(get): 8336(put):Wed Jan 9 23:05:41 2019 (3). 生產者消費者模型 在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。 爲何要使用生產者和消費者模式 在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。 什麼是生產者消費者模式 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,而且我能夠根據生產速度和消費速度來均衡一下多少個生產者能夠爲多少個消費者提供足夠的服務,就能夠開多進程等等,而這些進程都是到阻塞隊列或者說是緩衝區中去獲取或者添加數據。 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') (3). 生產者消費者模型總結 #程序中有兩類角色 一類負責生產數據(生產者) 一類負責處理數據(消費者) #引入生產者消費者模型爲了解決的問題是: 平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度 #如何實現: 生產者隊列<——>消費者 #生產者消費者模型實現類程序的解耦和 (4). 經過上面基於隊列的生產者消費者代碼示例,咱們發現一個問題:主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。 解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #在本身的子進程的最後加入一個結束信號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') 注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() #等待生產者進程結束 q.put(None) #發送結束信號 print('主') (5). 但上述解決方式,在有多個生產者和多個消費者時,因爲隊列咱們說了是進程安全的,我一個進程拿走告終束信號,另一個進程就拿不到了,還須要多發送一個結束信號,有幾個取數據的進程就要發送幾個結束信號,咱們則須要用一個很low的方式去解決 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號 p2.join() p3.join() q.put(None) #有幾個消費者就應該發送幾回結束信號None q.put(None) #發送結束信號 print('主') (6). 其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制 #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 #參數介紹: maxsize是隊列中容許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法以外還具備: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止,也就是隊列中的數據所有被get拿走了。 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() # time.sleep(random.randint(1,3)) time.sleep(random.random()) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走並執行完了 def producer(name,q): for i in range(10): # time.sleep(random.randint(1,3)) time.sleep(random.random()) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) print('%s生產結束'%name) q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。 print('%s生產結束~~~~~~'%name) if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #若是不加守護,那麼主進程結束不了,可是加了守護以後,必須確保生產者的內容生產完而且被處理完了,全部必須還要在主進程給生產者設置join,才能確保生產者生產的任務被執行完了,而且可以確保守護進程在全部任務執行完成以後才隨着主進程的結束而結束。 c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() #我要確保你的生產者進程結束了,生產者進程的結束標誌着你生產的全部的人任務都已經被處理完了 p2.join() p3.join() print('主') # 主進程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 # 於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。python