目錄html
小子今天想來談談「並行計算」,做爲一個非科班人員,我爲何去搗鼓這麼一個在科班裏也比較專業的問題了。這就要說下我前幾天作的一個做業了,當時我用python寫了個程序,結果運行了一天,這個速度可以讓我愁了,我還怎麼優化,怎麼交做業啊。因而小子就去各大論壇尋丹問藥了,終於讓我發現能夠用並行計算來最大化壓榨電腦的CPU,提高計算效率,並且python裏有multiprocessing這個庫能夠提供並行計算接口,因而小子花1天時間改進程序,終於在規定時間內作出了本身滿意的結果,上交了做業。以後,小子對並行計算充滿了興趣,因而又從新在Google上游歷了一番,大體弄清了GPU、CPU、進程、線程、並行計算、分佈式計算等概念,也把python的multiprocessing耍了一遍,如今小子也算略有心得了,因此來此立碑,以示後來遊客。
小子本文分爲四部分,一是大數據時代現狀,其二是面對挑戰的方法,而後是用python寫並行程序,最後是multiprocessing實戰。python
當前咱們正處於大數據時代,天天咱們會經過手機、電腦等設備不斷的將本身的數據傳到互聯網上。據統計,YouTube上每分鐘就會增長500多小時的視頻,面對如此海量的數據,如何高效的存儲與處理它們就成了當前最大的挑戰。
但在這個對硬件要求愈來愈高的時代,CPU卻彷佛並不這麼給力了。自2013年以來,處理器頻率的增加速度逐漸放緩了,目前CPU的頻率主要分佈在3~4GHz。這個也是能夠理解的,畢竟摩爾定律都生效了50年了,若是它老人家還如此給力,那咱們之後就只要靜等處理器頻率提高,什麼計算問題在將來那都不是話下了。實際上CPU與頻率是於能耗密切相關的,咱們以前能夠經過加電壓來提高頻率,但當能耗太大,散熱問題就沒法解決了,因此頻率就逐漸穩定下來了,而Intel與AMD等大製造商也將目標轉向了多核芯片,目前普通桌面PC也達到了4~8核。git
我們有了多核CPU,以及大量計算設備,那咱們怎麼來用它們應對大數據時代的挑戰了。那就要提到下面的方法了。github
並行(parallelism)是指程序運行時的狀態,若是在同時刻有多個「工做單位」運行,則所運行的程序處於並行狀態。圖一是並行程序的示例,開始並行後,程序從主線程分出許多小的線程並同步執行,此時每一個線程在各個獨立的CPU進行運行,在全部線程都運行完成以後,它們會從新合併爲主線程,而運行結果也會進行合併,並交給主線程繼續處理。
編程
何時用並行計算:windows
GPU即圖形處理器核心(Graphics Processing Unit),它是顯卡的心臟,顯卡上還有顯存,GPU與顯存相似與CPU與內存。
GPU與CPU有不一樣的設計目標,CPU須要處理全部的計算指令,因此它的單元設計得至關複雜;而GPU主要爲了圖形「渲染」而設計,渲染即進行數據的列處理,因此GPU天生就會爲了更快速地執行復雜算術運算和幾何運算的。
GPU相比與CPU有以下優點:數組
GPU目前在處理深度學習上用得十分多,英偉達(NVIDIA)目前也花大精力去開發適合深度學習的GPU。如今上百層的神經網絡已經很常見了,面對如此龐大的計算量,CPU可能須要運算幾天,而GPU卻能夠在幾小時內算完,這個差距已經足夠別人比咱們多打幾個比賽,多發幾篇論文了。緩存
說到分佈式計算,咱們就先說下下Google的3篇論文,原文能夠直接點連接去下載:安全
Google在2003~2006年發表了這三篇論文以後,一時之間引發了轟動,可是Google並無將MapReduce開源。在這種狀況下Hadoop就出現了,Doug Cutting在Google的3篇論文的理論基礎上開發了Hadoop,此後Hadoop不斷走向成熟,目前Facebook、IBM、ImageShack等知名公司都在使用Hadoop運行他們的程序。網絡
分佈式計算的優點:
能夠集成諸多低配的計算機(成千上萬臺)進行高併發的儲存與計算,從而達到與超級計算機媲美的處理能力。
在介紹如何使用python寫並行程序以前,咱們須要先補充幾個概念,分別是進程、線程與全局解釋器鎖(Global Interpreter Lock, GIL)。
進程(process):
線程(threading):
進程與線程有兩個主要的不一樣點,其一是進程包含線程,線程使用進程的內存空間,固然線程也有本身的私有空間,但容量小;其二是進程有各自獨立的內存空間,互不干擾,而線程是共享內存空間。
圖三展現了進程、線程與CPU之間的關係。在圖三中,進程一與進程二都含有3個線程,CPU會按照線程來分配任務,如圖中4個CPU同時執行前4個線程,後兩個標紅線程處於等待狀態,在CPU運行完當前線程時,等待的線程會被喚醒並進入CPU執行。一般,進程含有的線程數越多,則它佔用CPU的時間會越長。
GIL是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任什麼時候刻僅有一個線程在執行。即使在多核心處理器上,使用 GIL 的解釋器也只容許同一時間執行一個線程。Python的Cpython解釋器(廣泛使用的解釋器)使用GIL,在一個Python解釋器進程內能夠執行多線程程序,但每次一個線程執行時就會得到全局解釋器鎖,使得別的線程只能等待,因爲GIL幾乎釋放的同時就會被原線程立刻得到,那些等待線程可能剛喚醒,因此常常形成線程不平衡享受CPU資源,此時多線程的效率比單線程還要低下。在python的官方文檔裏,它是這樣解釋GIL的:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
能夠說它的初衷是很好的,爲了保證線程間的數據安全性;可是隨着時代的發展,GIL卻成爲了python並行計算的最大障礙,但這個時候GIL已經遍及CPython的各個角落,修改它的工做量太大,特別是對這種開源性的語音來講。但幸虧GIL只鎖了線程,咱們能夠再新建解釋器進程來實現並行,那這就是multiprocessing的工做了。
multiprocessing是python裏的多進程包,經過它,咱們能夠在python程序裏創建多進程來執行任務,從而進行並行計算。官方文檔以下所述:
The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
咱們接下來介紹下multiprocessing的各個接口:
multiprocessing.Process(target=None, args=()) target: 能夠被run()調用的函數,簡單來講就是進程中運行的函數 args: 是target的參數 process的方法: start(): 開始啓動進程,在建立process以後執行 join([timeout]):阻塞目前父進程,直到調用join方法的進程執行完或超時(timeout),才繼續執行父進程 terminate():終止進程,不論進程有沒有執行完,儘可能少用。
示例1
from multiprocessing import Process def f(name): print 'hello', name if __name__ == '__main__': p = Process(target=f, args=('bob',)) # p進程執行f函數,參數爲'bob',注意後面的「,」 p.start() # 進程開始 p.join() # 阻塞主線程,直至p進程執行結束
class multiprocessing.Pool([processes]) processes是進程池中的進程數,默認是本機的cpu數量 方法: apply(func[, args[, kwds]])進程池中的進程進行func函數操做,操做時會阻塞進程,直至生成結果。 apply_async(func[, args[, kwds[, callback]]])與apply相似,可是不會阻塞進程 map(func, iterable[, chunksize])進程池中的進程進行映射操做 map_async(func, iterable[, chunksize[, callback]]) imap(func, iterable[, chunksize]):返回有序迭代器 imap_unordered(func, iterable[, chunsize]):返回無序迭代器 close():禁止進程池再接收任務 terminate():強行終止進程池,不管是否有任務在執行 join():在close()或terminate()以後進行,等待進程退出
示例2
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': p = Pool(5) # 建立有5個進程的進程池 print(p.map(f, [1, 2, 3])) # 將f函數的操做給進程池
multiprocessing.Pipe([duplex]) 返回兩個鏈接對象(conn1, conn2),兩個鏈接對象分別訪問pipe的頭和尾,進行讀寫操做 Duplex: True(default),建立的pipe是雙向的,也即兩端均可以進行讀寫;若爲False,則pipe是單向的,僅能夠在一端讀,另外一端寫,此時與Queue相似。 multiprocessing.Queue([maxsize]) qsize():返回queue中member數量 empty():若是queue是空的,則返回true full():若是queue中member數量達到maxsize,則返回true put(obj):將一個object放入到queue中 get():從隊列中取出一個object並將它從queue中移除,FIFO原則 close():關閉隊列,並將緩存的object寫入pipe
示例
from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': pool = Pool(processes=4) # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print result.get(timeout=1) # prints "100" unless your computer is *very* slow print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print it.next() # prints "0" print it.next() # prints "1" print it.next(timeout=1) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print result.get(timeout=1) # raises multiprocessing.TimeoutError
當一個進程得到(acquire)鎖以後,其它進程在想得到鎖就會被禁止,能夠保護數據,進行同步處理。 acquire(block=True, timeout=None):嘗試獲取一個鎖,若是block爲true,則會在得到鎖以後阻止其它進程再獲取鎖。 release():釋放鎖
共享內存一般須要配合進程鎖來處理,保證處理的順序相同。
multiprocessing.Value(typecode_or_type, *args[, lock]) 返回一個ctype對象, 建立c = Value(‘d’, 3.14),調用c.value() multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True) 返回一個ctype數組,只能是一維的 Array(‘i’, [1, 2, 3, 4])
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'B' |
unsigned char | int | 1 |
'u' |
Py_UNICODE | Unicode character | 2 |
'h' |
signed short | int | 2 |
'H' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'I' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'L' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'Q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
multiprocessing.active_children():返回當前進程的全部子進程 multiprocessing.cpu_count():返回本計算機的cpu數量 multiprocessing.current_process():返回當前進程
windows平臺另需注意:
process、lock與value嘗試:
import multiprocessing as mp import time def job(v, num, l): l.acquire() # 鎖住 for _ in range(5): time.sleep(0.1) v.value += num # 獲取共享內存 print(v.value) l.release() # 釋放 def multicore(): l = mp.Lock() # 定義一個進程鎖 #l = 1 v = mp.Value('i', 0) # 定義共享內存 p1 = mp.Process(target=job, args=(v,1,l)) # 須要將lock傳入 p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__=='__main__': multicore()
上述代碼即對共享內存疊加5次,p1進程每次疊加1,p2進程每次疊加3,爲了不p1與p2在運行時搶奪共享數據v,在進程執行時鎖住了該進程,從而保證了執行的順序。我測試了三個案例:
能夠發現,沒鎖的狀況下調整join能夠取得與加鎖相似的結果,這是由於join便是阻塞主進程,直至當前進程結束纔回到主進程,若將p1.join()放到p1.start()後面,則會立刻阻塞主進程,使得p2要稍後纔開始,這與鎖的效果同樣。
若是如上述代碼所示,p1.join()在p2.start()後面,雖然是p1先join(),但這時只是阻塞了主進程,而p2是兄弟進程,它已經開始了,p1就不能阻止它了,因此這時若是沒鎖的話p1與p2就是並行了,運行時間就是一半,但由於它們爭搶共享變量,因此輸出就變得不肯定了。
pool
import multiprocessing as mp #import pdb def job(i): return i*i def multicore(): pool = mp.Pool() #pdb.set_trace() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get得到結果 print(res.get()) # 迭代器,i=0時apply一次,i=1時apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 從迭代器中取出 print([res.get() for res in multi_res]) multicore()
pool其實很是好用,特別是map與apply_async。經過pool這個接口,咱們只有指定能夠並行的函數與函數參數列表,它就能夠自動幫咱們建立多進程池進行並行計算,真的不要太方便。pool特別適用於數據並行模型,假如是消息傳遞模型那仍是建議本身經過process來創立進程吧。
小子此次主要是按本身的理解把並行計算理了下,對進程、線程、CPU之間的關係作了下闡述,並把python的multiprocessing這個包拎了拎,我的感受這個裏面還大有學問,上次我一個師兄用python的process來控制單次迭代的運行時間(運行超時就跳過此次迭代,進入下一次迭代)也是讓我漲了見識,後面還要多多學習啊。
感謝您花費寶貴的時間閱讀到這裏,但願能有所收穫,也歡迎在評論區進行交流。
推薦好文:
multiprocessing官方文檔
python多進程的理解 multiprocessing Process join run(推薦好文)
多進程 Multiprocessing