Python之路(第四十六篇)多種方法實現python線程池(threadpool模塊\multiprocessing.dummy模塊\concurrent.futures模塊)

1、線程池

好久(python2.6)以前python沒有官方的線程池模塊,只有第三方的threadpool模塊,html

以後再python2.6加入了multiprocessing.dummy 做爲可使用線程池的方式,python

在python3.2(2012年)以後加入了concurrent.futures模塊(python3.1.5也有,可是python3.1.5發佈時間晚於python3.2一年多),這個模塊是python3中自帶的模塊,可是python2.7以上版本也能夠安裝使用。編程

下面分別介紹下各個線程池模塊的使用api

使用環境python3.6.4,win7 多線程

threadpool模塊(上古時期--python2.6以前)

因爲threadpool模塊是第三方模塊須要進行安裝,架構

安裝app

pip install threadpool 或者在pycharm--settings--Project interpreter中安裝python2.7

 

使用介紹異步

(1)引入threadpool模塊async

(2)定義線程函數

(3)建立線程 池threadpool.ThreadPool()

(4)建立須要線程池處理的任務即threadpool.makeRequests()

(5)將建立的多個任務put到線程池中,threadpool.putRequest

(6)等到全部任務處理完畢theadpool.pool()

代碼實例

import threadpool, time
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def Say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    #第一步 建立線程池,線程數爲4:
    pool = threadpool.ThreadPool(4)
    # 第二步建立線程請求,包涵調用的函數、參數和回調函數:
    # requests = threadpool.makeRequests(func, args_list, call_back)
    
    requests = threadpool.makeRequests(Say_hello, name_list)
    # 第三步將全部要運行多線程的請求扔進線程池:
    [pool.putRequest(req) for req in requests]
    # 第四步等待全部的線程完成工做後退出:
    pool.wait()
    print('用時共: %s second' % (time.time() - start_time))
​
​
​
if __name__ == '__main__':
    main()
​

  

輸出

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.0012288093566895 second

  

說明:makeRequests存放的是要開啓多線程的函數,以及函數相關參數和回調函數,其中回調函數能夠不寫(默認是無),也就是說makeRequests只須要2個參數就能夠運行。

 

multiprocessing.dummy(中古時期,python3.2起)

從python3.2起,可使用multiprocessing.dummy 建立線程池,注意

from  multiprocessing  import  Pool  #這裏引入的是進程池
from multiprocessing.dummy import Pool as ThreadPool  #這裏引入的纔是線程池

  

multiprocessing.dummy模塊與multiprocessing模塊的區別:dummy模塊是多線程,而multiprocessing是多進程,api都是通用的。簡單地說,multiprocessing.dummy是multiprocessing多進程模塊複製的一個多線程模塊,API都是通用的。

這裏的多線程也是受到它受到全局解釋器鎖(GIL)的限制,而且一次只有一個線程能夠執行附加到CPU的操做。

 

使用介紹

使用有四種方式:apply_async、apply、map_async、map。

 

其中apply_async和map_async是異步的,也就是啓動進程函數以後會繼續執行後續的代碼不用等待進程函數返回。apply_async和map_async方式提供了一寫獲取進程函數狀態的函數:ready()successful()get()。

PS:join()語句要放在close()語句後面。

具體能夠參考

Python之路(第四十篇)進程池

 

代碼實例


import  time
from multiprocessing.dummy import Pool as ThreadPool
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def Say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    #第一步 建立線程池,線程數爲4:
    pool = ThreadPool(4)
    # 第二步用map方法執行
    # pool.map(func, args)  ,注意這裏的map方法自帶pool.close()和pool.join()方法,等待全部子線程執行完
    pool.map(Say_hello,name_list)
    print('用時共: %s second' % (time.time() - start_time))
​
​
if __name__ == '__main__':
    main()

  


輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.004228830337524 second

  

 

concurrent.futures模塊(從python3.2開始自帶)

concurrent.futures模塊,能夠利用multiprocessing實現真正的平行計算。

核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序能夠利用多核CPU來提高執行速度。因爲子進程與主解釋器相分離,因此他們的全局解釋器鎖也是相互獨立的。每一個子進程都可以完整的使用一個CPU內核。

 

使用介紹

模塊主要包含下面兩個類:

  1. ThreadPoolExecutor

  2. ProcessPoolExecutor

也就是對 threading 和 multiprocessing 進行了高級別的抽象, 暴露出統一的接口, 方便開發者使用。

