Multiprocessing

和 threading 的比較 

多進程 Multiprocessing 和多線程 threading 相似, 他們都是在 python 中用來並行運算的. 不過既然有了 threading, 爲何 Python 還要出一個 multiprocessing 呢? 緣由很簡單, 就是用來彌補 threading 的一些劣勢, 好比在 threading 教程中提到的GIL.python

使用 multiprocessing 也很是簡單, 若是對 threading 有必定了解的朋友, 大家的享受時間就到了. 由於 python 把 multiprocessing 和 threading 的使用方法作的幾乎差很少. 這樣咱們就更容易上手. 也更容易發揮你電腦多核系統的威力了!git

A:添加進程 Process

導入線程進程標準模塊 

import multiprocessing as mp
import threading as td

定義一個被線程和進程調用的函數 

def job(a,d):
    print('aaaaa')

建立線程和進程 

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))

注意:Thread和Process的首字母都要大寫,被調用的函數沒有括號,被調用的函數的參數放在args(…)中github

分別啓動線程和進程多線程

t1.start()
p1.start()

分別鏈接線程和進程app

t1.join()
p1.join()

完整的線程和進程建立使用對比代碼 

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

從上面的使用對比代碼能夠看出,線程和進程的使用方法類似async

運用 

在運用時須要添加上一個定義main函數的語句函數

if __name__=='__main__':

完整的應用代碼:工具

import multiprocessing as mp

def job(a,d):
    print('aaaaa')

if __name__=='__main__':
    p1 = mp.Process(target=job,args=(1,2))
    p1.start()
    p1.join()

運行環境要在terminal環境下,可能其餘的編輯工具會出現運行結束後沒有打印結果,在terminal中的運行後打印的結果爲:學習

aaaaa

B:存儲進程輸出 Queue

學習資料:spa

Queue的功能是將每一個核或線程的運算結果放在隊裏中, 等到每一個線程或核運行完畢後再從隊列中取出結果, 繼續加載運算。緣由很簡單, 多線程調用的函數不能有返回值, 因此使用Queue存儲多個線程運算的結果

把結果放在 Queue 裏 

定義一個被多線程調用的函數,q 就像一個隊列,用來保存每次函數運行的結果

#該函數沒有返回值!!!
def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

主函數 

定義一個多線程隊列,用來存儲結果

if __name__=='__main__':
    q = mp.Queue()

定義兩個線程函數,用來處理同一個任務, args 的參數只要一個值的時候,參數後面須要加一個逗號,表示args是可迭代的,後面可能還有別的參數,不加逗號會出錯

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

分別啓動、鏈接兩個線程

p1.start()
p2.start()
p1.join()
p2.join()

上面是分兩批處理的,因此這裏分兩批輸出,將結果分別保存

res1 = q.get()
res2 = q.get()

打印最後的運算結果

print(res1+res2)

完整的代碼 

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

if __name__=='__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1+res2)

運行的時候仍是要在terminal中,最後運行結果爲

499667166000

C:效率對比 threading & multiprocessing

學習資料:

上篇講了多進程/多核的運算,此次咱們來對比下多進程,多線程和什麼都不作時的消耗時間,看看哪一種方式更有效率。

建立多進程 multiprocessing 

和上節同樣,首先import multiprocessing並定義要實現的job(),同時爲了容易比較,咱們將計算的次數增長到1000000

import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res) # queue

由於多進程是多核運算,因此咱們將上節的多進程代碼命名爲multicore()

def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

建立多線程 multithread 

接下來建立多線程程序,建立多線程和多進程有不少類似的地方。首先import threading而後定義multithread()完成一樣的任務

import threading as td

def multithread():
    q = mp.Queue() # thread可放入process一樣的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

建立普通函數 

最後咱們定義最普通的函數。注意,在上面例子中咱們創建了兩個進程或線程,均對job()進行了兩次運算,因此在normal()中咱們也讓它循環兩次

def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)

運行時間 

最後,爲了對比各函數運行時間,咱們須要import time, 而後依次運行定義好函數:

