python的併發模塊concurrent

 

Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持,他屬於上層的封裝,對於用戶來講,不用在考慮那麼多東西了。python

官方參考資料:https://pythonhosted.org/futures/git

1.Executor

Exectuor是基礎模塊,這是一個抽象類,其子類分爲ThreadPoolExecutor和ProcessPoolExecutor,分別被用來建立線程池和進程池。github

提供的方法以下:app

Executor.submit(fn, *args, **kwargs)

fn:爲須要異步執行的函數
args,kwargs:爲給函數傳遞的參數
就來看看官網的這個例子:異步

?
1
2
3
with ThreadPoolExecutor(max_workers = 1 ) as executor:
     future = executor.submit( pow , 323 , 1235 )
     print (future.result())

  

咱們使用submit方法來往線程池中加入一個task(pow函數),submit返回一個Future對象。其中future.result()的result方法的做用是拿到調用返回的結果。若是沒有執行完畢就會去等待。這裏咱們使用with操做符,使得當任務執行完成以後,自動執行shutdown函數,而無需編寫相關釋放代碼。
關於更多future的具體方法說明看後面的future部分解釋。函數

Executor.map(fn, *args, **kwargs)

map(func, *iterables, timeout=None)
此map函數和python自帶的map函數功能相似,只不過concurrent模塊的map函數從迭代器得到參數後異步執行。而且,每個異步操做,能用timeout參數來設置超時時間,timeout的值能夠是int或float型,若是操做timeout的話,會raisesTimeoutError。若是timeout參數不指定的話,則不設置超時間。 post

func:爲須要異步執行的函數
iterables:能夠是一個能迭代的對象.
timeout:設置每次異步操做的超時時間測試

?
1
2
3
4
5
6
7
8
9
from concurrent.futures import ThreadPoolExecutor
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
         req = requests.get(url, timeout = 60 )
         print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
executor. map (load_url,URLS)
print ( '主線程結束' )

  

submit函數和map函數,根據須要,選一個使用便可。url

Executor.shutdown(wait=True)

此函數用於釋放異步執行操做後的系統資源。Executor實現了enter__和__exit使得其對象能夠使用with操做符。
在這裏能夠使用with上下文關鍵字代替,如上面第一個submit的例子。spa

2.Future對象

submit函數返回future對象,future提供了跟蹤任務執行狀態的方法,Future實例能夠被Executor.submit()方法建立。除了測試以外不該該直接建立。

cancel():嘗試去取消調用。若是調用當前正在執行,不能被取消。這個方法將返回False,不然調用將會被取消,方法將返回True

cancelled():若是調用被成功取消返回True

running():若是當前正在被執行不能被取消返回True

done():若是調用被成功取消或者完成running返回True

result(Timeout = None):拿到調用返回的結果。若是沒有執行完畢就會去等待

exception(timeout=None):捕獲程序執行過程當中的異常

add_done_callback(fn):將fn綁定到future對象上。當future對象被取消或完成運行時,fn函數將會被調用

3.wait方法

 wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另一個是uncompleted(未完成的)。使用wait方法的一個優點就是得到更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置爲ALL_COMPLETED。

  若是採用默認的ALL_COMPLETED,程序會阻塞直到線程池裏面的全部任務都完成,再執行主線程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python 
# encoding: utf-8 
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
     req = requests.get(url, timeout = 60 )
     print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
f_list = []
for url in URLS:
     future = executor.submit(load_url,url)
     f_list.append(future)
print (wait(f_list))
print ( '主線程結束' )

  

若是採用FIRST_COMPLETED參數,程序並不會等到線程池裏面全部的任務都完成。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = [ 'http://www.163.com' , 'https://www.baidu.com/' , 'https://github.com/' ]
def load_url(url):
     req = requests.get(url, timeout = 60 )
     print ( '%r page is %d bytes' % (url, len (req.content)))
executor = ThreadPoolExecutor(max_workers = 3 )
f_list = []
for url in URLS:
     future = executor.submit(load_url,url)
     f_list.append(future)
print (wait(f_list,return_when = 'FIRST_COMPLETED' ))
print ( '主線程結束' )

  

關於模塊的基本使用就是上面的這些。後續會作一些拓展或者案例。

相關文章
相關標籤/搜索