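1. What is a Future?
1) The standard library documentation puts it this way: "The Future class encapsulates the asynchronous execution of a callable."
Fluent Python puts it this way: a future encapsulates a pending operation, so it can be put in a queue, its completion state can be queried, and its result (or exception) can be retrieved once it is available.
An Dao, the Chinese translator of the book, renders "future" as 期物, which is a vivid choice; the books he translates are reliably good.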
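2) Reading the source, a Future object has five states (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED) and wraps a lock (self._condition, a threading.Condition), a state value, a result, an exception, and a list of callbacks. The state is queried through cancelled(), running(), and done().
set_result() and set_exception() store the result or the exception and then trigger the callbacks. A callback takes exactly one argument, the future itself, so inside the callback you can fetch the result of the operation bound to that future.
result() returns the final result, or raises the stored exception if there is one. exception() returns the stored exception without raising it.
With concurrent.futures.ThreadPoolExecutor, the thread that sets the result or exception and the thread that reads the result are not the same thread. This is where self._condition does its job, and it is also why result() blocks when it is called before the operation bound to the future has finished.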
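The behaviour described above can be tried on a bare Future, without any executor. This is only a sketch: the documentation says set_result() and set_exception() are meant for Executor implementations and unit tests, and the names `report`, `seen`, `ok`, and `failed` are made up for this illustration.

```python
from concurrent.futures import Future

seen = []

def report(fut):
    # A done-callback receives the finished future as its only argument.
    seen.append(fut.result())

ok = Future()                      # starts out pending
ok.add_done_callback(report)
print(ok.done())                   # False: nothing has been set yet
ok.set_result(42)                  # stores the result and fires the callback
print(ok.done(), ok.result(), seen)

failed = Future()
failed.set_exception(ValueError("boom"))
print(repr(failed.exception()))    # exception() returns it without raising
try:
    failed.result()                # result() raises the stored exception
except ValueError as exc:
    print("result() raised:", exc)
```

Both futures are completed from the calling thread here, so nothing blocks; in a thread pool the set_* calls happen in a worker thread instead.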
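3) In concurrent.futures, rather than saying that a future encapsulates an operation, it is more accurate to say that each future is bound to one operation.
Note: when an exception occurs inside the thread pool, it is not raised directly. It is stored in the future through the future's set_exception() method. Once the operation bound to the future has completed, calling result() raises the exception that occurred inside the thread pool.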
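A small sketch of that note, using a hypothetical `divide` task that fails inside the pool. submit() itself does not raise; the error only surfaces when the caller asks the future for its result.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=1) as executor:
    # The ZeroDivisionError happens in the worker thread, not here.
    future = executor.submit(divide, 1, 0)

# Leaving the with-block waits for the submitted task to finish.
stored = future.exception()        # returned, not raised
try:
    future.result()                # re-raises the stored exception
    raised = None
except ZeroDivisionError as exc:
    raised = exc

print(type(stored).__name__, raised is stored)
```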
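2. ThreadPoolExecutor: where are the threads created, and how is the number of threads controlled?
1) ThreadPoolExecutor has a work queue and a set that holds its thread objects.
2) The __init__ method shows that the default maximum number of threads is the CPU count times 5 (in the version of the source quoted here; since Python 3.8 the default is min(32, os.cpu_count() + 4)):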
    if max_workers is None:
        max_workers = (os.cpu_count() or 1) * 5
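3) The submit method binds a future object and an operation together in a _WorkItem. The _WorkItem's run method puts the result or exception of fn into the corresponding future. Every fn gets its own future, and submit returns that future, so code outside the thread pool can use the future to query the state of fn and to fetch its result or exception. The _WorkItem object is then put on the work queue.
4) submit also checks the thread count (through _adjust_thread_count): if the number of threads has not reached the maximum, a new thread is created. The new thread's target is _worker, whose job is to take _WorkItem objects off the work queue and call run() on them.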
    class _WorkItem(object):
        def __init__(self, future, fn, args, kwargs):
            self.future = future
            self.fn = fn
            self.args = args
            self.kwargs = kwargs

        def run(self):
            if not self.future.set_running_or_notify_cancel():
                return

            try:
                result = self.fn(*self.args, **self.kwargs)
            except BaseException as e:
                self.future.set_exception(e)
            else:
                self.future.set_result(result)


    def _worker(executor_reference, work_queue):
        try:
            while True:
                work_item = work_queue.get(block=True)
                if work_item is not None:
                    work_item.run()
                    # Delete references to object. See issue16284
                    del work_item
                    continue
                executor = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or executor is None or executor._shutdown:
                    # Notice other workers
                    work_queue.put(None)
                    return
                del executor
        except BaseException:
            _base.LOGGER.critical('Exception in worker', exc_info=True)


    class ThreadPoolExecutor(_base.Executor):
        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')

                f = _base.Future()
                w = _WorkItem(f, fn, args, kwargs)

                self._work_queue.put(w)
                self._adjust_thread_count()
                return f
        submit.__doc__ = _base.Executor.submit.__doc__

        def _adjust_thread_count(self):
            # When the executor gets lost, the weakref callback will wake up
            # the worker threads.
            def weakref_cb(_, q=self._work_queue):
                q.put(None)
            # TODO(bquinlan): Should avoid creating new threads if there are more
            # idle threads than items in the work queue.
            if len(self._threads) < self._max_workers:
                t = threading.Thread(target=_worker,
                                     args=(weakref.ref(self, weakref_cb),
                                           self._work_queue))
                t.daemon = True
                t.start()
                self._threads.add(t)
                _threads_queues[t] = self._work_queue
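The effect of the max_workers check can be observed from the outside using only the public API. The sketch below submits more tasks than there are workers and records which threads actually ran them; `which_thread` and `names` are names invented for this example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def which_thread(_):
    time.sleep(0.05)   # keep the worker busy so that tasks overlap
    return threading.current_thread().name

with ThreadPoolExecutor(max_workers=2) as executor:
    # Six tasks are queued, but at most two worker threads are ever created.
    names = set(executor.map(which_thread, range(6)))

print(len(names))      # never more than max_workers
```

The remaining tasks simply wait in the work queue until one of the existing workers picks them up.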
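3. Why does Executor.map return results in the order of the input iterable, while as_completed yields whichever futures finish first?
1) Executor implements the map method, and ThreadPoolExecutor inherits it.
2) Executor.map first calls submit for every set of arguments, which hands each fn call to the thread pool and produces a list of futures. It then iterates over that list in order and fetches the result of each future in turn.
3) as_completed first checks the state of all the futures in fs and yields the ones that are already finished. It then keeps waiting and yields each remaining future as soon as it finishes. The client code therefore receives futures in completion order, regardless of the order in which the tasks were submitted.
4) The code at the end of this article compares the behaviour of map and as_completed.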
    class Executor(object):
        def map(self, fn, *iterables, timeout=None, chunksize=1):
            if timeout is not None:
                end_time = timeout + time.time()

            fs = [self.submit(fn, *args) for args in zip(*iterables)]

            # Yield must be hidden in closure so that the futures are submitted
            # before the first iterator value is required.
            def result_iterator():
                try:
                    for future in fs:
                        if timeout is None:
                            yield future.result()
                        else:
                            yield future.result(end_time - time.time())
                finally:
                    for future in fs:
                        future.cancel()
            return result_iterator()
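The ordering difference can be reproduced without any network access. The sketch below is an illustration under assumptions: the helper `sleep_then_return` and the delays are invented, and the gaps between delays are kept large so that the completion order is predictable in practice.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def sleep_then_return(delay):
    time.sleep(delay)
    return delay

delays = [0.6, 0.2, 0.4]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() walks the futures in submission order, so the slowest task
    # (submitted first) holds back the faster ones.
    in_input_order = list(executor.map(sleep_then_return, delays))

    # as_completed() yields each future as soon as it has finished.
    futures = [executor.submit(sleep_then_return, d) for d in delays]
    in_finish_order = [f.result() for f in as_completed(futures)]

print(in_input_order)
print(in_finish_order)
```

With three workers all tasks start together, so the second list should come out sorted by delay while the first keeps the input order.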
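4. Why do Executor.map and as_completed block while futures are unfinished?
1) The call self._condition.wait(timeout) in Future.result() is what blocks. The client code that calls result() and the thread pool code that calls set_result() or set_exception() run in different threads. Only after the task bound to the future has finished does the worker thread call self._condition.notify_all() inside set_result() or set_exception(), which wakes up the waiting client thread. At that point the block is released and the client gets the finished future. Executor.map blocks this way because it calls result() on each future; as_completed waits in a similar way, on an internal event that each finishing future signals.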
    class Future(object):
        def result(self, timeout=None):
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
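The wait and the wake-up can be seen on a bare Future. In this sketch a threading.Timer stands in for the worker thread (an assumption made for the example; a real pool sets the result from its own worker), and the variable names are illustrative.

```python
import threading
import concurrent.futures
from concurrent.futures import Future

future = Future()

# Nobody has set a result yet, so a bounded wait ends in a timeout.
try:
    future.result(timeout=0.1)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True

# A second thread sets the result a little later; set_result() notifies
# the condition that result() is waiting on.
threading.Timer(0.2, future.set_result, args=("done",)).start()
value = future.result()            # blocks until the timer thread fires

print(timed_out, value)
```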
Appendix: test code

    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    import requests
    from requests.exceptions import ConnectionError
    from functools import partial
    from os import cpu_count


    def get_url(url):
        try:
            r = requests.get(url)
        except ConnectionError:
            raise ConnectionError('Check the network connection!')
        return url, r.status_code


    URLS = [
        'https://my.oschina.net/u/2255341/blog',
        'https://github.com/timeline.json',
        'http://www.oschina.net/',
    ]

    if __name__ == '__main__':
        # get_url('https://github.com/timeline.json')
        executor = ThreadPoolExecutor(max_workers=2)

        # map: results come back in the order of URLS
        for res in executor.map(get_url, URLS):
            print(res)

        print('------------------------------------------------')

        # as_completed: results come back in completion order
        for future in as_completed(map(partial(executor.submit, get_url), URLS)):
            res = future.result()
            print(res)

(py3env) ➜ concurrent_futures git:(master) ✗ python download.py
('https://my.oschina.net/u/2255341/blog', 403)
('https://github.com/timeline.json', 410)
('http://www.oschina.net/', 403)
------------------------------------------------
('https://my.oschina.net/u/2255341/blog', 403)
('http://www.oschina.net/', 403)
('https://github.com/timeline.json', 410)
Code: https://github.com/kagxin/recipe/blob/master/concurrent_futures/download.py