若是想要充分利用,在python中大部分狀況須要使用多進程,那麼這個包就叫作 multiprocessing。html
藉助它,能夠輕鬆完成從單進程到併發執行的轉換。multiprocessing支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。python
那麼本節要介紹的內容有:數組
在multiprocessing中,每個進程都用一個Process類來表示。首先看下它的API數據結構
Process([group [, target [, name [, args [, kwargs]]]]])
target表示調用對象,你能夠傳入方法的名字
args表示被調用對象的位置參數元組,好比target是函數a,他有兩個參數m,n,那麼args就傳入(m, n)便可
kwargs表示調用對象的字典
name是別名,至關於給這個進程取一個名字
group分組,實際上不使用
import multiprocessing def process(num): print 'Process:', num if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=process, args=(i,)) p.start()
最簡單的建立Process的過程如上所示,target傳入函數名,args是函數的參數,是元組的形式,若是隻有一個參數,那就是長度爲1的元組。多線程
而後調用start()方法便可啓動多個進程了。併發
另外你還能夠經過 cpu_count() 方法還有 active_children() 方法獲取當前機器的 CPU 核心數量以及獲得目前全部的運行的進程。app
經過一個實例來感覺一下:dom
import multiprocessing import time def process(num): time.sleep(num) print 'Process:', num if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=process, args=(i,)) p.start() print('CPU number:' + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print('Child process name: ' + p.name + ' id: ' + str(p.pid)) print('Process Ended')
運行結果:async
Process: 0 CPU number:8 Child process name: Process-2 id: 9641 Child process name: Process-4 id: 9643 Child process name: Process-5 id: 9644 Child process name: Process-3 id: 9642 Process Ended Process: 1 Process: 2 Process: 3 Process: 4
另外你還能夠繼承Process類,自定義進程類,實現run方法便可。函數
用一個實例來感覺一下:
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, loop): Process.__init__(self) self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count)) if __name__ == '__main__': for i in range(2, 5): p = MyProcess(i) p.start()
在上面的例子中,咱們繼承了 Process 這個類,而後實現了run方法。打印出來了進程號和參數。
運行結果:
Pid: 28116 LoopCount: 0 Pid: 28117 LoopCount: 0 Pid: 28118 LoopCount: 0 Pid: 28116 LoopCount: 1 Pid: 28117 LoopCount: 1 Pid: 28118 LoopCount: 1 Pid: 28117 LoopCount: 2 Pid: 28118 LoopCount: 2 Pid: 28118 LoopCount: 3
能夠看到,三個進程分別打印出了二、三、4條結果。
咱們能夠把一些方法獨立的寫在每一個類裏封裝好,等用的時候直接初始化一個類運行便可。
在這裏介紹一個屬性,叫作deamon。每一個線程均可以單獨設置它的屬性,若是設置爲True,當父進程結束後,子進程會自動被終止。
用一個實例來感覺一下,仍是原來的例子,增長了deamon屬性:
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, loop): Process.__init__(self) self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count)) if __name__ == '__main__': for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() print 'Main process Ended!'
在這裏,調用的時候增長了設置deamon,最後的主進程(即父進程)打印輸出了一句話。
運行結果:
Main process Ended!
結果很簡單,由於主進程沒有作任何事情,直接輸出一句話結束,因此在這時也直接終止了子進程的運行。
這樣能夠有效防止無控制地生成子進程。若是這樣寫了,你在關閉這個主程序運行時,就無需額外擔憂子進程有沒有被關閉了。
不過這樣並非咱們想要達到的效果呀,能不能讓全部子進程都執行完了而後再結束呢?那固然是能夠的,只須要加入join()方法便可。
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, loop): Process.__init__(self) self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count)) if __name__ == '__main__': for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() p.join() print 'Main process Ended!'
在這裏,每一個子進程都調用了join()方法,這樣父進程(主進程)就會等待子進程執行完畢。
運行結果:
Pid: 29902 LoopCount: 0 Pid: 29902 LoopCount: 1 Pid: 29905 LoopCount: 0 Pid: 29905 LoopCount: 1 Pid: 29905 LoopCount: 2 Pid: 29912 LoopCount: 0 Pid: 29912 LoopCount: 1 Pid: 29912 LoopCount: 2 Pid: 29912 LoopCount: 3 Main process Ended!
發現全部子進程都執行完畢以後,父進程最後打印出告終束的結果。
在上面的一些小實例中,你可能會遇到以下的運行結果:
什麼問題?有的輸出錯位了。這是因爲並行致使的,兩個進程同時進行了輸出,結果第一個進程的換行沒有來得及輸出,第二個進程就輸出告終果。因此致使這種排版的問題。
那這歸根結底是由於線程同時資源(輸出操做)而致使的。
那怎麼來避免這種問題?那天然是在某一時間,只能一個進程輸出,其餘進程等待。等剛纔那個進程輸出完畢以後,另外一個進程再進行輸出。這種現象就叫作「互斥」。
咱們能夠經過 Lock 來實現,在一個進程輸出時,加鎖,其餘進程等待。等此進程執行結束後,釋放鎖,其餘進程能夠進行輸出。
咱們現用一個實例來感覺一下:
from multiprocessing import Process, Lock import time class MyProcess(Process): def __init__(self, loop, lock): Process.__init__(self) self.loop = loop self.lock = lock def run(self): for count in range(self.loop): time.sleep(0.1) #self.lock.acquire() print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count)) #self.lock.release() if __name__ == '__main__': lock = Lock() for i in range(10, 15): p = MyProcess(i, lock) p.start()
首先看一下不加鎖的輸出結果:
Pid: 45755 LoopCount: 0 Pid: 45756 LoopCount: 0 Pid: 45757 LoopCount: 0 Pid: 45758 LoopCount: 0 Pid: 45759 LoopCount: 0 Pid: 45755 LoopCount: 1 Pid: 45756 LoopCount: 1 Pid: 45757 LoopCount: 1 Pid: 45758 LoopCount: 1 Pid: 45759 LoopCount: 1 Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2 Pid: 45757 LoopCount: 2 Pid: 45758 LoopCount: 2 Pid: 45759 LoopCount: 2 Pid: 45756 LoopCount: 3 Pid: 45755 LoopCount: 3 Pid: 45757 LoopCount: 3 Pid: 45758 LoopCount: 3 Pid: 45759 LoopCount: 3 Pid: 45755 LoopCount: 4 Pid: 45756 LoopCount: 4 Pid: 45757 LoopCount: 4 Pid: 45759 LoopCount: 4 Pid: 45758 LoopCount: 4 Pid: 45756 LoopCount: 5 Pid: 45755 LoopCount: 5 Pid: 45757 LoopCount: 5 Pid: 45759 LoopCount: 5 Pid: 45758 LoopCount: 5 Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6 Pid: 45757 LoopCount: 6 Pid: 45759 LoopCount: 6 Pid: 45758 LoopCount: 6 Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7 Pid: 45757 LoopCount: 7 Pid: 45758 LoopCount: 7 Pid: 45759 LoopCount: 7 Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8 Pid: 45757 LoopCount: 8 Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8 Pid: 45755 LoopCount: 9 Pid: 45756 LoopCount: 9 Pid: 45757 LoopCount: 9 Pid: 45758 LoopCount: 9 Pid: 45759 LoopCount: 9 Pid: 45756 LoopCount: 10 Pid: 45757 LoopCount: 10 Pid: 45758 LoopCount: 10 Pid: 45759 LoopCount: 10 Pid: 45757 LoopCount: 11 Pid: 45758 LoopCount: 11 Pid: 45759 LoopCount: 11 Pid: 45758 LoopCount: 12 Pid: 45759 LoopCount: 12 Pid: 45759 LoopCount: 13
能夠看到有些輸出已經形成了影響。
而後咱們對其加鎖:
from multiprocessing import Process, Lock import time class MyProcess(Process): def __init__(self, loop, lock): Process.__init__(self) self.loop = loop self.lock = lock def run(self): for count in range(self.loop): time.sleep(0.1) self.lock.acquire() print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count)) self.lock.release() if __name__ == '__main__': lock = Lock() for i in range(10, 15): p = MyProcess(i, lock) p.start()
咱們在print方法的先後分別添加了得到鎖和釋放鎖的操做。這樣就能保證在同一時間只有一個print操做。
看一下運行結果:
Pid: 45889 LoopCount: 0 Pid: 45890 LoopCount: 0 Pid: 45891 LoopCount: 0 Pid: 45892 LoopCount: 0 Pid: 45893 LoopCount: 0 Pid: 45889 LoopCount: 1 Pid: 45890 LoopCount: 1 Pid: 45891 LoopCount: 1 Pid: 45892 LoopCount: 1 Pid: 45893 LoopCount: 1 Pid: 45889 LoopCount: 2 Pid: 45890 LoopCount: 2 Pid: 45891 LoopCount: 2 Pid: 45892 LoopCount: 2 Pid: 45893 LoopCount: 2 Pid: 45889 LoopCount: 3 Pid: 45890 LoopCount: 3 Pid: 45891 LoopCount: 3 Pid: 45892 LoopCount: 3 Pid: 45893 LoopCount: 3 Pid: 45889 LoopCount: 4 Pid: 45890 LoopCount: 4 Pid: 45891 LoopCount: 4 Pid: 45892 LoopCount: 4 Pid: 45893 LoopCount: 4 Pid: 45889 LoopCount: 5 Pid: 45890 LoopCount: 5 Pid: 45891 LoopCount: 5 Pid: 45892 LoopCount: 5 Pid: 45893 LoopCount: 5 Pid: 45889 LoopCount: 6 Pid: 45890 LoopCount: 6 Pid: 45891 LoopCount: 6 Pid: 45893 LoopCount: 6 Pid: 45892 LoopCount: 6 Pid: 45889 LoopCount: 7 Pid: 45890 LoopCount: 7 Pid: 45891 LoopCount: 7 Pid: 45892 LoopCount: 7 Pid: 45893 LoopCount: 7 Pid: 45889 LoopCount: 8 Pid: 45890 LoopCount: 8 Pid: 45891 LoopCount: 8 Pid: 45892 LoopCount: 8 Pid: 45893 LoopCount: 8 Pid: 45889 LoopCount: 9 Pid: 45890 LoopCount: 9 Pid: 45891 LoopCount: 9 Pid: 45892 LoopCount: 9 Pid: 45893 LoopCount: 9 Pid: 45890 LoopCount: 10 Pid: 45891 LoopCount: 10 Pid: 45892 LoopCount: 10 Pid: 45893 LoopCount: 10 Pid: 45891 LoopCount: 11 Pid: 45892 LoopCount: 11 Pid: 45893 LoopCount: 11 Pid: 45893 LoopCount: 12 Pid: 45892 LoopCount: 12 Pid: 45893 LoopCount: 13
嗯,一切都沒問題了。
因此在訪問臨界資源時,使用Lock就能夠避免進程同時佔用資源而致使的一些問題。
信號量,是在進程同步過程當中一個比較重要的角色。能夠控制臨界資源的數量,保證各個進程之間的互斥和同步。
若是你學過操做系統,那麼必定對這方面很是瞭解,若是你還不瞭解信號量是什麼,能夠參考
來了解一下它是作什麼的。
那麼接下來咱們就用一個實例來演示一下進程之間利用Semaphore作到同步和互斥,以及控制臨界資源數量
from multiprocessing import Process, Semaphore, Lock, Queue import time buffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock() class Consumer(Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() buffer.get() print('Consumer pop an element') time.sleep(1) lock.release() empty.release() class Producer(Process): def run(self): global buffer, empty, full, lock while True: empty.acquire() lock.acquire() buffer.put(1) print('Producer append an element') time.sleep(1) lock.release() full.release() if __name__ == '__main__': p = Producer() c = Consumer() p.daemon = c.daemon = True p.start() c.start() p.join() c.join() print 'Ended!'
如上代碼實現了註明的生產者和消費者問題,定義了兩個進程類,一個是消費者,一個是生產者。
定義了一個共享隊列,利用了Queue數據結構,而後定義了兩個信號量,一個表明緩衝區空餘數,一個表示緩衝區佔用數。
生產者Producer使用empty.acquire()方法來佔用一個緩衝區位置,而後緩衝區空閒區大小減少1,接下來進行加鎖,對緩衝區進行操做。而後釋放鎖,而後讓表明佔用的緩衝區位置數量+1,消費者則相反。
運行結果以下:
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
能夠發現兩個進程在交替運行,生產者先放入緩衝區物品,而後消費者取出,不停地進行循環。
經過上面的例子來體會一下信號量的用法。
在上面的例子中咱們使用了Queue,能夠做爲進程通訊的共享隊列使用。
在上面的程序中,若是你把Queue換成普通的list,是徹底起不到效果的。即便在一個進程中改變了這個list,在另外一個進程也不能獲取到它的狀態。
所以進程間的通訊,隊列須要用Queue。固然這裏的隊列指的是 multiprocessing.Queue
依然是用上面那個例子,咱們一個進程向隊列中放入數據,而後另外一個進程取出數據。
from multiprocessing import Process, Semaphore, Lock, Queue import time from random import random buffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock() class Consumer(Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() print 'Consumer get', buffer.get() time.sleep(1) lock.release() empty.release() class Producer(Process): def run(self): global buffer, empty, full, lock while True: empty.acquire() lock.acquire() num = random() print 'Producer put ', num buffer.put(num) time.sleep(1) lock.release() full.release() if __name__ == '__main__': p = Producer() c = Consumer() p.daemon = c.daemon = True p.start() c.start() p.join() c.join() print 'Ended!'
運行結果:
Producer put 0.719213647437 Producer put 0.44287326683 Consumer get 0.719213647437 Consumer get 0.44287326683 Producer put 0.722859424381 Producer put 0.525321338921 Consumer get 0.722859424381 Consumer get 0.525321338921
能夠看到生產者放入隊列中數據,而後消費者將數據取出來。
get方法有兩個參數,blocked和timeout,意思爲阻塞和超時時間。默認blocked是true,即阻塞式。
當一個隊列爲空的時候若是再用get取則會阻塞,因此這時候就須要吧blocked設置爲false,即非阻塞式,實際上它就會調用get_nowait()方法,此時還須要設置一個超時時間,在這麼長的時間內尚未取到隊列元素,那就拋出Queue.Empty異常。
當一個隊列爲滿的時候若是再用put放則會阻塞,因此這時候就須要吧blocked設置爲false,即非阻塞式,實際上它就會調用put_nowait()方法,此時還須要設置一個超時時間,在這麼長的時間內尚未放進去元素,那就拋出Queue.Full異常。
另外隊列中經常使用的方法
Queue.qsize() 返回隊列的大小 ,不過在 Mac OS 上無法運行。
緣由:
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
return self._maxsize – self._sem._semlock._get_value()
Queue.empty() 若是隊列爲空,返回True, 反之False
Queue.full() 若是隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 至關Queue.get(False)
Queue.put(item) 阻塞式寫入隊列,timeout等待時間
Queue.put_nowait(item) 至關Queue.put(item, False)
管道,顧名思義,一端發一端收。
Pipe能夠是單向(half-duplex),也能夠是雙向(duplex)。咱們經過mutiprocessing.Pipe(duplex=False)建立單向管道 (默認爲雙向)。一個進程從PIPE一端輸入對象,而後被PIPE另外一端的進程接收,單向管道只容許管道一端的進程輸入,而雙向管道則容許從兩端輸入。
用一個實例來感覺一下:
from multiprocessing import Process, Pipe class Consumer(Process): def __init__(self, pipe): Process.__init__(self) self.pipe = pipe def run(self): self.pipe.send('Consumer Words') print 'Consumer Received:', self.pipe.recv() class Producer(Process): def __init__(self, pipe): Process.__init__(self) self.pipe = pipe def run(self): print 'Producer Received:', self.pipe.recv() self.pipe.send('Producer Words') if __name__ == '__main__': pipe = Pipe() p = Producer(pipe[0]) c = Consumer(pipe[1]) p.daemon = c.daemon = True p.start() c.start() p.join() c.join() print 'Ended!'
在這裏聲明瞭一個默認爲雙向的管道,而後將管道的兩端分別傳給兩個進程。兩個進程互相收發。觀察一下結果:
Producer Received: Consumer Words
Consumer Received: Producer Words
Ended!
在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來它。
在這裏須要瞭解阻塞和非阻塞的概念。
阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態。
阻塞即要等到回調結果出來,在有結果以前,當前進程會被掛起。
Pool的用法有阻塞和非阻塞兩種方式。非阻塞即爲添加進程後,不必定非要等到改進程執行完就添加其餘進程運行,阻塞則相反。
現用一個實例感覺一下非阻塞的用法:
from multiprocessing import Lock, Pool import time def function(index): print 'Start process: ', index time.sleep(3) print 'End process', index if __name__ == '__main__': pool = Pool(processes=3) for i in xrange(4): pool.apply_async(function, (i,)) print "Started processes" pool.close() pool.join() print "Subprocess done."
在這裏利用了apply_async方法,即非阻塞。
運行結果:
Started processes Start process: Start process: 0 1 Start process: 2 End processEnd process 0 1 Start process: 3 End process 2 End process 3 Subprocess done.
能夠發如今這裏添加三個進程進去後,立馬就開始執行,不用非要等到某個進程結束後再添加新的進程進去。
下面再看看阻塞的用法:
from multiprocessing import Lock, Pool import time def function(index): print 'Start process: ', index time.sleep(3) print 'End process', index if __name__ == '__main__': pool = Pool(processes=3) for i in xrange(4): pool.apply(function, (i,)) print "Started processes" pool.close() pool.join() print "Subprocess done."
在這裏只須要把apply_async改爲apply便可。
運行結果以下:
Start process: 0 End process 0 Start process: 1 End process 1 Start process: 2 End process 2 Start process: 3 End process 3 Started processes Subprocess done.
這樣一來就好理解了吧?
下面對函數進行解釋:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的。
close() 關閉pool,使其不在接受新的任務。
terminate() 結束工做進程,不在處理未完成的任務。
join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。
固然每一個進程能夠在各自的方法返回一個結果。apply或apply_async方法能夠拿到這個結果並進一步進行處理。
from multiprocessing import Lock, Pool import time def function(index): print 'Start process: ', index time.sleep(3) print 'End process', index return index if __name__ == '__main__': pool = Pool(processes=3) for i in xrange(4): result = pool.apply_async(function, (i,)) print result.get() print "Started processes" pool.close() pool.join() print "Subprocess done."
運行結果:
Start process: 0 End process 0 0 Start process: 1 End process 1 1 Start process: 2 End process 2 2 Start process: 3 End process 3 3 Started processes Subprocess done.
另外還有一個很是好用的map方法。
若是你如今有一堆數據要處理,每一項都須要通過一個方法來處理,那麼map很是適合。
好比如今你有一個數組,包含了全部的URL,而如今已經有了一個方法用來抓取每一個URL內容並解析,那麼能夠直接在map的第一個參數傳入方法名,第二個參數傳入URL數組。
如今咱們用一個實例來感覺一下:
from multiprocessing import Pool import requests from requests.exceptions import ConnectionError def scrape(url): try: print requests.get(url) except ConnectionError: print 'Error Occured ', url finally: print 'URL ', url, ' Scraped' if __name__ == '__main__': pool = Pool(processes=3) urls = [ 'https://www.baidu.com', 'http://www.meituan.com/', 'http://blog.csdn.net/', 'http://xxxyxxx.net' ] pool.map(scrape, urls)
在這裏初始化一個Pool,指定進程數爲3,若是不指定,那麼會自動根據CPU內核來分配進程數。
而後有一個連接列表,map函數能夠遍歷每一個URL,而後對其分別執行scrape方法。
運行結果:
<Response [403]> URL http://blog.csdn.net/ Scraped <Response [200]> URL https://www.baidu.com Scraped Error Occured http://xxxyxxx.net URL http://xxxyxxx.net Scraped <Response [200]> URL http://www.meituan.com/ Scraped
多進程multiprocessing相比多線程功能強大太多