Python之路--Python基礎10--併發編程之進程

1、multiprocessing模塊介紹

  Python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。html

  Python提供了multiprocessing。multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。python

 

2、Process類介紹

建立進程的類:git

  Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)github

強調:數據庫

  1. 須要使用關鍵字的方式來指定參數編程

  2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號。json

參數介紹:windows

  group參數未使用,值始終爲None數組

  target表示調用對象,即子進程要執行的任務安全

  args表示調用對象的位置參數元組,args=(1,2,'egon',)

  kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

  name爲子進程的名稱

方法介紹:

  p.start():啓動進程,並調用該子進程中的p.run() 。
  p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法。

  p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖。
  p.is_alive():若是p仍然運行,返回True。

  p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程。

屬性介紹:

  p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

  p.name:進程的名稱

  p.pid:進程的pid

  p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

  p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

2、Process類介紹

注意:在windows中Process()必須放到# if __name__ == '__main__':下,否則會報錯。

一、建立並開啓子進程的兩種方式

#開進程的方法一:
import time import random from multiprocessing import Process def work(name): print('%s working' %name) time.sleep(random.randrange(1,5)) print('%s work end' %name) p1=Process(target=work,args=('egon',)) #必須加,號
p2=Process(target=work,args=('alex',)) p1.start() p2.start() print('主線程')
#開進程的方法二:
import time import random from multiprocessing import Process class Work(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s working' %self.name) time.sleep(random.randrange(1,5)) print('%s work end' %self.name) p1=Work('egon') p2=Work('alex') p1.start() #start會自動調用run
p2.start() print('主線程')

進程直接的內存空間是隔離的:

from multiprocessing import Process def work(): global n n = 0 print('子進程內: ', n) if __name__ == '__main__': n = 100 p = Process(target=work) p.start() print('主進程內: ', n) # 輸出: # 主進程內: 100 # 子進程內: 0

 

二、Process對象的join方法

from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
print('開始')
from multiprocessing import Process import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() #有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎? #固然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析以下: #進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待 #因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
p1.join() p2.join() p3.join() p4.join() print('主線程') #上述啓動進程與join進程能夠簡寫爲 # p_l=[p1,p2,p3,p4] #  # for p in p_l: # p.start() #  # for p in p_l: # p.join()

 

三、Process對象的其餘方法或屬性

#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s is piao end' %self.name) p1=Piao('egon1') p1.start() p1.terminate()     #關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive())  #結果爲True

print('開始') print(p1.is_alive()) #結果爲False
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        # #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__() self.name=name #這兩個換下位子就行了 def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('開始') print(p.pid) #查看pid

 

主進程建立守護進程

  1:守護進程會在主進程代碼執行結束後就終止

  2:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start() print('')

 

3、進程同步(鎖)

  進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

part1:多個進程共享同一打印終端

#併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start()
#由併發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()

 

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂

#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('剩餘票數%s' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w')) print('購票成功') def task(lock): search() #lock.acquire()
 get() #lock.release()

if __name__ == '__main__': lock=Lock() for i in range(5): #模擬併發5個客戶端搶票
        p=Process(target=task,args=(lock,)) p.start()

輸出:

剩餘票數1
剩餘票數1
剩餘票數1
剩餘票數1
剩餘票數1
購票成功
購票成功
購票成功
購票成功
購票成功

解決辦法:加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全

#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('剩餘票數%s' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w')) print('購票成功') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock=Lock() for i in range(5): #模擬併發5個客戶端搶票
        p=Process(target=task,args=(lock,)) p.start()
剩餘票數1
剩餘票數1
剩餘票數1
剩餘票數1
剩餘票數1
購票成功

總結:

  加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然能夠用文件共享數據實現進程間通訊,但問題是:

  一、效率低(共享數據基於文件,而文件是硬盤上的數據)

  二、須要本身加鎖處理

所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。

 

一、隊列和管道都是將數據存放於內存中

二、隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,

咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

 

4、隊列

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

建立隊列的類(底層就是以管道和鎖定的方式實現)

  Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

參數介紹:

  maxsize是隊列中容許最大項數,省略則無大小限制。   

主要方法介紹:

  q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。

  q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
  q.get_nowait():同q.get(False)
  q.put_nowait():同q.put(False)

  q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
  q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
  q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

  q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞

  q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

  q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

 

栗子:

''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 '''

from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3) q.put(3) q.put(3) print(q.full()) #滿了

print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了

 

生產者消費者模型

  在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

  在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

#基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) #開始
 p1.start() c1.start() print('')

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #發送結束信號

if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('')

 

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

#主進程在生產者生產完畢後發送結束信號None

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) #開始
 p1.start() c1.start() p1.join() q.put(None) #發送結束信號
    print('')

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決,有幾個消費者就須要發送幾回結束信號:至關low。其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制。

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

#參數介紹:

  maxsize是隊列中容許最大項數,省略則無大小限制。    

#方法介紹:

  JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

  q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

  q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了

def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始
    p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('') #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
    #於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程

 

5、共享數據

  展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中,進程間通訊應該儘可能避免使用本節所講的共享數據的方式。

from multiprocessing import Manager,Process,Lock import os def work(d,lock): # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) #{'count': 94}

 

6、進程池

 

  在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  一、很明顯須要併發執行的任務一般要遠大於核數

  二、一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程

  三、進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

  例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。

  咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數... 
  ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

  建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

 

Pool([numprocess [,initializer [, initargs]]]):建立進程池 

參數介紹:

  numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值

  initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None

  initargs:是要傳給initializer的參數組

方法介紹:

  p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
  p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
  p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
  p.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

 

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法

  obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。

  obj.ready():若是調用完成,返回True

  obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常

  obj.wait([timeout]):等待結果變爲可用。

  obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

栗子:

#同步調用apply,一個一個一執行

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2

if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞,但無論該任務是否存在阻塞,同步調用都會在原地等着,只是等的過程當中如果任務發生了阻塞就會被奪走cpu的執行權限
 res_l.append(res) print(res_l)
#異步調用apply_async,三個三個執行

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2

if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res
 res_l.append(res) #異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
# 詳解:apply_async與apply

#一:使用進程池(異步調用,apply_async) #coding: utf-8
from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
 res_l.append(res) print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了
 pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
    for i in res_l: print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

#二:使用進程池(同步調用,apply) #coding: utf-8
from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
    print("==============================>") pool.close() pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
        print(i)

 

栗子2:

服務端:

#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處於等待狀態 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break

if __name__ == '__main__': p=Pool() while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

 

客戶端:

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

發現:併發開啓多個客戶端,服務端同一時間只有3個不一樣的pid,幹掉一個客戶端,另一個客戶端纔會進來,被3個進程之一處理。

 

回掉函數:

  須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了

''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
#爬蟲案例

from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2
if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中全部進程執行完畢
 nums=[] for res in res_l: nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理

 

進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html

相關文章
相關標籤/搜索