可使用 ThreadPoolExecutor 來進行多線程編程,ProcessPoolExecutor 進行多進程編程,二者實現了一樣的接口,這些接口由抽象類 Executor 定義。 這個模塊提供了兩大類型,一個是執行器類 Executor,另外一個是 Future 類。

ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用

其實 concurrent.futures 底層仍是用的 threading 和 multiprocessing 這兩個模塊, 至關於在這上面又封裝了一層, 因此速度上會慢一點, 這個是架構和接口實現上的取捨形成的。

 

基類Executor

#submit(fn, *args, **kwargs)
異步提交任務
​
#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做,map方法接收兩個參數,第一個爲要執行的函數,第二個爲一個序列,會對序列中的每一個元素都執行這個函數,返回值爲執行結果組成的生成器
​
#shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前
能夠經過 with 語句來避免顯式調用本方法。with 語句會用 wait=True 的默認參數調用 Executor.shutdown() 方法。
​
​
#result(timeout=None)
取得結果
submit()方法,這個方法的做用是提交一個可執行的回調task,並返回一個future實例。future可以使用done()方法判斷該任務是否結束,done()方法是不阻塞的,使用result()方法能夠獲取任務的返回值,這個方法是阻塞的。
​
​
#add_done_callback(fn)
回調函數
​
# done()
判斷某一個線程是否完成
​
# cancle()
取消某個任務

  


 

代碼示例

進程池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    executor=ProcessPoolExecutor(max_workers=3)
​
    futures=[]
    for i in range(10):
        future=executor.submit(task,i)  #這裏使用submit提交進程
        futures.append(future)
    executor.shutdown(True)
    print('*'*20)
    for future in futures:
        print(future.result())
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.176467418670654 second

  

也可使用map方法提交進程

示例

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = executor.map(task, [i for i in range(10)])
    print('*'*20)
    for future in futures:
        print(future)  #無需再次使用result()方法獲取結果
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

1504 is runing進程號
12644 is runing進程號
7732 is runing進程號
1504 is runing進程號
12644 is runing進程號
7732 is runing進程號
1504 is runing進程號
7732 is runing進程號
12644 is runing進程號
1504 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.171467304229736 second

  

 

分析:map方法返回的results列表是有序的,順序和*iterables迭代器的順序一致,這裏也無需再次使用result()方法獲取結果。

這裏使用with操做符,使得當任務執行完成以後,自動執行shutdown函數,而無需編寫相關釋放代碼。

 

map()與submit()使用場景

經常使用的方法是 submit(), 若是要提交任務的函數是同樣的, 就能夠簡化成 map(), 可是若是提交的函數是不同的, 或者執行的過程當中可能出現異常, 就要使用到 submit(), 由於使用 map() 在執行過程當中若是出現異常會直接拋出錯誤, 而 submit() 則會分開處理。

 

 

 

線程池

用法與ProcessPoolExecutor相同

 

示例

 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = executor.map(task, [i for i in range(10)])
    print('*'*20)
    for future in futures:
        print(future)
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.001457929611206 second

  

分析:注意全部的進程號都是同樣的,這裏是開啓的多線程,因此進程號是同樣的

示例二


import  time
from concurrent.futures import ThreadPoolExecutor
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    # 用map方法執行
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = executor.map(say_hello,name_list)
    print('用時共: %s second' % (time.time() - start_time))
​
​
if __name__ == '__main__':
    main()
​

  

 

輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.0022289752960205 second

  回調函數的使用

import  time
from concurrent.futures import ThreadPoolExecutor

name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']


def say_hello(str):
    print("Hello ", str)
    time.sleep(2)
    return str



def call_back(res):
    res = res.result()  #獲取結果
    print(res,"長度是%s"%len(res))



def main():
    start_time = time.time()
    # 用submit方法執行
    executor=ThreadPoolExecutor(max_workers=4)

    for i in name_list:
        executor.submit(say_hello,i).add_done_callback(call_back)
    executor.shutdown(True)
    #這裏使用submit提交線程,使用add_done_callback()添加回調函數
    print('用時共: %s second' % (time.time() - start_time))


if __name__ == '__main__':
    main()

  輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
HIKARU UTADA 長度是12
Aragaki Yui 長度是11
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Satomi Ishihara 長度是15
Nainaiwei Hashimoto 長度是19
Hello  Amuro Namie
Hello   Sarah Brightman
Nozomi Sasaki 長度是13
Amuro Namie 長度是11
 Sarah Brightman 長度是16
Mai Kuraki 長度是10
用時共: 4.0022289752960205 second
相關文章
相關標籤/搜索