原題 | PYTHON: A QUICK INTRODUCTION TO THE CONCURRENT.FUTURES MODULEhtml
做者 | MASNUNpython
原文 | masnun.com/2016/03/29/…git
譯者 | kbsc13("算法猿的成長"公衆號做者)github
聲明 | 翻譯是出於交流學習的目的,歡迎轉載,但請保留本文出於,請勿用做商業或者非法用途算法
concurrent.futures
是標準庫裏的一個模塊,它提供了一個實現異步任務的高級 API 接口。本文將經過一些代碼例子來介紹這個模塊常見的用法。編程
Executor
是一個抽象類,它有兩個很是有用的子類--ThreadPoolExecutor
和 ProcessPoolExecutor
。從命名就能夠知道,前者採用的是多線程,然後者使用多進程。下面將分別介紹這兩個子類,在給出的例子中,咱們都會建立一個線程池或者進程池,而後將任務提交到這個池子,這個池子將會分配可用的資源(線程或者進程)來執行給定的任務。bash
首先,先看看代碼:微信
from concurrent.futures import ThreadPoolExecutor
from time import sleep
# 定義須要執行的任務--休眠5秒後返回傳入的信息
def return_after_5_secs(message):
sleep(5)
return message
# 創建一個線程池,大小爲 3
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())
複製代碼
輸出結果:網絡
False
False
hello
複製代碼
這個代碼中首先建立了一個 ThreadPoolExecutor
對象--pool
,一般這裏默認線程數量是 5,但咱們指定線程池的線程數量是 3。接着就是調用 submit()
方法來把須要執行的任務,也就是函數,以及須要傳給這個函數的參數,而後會獲得 Future
對象,這裏調用其方法 done()
用於告訴咱們是否執行完任務,是,就返回 true
,沒有就返回 false
。多線程
在上述例子中,第一次調用 done()
時候,並無通過 5 秒,因此會獲得 false
;以後進行休眠 5 秒後,任務就會完成,再次調用 done()
就會獲得 true
的結果。若是是但願獲得任務的結果,能夠調用 future
的result
方法。
對 Future
對象的理解有助於理解和實現異步編程,所以很是建議好好看看官方文檔的介紹:
ProcessPoolExecutor
也是有類似的接口,使用方法也是相似的,代碼例子以下所示:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ProcessPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())
複製代碼
輸出結果:
False
False
Result: hello
複製代碼
一般,咱們會用多進程 ProcessPoolExecutor
來處理 CPU 密集型任務,多線程 ThreadPoolExecutor
則更適合處理網絡密集型 或者 I/O 任務。
儘管這兩個模塊的接口類似,但 ProcessPoolExecutor
採用的是 multiprocessing
模塊,而且不會被 GIL( Global Interpreter Lock) 所影響。不過對於這個模塊,咱們須要注意不能採用任何不能序列化的對象。
上述兩個模塊都有一個共同的方法--map()
。跟 Python 內建的 map
函數相似,該方法能夠實現對提供的一個函數進行屢次調用,而且經過給定一個可迭代的對象來將每一個參數都逐一傳給這個函數。另外,採用 map()
方法,提供的函數將是併發調用。
對於多進程,傳入的可迭代對象將分紅多塊的數據,每塊數據分配給每一個進程。分塊的數量能夠經過調整參數 chunk_size
,默認是 1.
下面是官方文檔給出的 ThreadPoolExecutor
的例子:
import concurrent.futures
import urllib.request
URLS = ['http://www.baidu.com/',
'http://www.163.com/',
'http://www.126.com/',
'http://www.jianshu.com/',
'http://news.sohu.com/']
# Retrieve a single page and report the url and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
複製代碼
輸出結果:
'http://www.baidu.com/' page is 153759 bytes
'http://www.163.com/' page is 693614 bytes
'http://news.sohu.com/' page is 175707 bytes
'http://www.126.com/' page is 10521 bytes
'http://www.jianshu.com/' generated an exception: HTTP Error 403: Forbidden
複製代碼
而對於 ProcessPoolExecutor
,代碼以下所示:
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
複製代碼
輸出結果:
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
複製代碼
concurrent.futures
模塊中有兩個函數用於處理進過 executors
返回的 futures
,分別是 as_completed()
和 wait()
。
as_completed()
函數會獲取 Future
對象,而且隨着任務開始處理而返回任務的結果,也就是須要執行的函數的返回結果。它和上述介紹的 map()
的主要區別是 map()
方法返回的結果是按照咱們傳入的可迭代對象中的順序返回的。而 as_completed()
返回的結果順序則是按照任務完成的順序,哪一個任務先完成,先返回結果。
下面給出一個例子:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
for x in as_completed(futures):
print(x.result())
複製代碼
輸出結果
Return of 3
Return of 4
Return of 0
Return of 2
Return of 1
複製代碼
wait()
函數返回一個包含兩個集合的帶有名字的 tuple,一個集合包含已經完成任務的結果(任務結果或者異常),另外一個包含的就是還未執行完畢的任務。
一樣,下面是一個例子:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(wait(futures))
複製代碼
輸出結果:
DoneAndNotDoneFutures(done={<Future at 0x2474aa4fba8 state=finished returned str>, <Future at 0x2474a903048 state=finished returned str>, <Future at 0x2474aa4fa58 state=finished returned str>, <Future at 0x2474aa4fcf8 state=finished returned str>, <Future at 0x2474a8beda0 state=finished returned str>}, not_done=set())
複製代碼
咱們能夠經過指定參數來控制 wait()
函數返回結果的時間,這個參數是 return_when
,可選數值有:FIRST_COMPLETED
, FIRST_EXCEPTION
和 ALL_COMPLETED
。默認結果是 ALL_COMPLETED
,也就是它會等待全部任務都執行完成才返回結果。
以上就是本次教程的全部內容,代碼已經上傳到:
歡迎關注個人微信公衆號--算法猿的成長,或者掃描下方的二維碼,你們一塊兒交流,學習和進步!