multiprocessing包是Python中的多進程管理包。它與 threading.Thread相似,能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠容許放在Python程序內部編寫的函數中。該Process對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要經過start()設置)、exitcode(進程在運行時爲None、若是爲–N,表示被信號N結束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進程,其用法也與threading包中的同名類同樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。css
這個模塊表示像線程同樣管理進程,這個是multiprocessing的核心,它與threading很類似,對多核CPU的利用率會比threading好的多。python
看一下Process類的構造方法:併發
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數說明:
group:進程所屬組。基本不用
target:表示調用對象。
args:表示調用對象的位置參數元組。
name:別名
kwargs:表示調用對象的字典。app
建立進程的簡單實例:async
#coding=utf-8 import multiprocessing def do(n) : #獲取當前線程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."
執行結果:函數
Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.
建立子進程時,只須要傳入一個執行函數和函數的參數,建立一個Process實例,並用其start()方法啓動,這樣建立進程比fork()還要簡單。
join()方法表示等待子進程結束之後再繼續往下運行,一般用於進程間的同步。ui
注意:
在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不須要。spa
在使用Python進行系統管理時,特別是同時操做多個文件目錄或者遠程控制多臺主機,並行操做能夠節約大量的時間。若是操做的對象數目不大時,還能夠直接使用Process類動態的生成多個進程,十幾個還好,可是若是上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。
Pool類能夠提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,若是池尚未滿,就會建立一個新的進程來執行請求。若是池滿,請求就會告知先等待,直到池中有進程結束,纔會建立新的進程來執行這些請求。
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法線程
函數原型:code
apply(func[, args=()[, kwds={}]])
該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,而且3.x之後不在出現)。
函數原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法同樣,但它是非阻塞且支持結果返回進行回調。
函數原型:
map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置的map函數用法行爲基本一致,它會使進程阻塞直到返回結果。
注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。
關閉進程池(pool),使其不在接受新的任務。
結束工做進程,不在處理未處理的任務。
主進程阻塞等待子進程的退出,join方法必須在close或terminate以後使用。
multiprocessing.Pool類的實例:
import time from multiprocessing import Pool def run(fn): #fn: 函數參數是數據列表的一個元素 time.sleep(1) return fn*fn if __name__ == "__main__": testFL = [1,2,3,4,5,6] print 'shunxu:' #順序執行(也就是串行執行,單進程) s = time.time() for fn in testFL: run(fn) e1 = time.time() print "順序執行時間:", int(e1 - s) print 'concurrent:' #建立多個進程,並行執行 pool = Pool(5) #建立擁有5個進程數量的進程池 #testFL:要處理的數據列表,run:處理testFL列表中數據的函數 rl =pool.map(run, testFL) pool.close()#關閉進程池,再也不接受新的進程 pool.join()#主進程阻塞等待子進程的退出 e2 = time.time() print "並行執行時間:", int(e2-e1) print rl
執行結果:
shunxu: 順序執行時間: 6 concurrent: 並行執行時間: 2 [1, 4, 9, 16, 25, 36]
上例是一個建立多個進程併發處理與順序執行處理同一數據,所用時間的差異。從結果能夠看出,併發執行的時間明顯比順序執行要快不少,可是進程是要耗資源的,因此平時工做中,進程數也不能開太大。
程序中的r1表示所有進程執行結束後全局的返回結果集,run函數有返回值,因此一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,其實是用了隊列的原理,等待全部進程都執行完畢,就返回這個列表(列表的順序不定)。
對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),讓其再也不接受新的Process了。
再看一個實例:
import time from multiprocessing import Pool def run(fn) : time.sleep(2) print fn if __name__ == "__main__" : startTime = time.time() testFL = [1,2,3,4,5] pool = Pool(10)#能夠同時跑10個進程 pool.map(run,testFL) pool.close() pool.join() endTime = time.time() print "time :", endTime - startTime
執行結果:
21 3 4 5 time : 2.51999998093
再次執行結果以下:
1 34 2 5 time : 2.48600006104
結果中爲何還有空行和沒有折行的數據呢?其實這跟進程調度有關,當有多個進程並行執行時,每一個進程獲得的時間片時間不同,哪一個進程接受哪一個請求以及執行完成時間都是不定的,因此會出現輸出亂序的狀況。那爲何又會有沒這行和空行的狀況呢?由於有可能在執行第一個進程時,剛要打印換行符時,切換到另外一個進程,這樣就極有可能兩個數字打印到同一行,而且再次切換回第一個進程時會打印一個換行符,因此就會出現空行的狀況。
並行處理某個目錄下文件中的字符個數和行數,存入res.txt文件中,
每一個文件一行,格式爲:filename:lineNumber,charNumber
import os import time from multiprocessing import Pool def getFile(path) : #獲取目錄下的文件list fileList = [] for root, dirs, files in list(os.walk(path)) : for i in files : if i.endswith('.txt') or i.endswith('.10w') : fileList.append(root + "\\" + i) return fileList def operFile(filePath) : #統計每一個文件中行數和字符數,並返回 filePath = filePath fp = open(filePath) content = fp.readlines() fp.close() lines = len(content) alphaNum = 0 for i in content : alphaNum += len(i.strip('\n')) return lines,alphaNum,filePath def out(list1, writeFilePath) : #將統計結果寫入結果文件中 fileLines = 0 charNum = 0 fp = open(writeFilePath,'a') for i in list1 : fp.write(i[2] + " 行數:"+ str(i[0]) + " 字符數:"+str(i[1]) + "\n") fileLines += i[0] charNum += i[1] fp.close() print fileLines, charNum if __name__ == "__main__": #建立多個進程去統計目錄中全部文件的行數和字符數 startTime = time.time() filePath = "C:\\wcx\\a" fileList = getFile(filePath) pool = Pool(5) resultList =pool.map(operFile, fileList) pool.close() pool.join() writeFilePath = "c:\\wcx\\res.txt" print resultList out(resultList, writeFilePath) endTime = time.time() print "used time is ", endTime - startTime
執行結果:
耗時不到1秒,可見多進程併發執行速度是很快的。