進程間通訊html
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。python
進程隊列queuegit
不一樣於線程queue,進程queue的生成是用multiprocessing模塊生成的。github
在生成子進程的時候,會將代碼拷貝到子進程中執行一遍,及子進程擁有和主進程內容同樣的不一樣的名稱空間。網絡
示例1:併發
1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]) 4 print(q.qsize()) 5 6 q=multiprocessing.Queue() #全局定義一個q進程隊列,在產生子進程時候會在子進程裏生成,能夠指定最大數,限制隊列長度 7 if __name__ == '__main__': 8 p=multiprocessing.Process(target=foo,args=()) #由於名稱空間不一樣,子進程的主線程建立的q隊列,主進程get不到,因此會阻塞住 9 p.start() 10 # foo() #主進程執行一下函數就能夠訪問到了 11 print(q.get())
示例2:app
1 import multiprocessing 2 3 def foo(): 4 q.put([11,'hello',True]) 5 print(q.qsize()) 6 7 if __name__ == '__main__': 8 q = multiprocessing.Queue() #主進程建立一個q進程隊列 9 p=multiprocessing.Process(target=foo,args=()) #由於名稱空間不一樣,子進程的主線程找不到q隊列,因此會報錯提示沒有q 10 p.start() 11 print(q.get())
示例3:框架
1 import multiprocessing 2 3 def foo(argument): #定義函數處理進程隊列 4 argument.put([11,'hello',True]) 5 print(argument.qsize()) 6 q = multiprocessing.Queue() #全局定義一個進程隊列 7 print('test') 8 9 if __name__ == '__main__': 10 x = multiprocessing.Queue() #主進程定義一個進程隊列 11 p=multiprocessing.Process(target=foo,args=(x,)) #主進程把值傳給子進程就能夠處理了 12 p.start() 13 print(x.get()) 14 # foo(q) 15 # print(q.get())
經常使用方法異步
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
其餘方法socket
q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
另外一個建立進程隊列的類
http://www.cnblogs.com/zero527/p/7211909.html
管道pipe
管道就是管道,就像生活中的管道,兩頭都能進能出
默認管道是全雙工的,若是建立管道的時候映射成False,左邊只能用於接收,右邊只能用於發送,相似於單行道
最簡單的管道雙向通訊示例:
1 import multiprocessing 2 3 def foo(sk): 4 sk.send('hello world') 5 print(sk.recv()) 6 7 if __name__ == '__main__': 8 conn1,conn2=multiprocessing.Pipe() #開闢兩個口,都是能進能出,括號中若是False即單向通訊 9 p=multiprocessing.Process(target=foo,args=(conn1,)) #子進程使用sock口,調用foo函數 10 p.start() 11 print(conn2.recv()) #主進程使用conn口接收 12 conn2.send('hi son') #主進程使用conn口發送
經常使用方法
conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
注意:send()和recv()方法使用pickle模塊對對象進行序列化
其餘方法
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法,不用的時候兩邊都要close conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 c1=Process(target=consumer,args=((left,right),'c1')) 25 c1.start() 26 seq=(i for i in range(10)) 27 producer(seq,(left,right)) 28 right.close() 29 left.close() 30 c1.join() 31 print('主進程') 32 33 生產者消費者關閉某端點
共享數據manage
Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。
注:進程間通訊應該儘可能避免使用共享數據的方式
共享數據:列表
1 from multiprocessing import Manager,Process 2 def foo(l,i): 3 l.append(i**i) 4 if __name__ == '__main__': 5 man=Manager() 6 ml=man.list([11,22,33]) 7 l=[] 8 for i in range(5): 9 p=Process(target=foo,args=(ml,i)) 10 p.start() 11 l.append(p) 12 for i in l: #必需要join,否則會執行報錯,處理一個數據必需要一個個來,不能同時處理一個數據 13 i.join() 14 print(ml)
共享數據:字典
1 from multiprocessing import Manager,Process 2 def foo(d,k,v): 3 d[k]=v 4 if __name__ == '__main__': 5 man=Manager() 6 md=man.dict({'name':'bob'}) 7 l=[] 8 for i in range(5): 9 p=Process(target=foo,args=(md,i,'a')) 10 p.start() 11 l.append(p) 12 for i in l: #必需要join,否則會執行報錯,處理一個數據必需要一個個來,不能同時處理一個數據 13 i.join() 14 print(md)
進程池
開多進程是爲了併發,一般有幾個cpu核心就開幾個進程,可是進程開多了會影響效率,主要體如今切換的開銷,因此引入進程池限制進程的數量。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
示例:
1 from multiprocessing import Pool 2 import time 3 4 def foo(n): 5 print(n) 6 time.sleep(1) 7 8 if __name__ == '__main__': 9 pool_obj=Pool(5) # 10 for i in range(47): 11 # pool_obj.apply_async(func=foo,args=(i,)) 12 pool_obj.apply(func=foo,args=(i,)) #子進程的生成是靠進程池對象維護的 13 # apply同步,子進程一個個執行 14 # apply_async異步,多個子進程一塊兒執行 15 pool_obj.close() 16 pool_obj.join() 17 print('ending')
經常使用方法:
pool_obj.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async() pool_obj.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。 pool_obj.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 pool_obj.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
其餘方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 obj.ready():若是調用完成,返回True obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 obj.wait([timeout]):等待結果變爲可用。 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
協程
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。
一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
注意:
1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)
2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換
協程優勢:
1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
協程缺點:
1.協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
2.協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
yield實現協程併發
1 import time 2 def consumer(): 3 r='' 4 while True: 5 n=yield r 6 if not n: 7 return 8 print('[CONSUMER] ←← Consuming %s...' % n) 9 time.sleep(1) 10 r='200 Ok' 11 12 def produce(c): 13 next(c) #1.啓動生成器 14 n=0 15 while n < 5: 16 n=n+1 17 print('[PRODUCER] →→ Producing %s...' % n) 18 cr=c.send(n) 19 #2.將n傳入到consumer的對象,yield接收到傳入值開始執行代碼,遇到yield執行代碼返回r的值 20 print('[PRODUCER] Consumer return: %s' % cr) 21 #3.produce沒有值了,關閉整個過程 22 c.close() 23 24 if __name__ == '__main__': 25 c=consumer() #生成生成器對象 26 produce(c) #執行調用
greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操做進行恢復爲止。可使用一個調度器循環在一組生成器函數之間協做多個任務。greentlet是python中實現咱們所謂的"Coroutine(協程)"的一個基礎庫。
示例1:
1 from greenlet import greenlet 2 def foo(): 3 print('ok1') 4 g2.switch() #阻斷 5 print('ok3') 6 g2.switch() 7 def bar(): 8 print('ok2') 9 g1.switch() 10 print('ok4') 11 12 g1=greenlet(foo) #生成foo函數的greenlet對象 13 g2=greenlet(bar) #生成bar函數的greenlet對象 14 g1.switch() #一、執行g1對象,打印ok1 15 #二、遇到g2.switch(),轉到g2執行打印ok2 16 #三、遇到g1.switch(),轉到g1的阻斷處繼續執行打印ok3 17 #四、遇到g2.switch(),轉到g2執行打印ok4
示例2:
1 def eat(name): 2 print('%s eat food 1' %name) 3 gr2.switch('bob') 4 print('%s eat food 2' %name) 5 gr2.switch() 6 def play_phone(name): 7 print('%s play 1' %name) 8 gr1.switch() 9 print('%s play 2' %name) 10 11 gr1=greenlet(eat) 12 gr2=greenlet(play_phone) 13 gr1.switch(name='natasha')#能夠在第一次switch時傳入參數,之後都不須要
這種方法不會節省時間,由於不是io操做,而greenlet遇到io操做不會跳轉,仍然要io阻斷
基於greenlet框架的高級庫gevent模塊
gevent是第三方庫,經過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:
簡單示例:
1 import gevent 2 def foo(): 3 print('ok1') 4 gevent.sleep(4) #模擬io操做 5 print('ok3') 6 def bar(): 7 print('ok2') 8 gevent.sleep(2) 9 print('ok4') 10 11 g1=gevent.spawn(foo) 12 g2=gevent.spawn(bar) 13 gevent.joinall([g1,g2]) #所有阻塞,或者單獨一個個join
spawn括號內第一個參數是函數名,如foo,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數foo的
注意:
gevent.sleep(4)模擬的是gevent能夠識別的io阻塞,
而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
1 #補丁 2 from gevent import monkey 3 monkey.patch_all()
必須放到被打補丁者的前面,如time,socket模塊以前
或者咱們乾脆記憶成:要用gevent,須要將補丁放到文件的開頭
爬蟲示例:
1 from gevent import monkey;monkey.patch_all() 2 import gevent 3 import requests 4 import time 5 6 def get_page(url): 7 print('GET: %s' %url) 8 response=requests.get(url) 9 if response.status_code == 200: 10 print('%d bytes received from %s' %(len(response.text),url)) 11 12 13 start_time=time.time() 14 gevent.joinall([ 15 gevent.spawn(get_page,'https://www.python.org/'), 16 gevent.spawn(get_page,'https://www.yahoo.com/'), 17 gevent.spawn(get_page,'https://github.com/'), 18 ]) 19 stop_time=time.time() 20 print('run time is %s' %(stop_time-start_time))