import time

if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

大功告成,下面咱們來看下實際運行對比。

結果對比 

"""
# range(1000000)
('normal:', 499999666667166666000000L)
('normal time:', 1.1306169033050537)
('thread:', 499999666667166666000000L)
('multithread time:', 1.3054230213165283)
('multicore:', 499999666667166666000000L)
('multicore time:', 0.646507978439331)
"""

普通/多線程/多進程的運行時間分別是1.131.30.64秒。 咱們發現多核/多進程最快,說明在同時間運行了多個任務。 而多線程的運行時間竟然比什麼都不作的程序還要慢一點,說明多線程仍是有必定的短板的。 戳這裏查看「多線程的短板是什麼」。

咱們將運算次數加十倍,再來看看三種方法的運行時間:

"""
# range(10000000)
('normal:', 4999999666666716666660000000L)
('normal time:', 40.041773080825806)
('thread:', 4999999666666716666660000000L)
('multithread time:', 41.777158975601196)
('multicore:', 4999999666666716666660000000L)
('multicore time:', 22.4337899684906)
"""

此次運行時間依然是 多進程 < 普通 < 多線程,由此咱們能夠清晰地看出哪一種方法更有效率。

D:進程池 Pool

此次咱們講進程池Pool。 進程池就是咱們將所要運行的東西,放到池子裏,Python會自行解決多進程的問題

首先import multiprocessing和定義job()

import multiprocessing as mp

def job(x):
    return x*x

進程池 Pool() 和 map() 

而後咱們定義一個Pool

pool = mp.Pool()

有了池子以後,就可讓池子對應某一個函數,咱們向池子裏丟數據,池子就會返回函數返回的值。 Pool和以前的Process的不一樣點是丟向Pool的函數有返回值,而Process的沒有返回值。

接下來用map()獲取結果,在map()中須要放入函數和須要迭代運算的值,而後它會自動分配給CPU核,返回結果

res = pool.map(job, range(10))

讓咱們來運行一下

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

運行結果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

自定義核數量 

咱們怎麼知道Pool是否真的調用了多個核呢?咱們能夠把迭代次數增大些,而後打開CPU負載看下CPU運行狀況

打開CPU負載(Mac):活動監視器 > CPU > CPU負載(單擊一下便可)

Pool默認大小是CPU的核數,咱們也能夠經過在Pool中傳入processes參數便可自定義須要的核數量,

def multicore():
    pool = mp.Pool(processes=3) # 定義CPU核數量爲3
    res = pool.map(job, range(10))
    print(res)

apply_async() 

Pool除了map()外,還有能夠返回結果的方式,那就是apply_async().

apply_async()中只能傳遞一個值,它只會放入一個核進行運算,可是傳入值時要注意是可迭代的,因此在傳入值後須要加逗號, 同時須要用get()方法獲取返回值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到結果
    print(res.get())

運行結果;

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

用 apply_async() 輸出多個結果 

那麼如何用apply_async()輸出多個迭代呢?

咱們在apply_async()中多傳入幾個值試試

res = pool.apply_async(job, (2,3,4,))

結果會報錯:

TypeError: job() takes exactly 1 argument (3 given)

apply_async()只能輸入一組參數。

在此咱們將apply_async() 放入迭代器中,定義一個新的multi_res

multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

一樣在取出值時須要一個一個取出來

print([res.get() for res in multi_res])

合併代碼

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到結果
    print(res.get())
    # 迭代器,i=0時apply一次,i=1時apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 從迭代器中取出
    print([res.get() for res in multi_res])

運行結果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
4 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res

能夠看出在apply用迭代器的獲得的結果和用map獲得的結果是同樣的

總結 

  1. Pool默認調用是CPU的核數,傳入processes參數可自定義CPU核數
  2. map() 放入迭代參數,返回多個結果
  3. apply_async()只能放入一組參數,並返回一個結果,若是想獲得map()的效果須要經過迭代

相關文章
相關標籤/搜索