multiprocessing

最近接觸一個項目,要在多個虛擬機中運行任務,參考別人以前項目的代碼,採用了多進程來處理,因而上網查了查python中的多進程python

1、先說說Queue(隊列對象)app

Queue是python中的標準庫,能夠直接import 引用,以前學習的時候有聽過著名的「先吃先拉」與「後吃先吐」,其實就是這裏說的隊列,隊列的構造的時候能夠定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多dom

import Queueasync

q = Queue.Queue(10)函數

向隊列中放值(put)學習

q.put(‘yang’)gradle

q.put(4)ui

q.put([‘yan’,’xing’])spa

在隊列中取值get().net

默認的隊列是先進先出的

>>> q.get() 
'yang' 
>>> q.get() 

>>> q.get() 
['yan', 'xing'] 
>>>

 

當一個隊列爲空的時候若是再用get取則會堵塞,因此取隊列的時候通常是用到

get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常

因此更經常使用的方法是先判斷一個隊列是否爲空,若是不爲空則取值

隊列中經常使用的方法

Queue.qsize() 返回隊列的大小  
Queue.empty() 若是隊列爲空,返回True,反之False  
Queue.full() 若是隊列滿了,返回True,反之False 
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間  
Queue.get_nowait() 至關Queue.get(False) 
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間  
Queue.put_nowait(item) 至關Queue.put(item, False)

 

2、multiprocessing中使用子進程概念

from multiprocessing import Process

能夠經過Process來構造一個子進程

p = Process(target=fun,args=(args))

再經過p.start()來啓動子進程

再經過p.join()方法來使得子進程運行結束後再執行父進程

from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'

image

3、在multiprocessing中使用pool

若是須要多個子進程時能夠考慮使用進程池(pool)來管理

from multiprocessing import Pool

 

from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'

pool建立子進程的方法與Process不一樣,是經過

p.apply_async(func,args=(args))實現,一個池子裏能同時運行的任務是取決你電腦的cpu數量,如個人電腦如今是有4個cpu,那會子進程task0,task1,task2,task3能夠同時啓動,task4則在以前的一個某個進程結束後纔開始

image

上面的程序運行後的結果實際上是按照上圖中1,2,3分開進行的,先打印1,3秒後打印2,再3秒後打印3

代碼中的p.close()是關掉進程池子,是再也不向裏面添加進程了,對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process了。

當時也能夠是實例pool的時候給它定義一個進程的多少

若是上面的代碼中p=Pool(5)那麼全部的子進程就能夠同時進行

3、多個子進程間的通訊

多個子進程間的通訊就要採用第一步中說到的Queue,好比有如下的需求,一個子進程向隊列中寫數據,另一個進程從隊列中取數據,

 

#coding:gbk from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父進程建立Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啓動子進程pw,寫入: pw.start() # 等待pw結束: pw.join() # 啓動子進程pr,讀取: pr.start() pr.join() # pr進程裏是死循環,沒法等待其結束,只能強行終止: print print '全部數據都寫入而且讀完'

 

4、關於上面代碼的幾個有趣的問題

if __name__=='__main__': # 父進程建立Queue,並傳給各個子進程: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '全部數據都寫入而且讀完'

 

 

若是main函數寫成上面的樣本,原本我想要的是將會獲得一個隊列,將其做爲參數傳入進程池子裏的每一個子進程,可是卻獲得

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通訊,這個若是想要使用進程池中使用隊列則要使用multiprocess的Manager類

if __name__=='__main__': manager = multiprocessing.Manager()  # 父進程建立Queue,並傳給各個子進程: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '全部數據都寫入而且讀完'

 

這樣這個隊列對象就能夠在父進程與子進程間通訊,不用池則不須要Manager,之後再擴展multiprocess中的Manager類吧

關於鎖的應用,在不一樣程序間若是有同時對同一個隊列操做的時候,爲了不錯誤,能夠在某個函數操做隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操做,鎖也要在manager對象中的鎖

#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 寫數據進程執行的代碼: def write(q,lock): lock.acquire() #加上鎖 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #釋放鎖 # 讀數據進程執行的代碼: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父進程建立Queue,並傳給各個子進程: q = manager.Queue() lock = manager.Lock() #初始化一把鎖 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '全部數據都寫入而且讀完'

參考文章:

http://blog.csdn.net/yatere/article/details/6668006

http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868323401155ceb3db1e2044f80b974b469eb06cb43000

相關文章
相關標籤/搜索