Python初學——多進程Multiprocessing

1.1 什麼是 Multiprocessing

多線程在同一時間只能處理一個任務。html

可把任務平均分配給每一個核,而每一個核具備本身的運算空間。python

1.2 添加進程 Process

與線程相似,以下所示,可是該程序直接運行無結果,由於IDLE不支持多進程,在命令行終端運行纔有結果顯示多線程

import multiprocessing as mp def job(a,b): print('abc') if __name__=='__main__': p1=mp.Process(target=job,args=(1,2)) p1.start() p1.join()

1.3 存儲進程輸出 Queue

不知道爲何下面的這個程序能夠在IDLE中正常運行。首先定義了一個job函數做系列數學運算,而後將結果放到res中,在main函數運行,取出queue中存儲的結果再進行一次加法運算。app

import multiprocessing as mp def job(q): res=0 for i in range(1000): res+=i+i**2+i**3 q.put(res) if __name__ == '__main__': q=mp.Queue() p1 = mp.Process(target=job,args=(q,))#注意當參數只有一個時,應加上逗號
    p2 = mp.Process(target=job,args=(q,)) p1.start() p2.start() p1.join() p2.join() res1=q.get() res2=q.get() print(res1+res2)

 結果以下所示:async

 

1.4 效率比對 threading & multiprocessing

 在job函數中定義了數學運算,比較正常狀況、多線程和多進程分別的運行時間。函數

import multiprocessing as mp import threading as td import time def job(q): res = 0 for i in range(10000000): res += i+i**2+i**3 q.put(res) # queue

def multicore(): q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:' , res1+res2) def normal(): res = 0 for _ in range(2):#線程或進程都構造了兩個,進行了兩次運算,因此這裏循環兩次
        for i in range(10000000): res += i+i**2+i**3
    print('normal:', res) def multithread(): q = mp.Queue() t1 = td.Thread(target=job, args=(q,)) t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:', res1+res2) if __name__ == '__main__': st = time.time() normal() st1= time.time() print('normal time:', st1 - st) multithread() st2 = time.time() print('multithread time:', st2 - st1) multicore() print('multicore time:', time.time()-st2)

 在視頻中的運行結果是多進程<正常<多線程,而個人運行結果爲下圖所示:ui

 

綜上,多核/多進程運行最快,說明在同時間運行了多個任務,而多線程卻不必定會比正常狀況下的運行來的快,這和多線程中的GIL有關。spa

1.5 進程池

進程池Pool,就是咱們將所要運行的東西,放到池子裏,Python會自行解決多進程的問題。命令行

import multiprocessing as mp def job(x): return x*x def multicore(): pool=mp.Pool(processes=2)#定義一個Pool,並定義CPU核數量爲2
    res=pool.map(job,range(10)) print(res) res=pool.apply_async(job,(2,)) print(res.get()) multi_res=[pool.apply_async(job,(i,)) for i in range(10)] print([res.get()for res in multi_res]) if __name__=='__main__': multicore() 

運行結果以下所示:線程

首先定義一個池子,有了池子以後,就可讓池子對應某一個函數,在上述代碼中定義的pool對應job函數。咱們向池子裏丟數據,池子就會返回函數返回的值。 Pool和以前的Process的不一樣點是丟向Pool的函數有返回值,而Process的沒有返回值。

接下來用map()獲取結果,在map()中須要放入函數和須要迭代運算的值,而後它會自動分配給CPU核,返回結果

 

咱們怎麼知道Pool是否真的調用了多個核呢?咱們能夠把迭代次數增大些,而後打開CPU負載看下CPU運行狀況

打開CPU負載(Mac):活動監視器 > CPU > CPU負載(單擊一下便可)

Pool默認大小是CPU的核數,咱們也能夠經過在Pool中傳入processes參數便可自定義須要的核數量。

Pool除了能夠用map來返回結果以外,還能夠用apply_async(),與map不一樣的是,只能傳遞一個值,只會放入一個核進行計算,可是傳入值時要注意是可迭代的,因此在傳入值後須要加逗號, 同時須要用get()方法獲取返回值。所對應的代碼爲:

res=pool.apply_async(job,(2,)) print(res.get())

運行結果爲4。

因爲傳入值是能夠迭代的,則咱們一樣可使用apply_async()來輸出多個結果。若是在apply_async()中輸入多個傳入值:

res = pool.apply_async(job, (2,3,4,))

結果會報錯:

TypeError: job() takes exactly 1 argument (3 given)

即apply_async()只能輸入一組參數。

在此咱們將apply_async()放入迭代器中,定義一個新的multi_res

multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

一樣在取出值時須要一個一個取出來

print([res.get() for res in multi_res])

apply用迭代器的運行結果與map取出的結果相同。

note:
(1)Pool默認調用是CPU的核數,傳入processes參數可自定義CPU核數

(2)map() 放入迭代參數,返回多個結果

(3)apply_async()只能放入一組參數,並返回一個結果,若是想獲得map()的效果須要經過迭代

1.6 共享內存 shared memory

只有經過共享內存才能讓CPU之間進行交流。

經過Value將數據存儲在一個共享的內存表中。

import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14)

 

 其中,i和d表示數據類型。i爲帶符號的整型,d爲雙精浮點類型。更多數據類型可參考網址:https://docs.python.org/3/library/array.html

在多進程中有一個Array類,能夠和共享內存交互,來實現進程之間共享數據。

和numpy中的不一樣,這裏的Array只能是一維的,而且須要定義數據類型不然會報錯。

array = mp.Array('i', [1, 2, 3, 4])

1.7 進程鎖 Lock

首先是不加進程鎖的運行狀況,在下述代碼中定義了共享變量v,定義了兩個進程,都可對v進行操做。job函數的做用是每隔0.1s輸出一次累加num的值,累加值num在兩個進程中分別爲1和3。

import multiprocessing as mp import time def job(v,num): for _ in range(10): time.sleep(0.1)#暫停0.1s,讓輸出效果更明顯
        v.value+=num #v.value獲取共享變量值
        print(v.value) def multicore(): v=mp.Value('i',0)#定義共享變量
    p1=mp.Process(target=job,args=(v,1)) p2=mp.Process(target=job,args=(v,3)) p1.start() p2.start() p1.join() p2.join() if __name__=='__main__': multicore()

 運行結果以下所示:

能夠看到兩個進程互相搶佔共享內存v。

爲了解決上述不一樣進程搶共享資源的問題,咱們能夠用加進程鎖來解決。

首先須要定義一個進程鎖:

 l = mp.Lock() # 定義一個進程鎖

而後將進程鎖的信息傳入各個進程中

 p1 = mp.Process(target=job, args=(v,1,l)) # 須要將Lock傳入
 p2 = mp.Process(target=job, args=(v,3,l)) 

job()中設置進程鎖的使用,保證運行時一個進程的對鎖內內容的獨佔

def job(v, num, l): l.acquire() # 鎖住
    for _ in range(5): time.sleep(0.1) v.value += num # v.value獲取共享內存
        print(v.value) l.release() # 釋放

完整代碼:

def job(v, num, l): l.acquire() # 鎖住
    for _ in range(5): time.sleep(0.1) v.value += num # 獲取共享內存
        print(v.value) l.release() # 釋放

def multicore(): l = mp.Lock() # 定義一個進程鎖
    v = mp.Value('i', 0) # 定義共享內存
    p1 = mp.Process(target=job, args=(v,1,l)) # 須要將lock傳入
    p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()

運行結果以下所示:

能夠看到進程1運行完以後才運行進程2。

相關文章
相關標籤/搜索