關於 python concurrent.futures

1、什麼是future?
        一、標準庫中這樣說,The Future class encapsulates the asynchronous execution of a callable. python

           流暢的python這樣說,期物封裝待完成的操做,能夠放入隊列,完成的狀態可查詢,獲得結果(或拋出異常)後能夠獲取結果(或異常)。
           安道將future翻譯爲期物仍是很形象的,安道翻譯的書質量仍是有保證的。git

        二、從原碼看Future的對象有5種狀態,封裝了一把鎖,一個狀態值,結果,異常,和回調函數。狀態分別經過cancelled,running,done,來查詢。
        經過set_result,和set_exception來設置結果和異常,並觸發回調函數,回調函數只有一個參數就是future自己,在回調函數中能夠獲取該future綁定操做的結果。
        經過result獲取最終結果,若是有異常就raise該異常。能夠經過exception獲取異常,並無raise該異常。
        concurrent.futures.ThreadPoolExecutor設置結果、異常的線程和獲取結果的線程再也不一個線程,這個時候self._condition這一把鎖就起了做用,並且這把鎖也是在future綁定的操做未完成以前,經過result()方法獲取結果時阻塞的緣由。
        三、concurrent.futures中,不過與其說future是封裝了一個操做,不如說是每個future綁定了一個操做。github

        注意:當在線程池內部發生異常的時候並不會直接raise該異常而是經過futures的set_exception()方法將異常暫時封裝到future中。當future封裝的操做完成的時候,經過其result()方法獲取結果是會raise在線程池內部發生的exception。json

2、ThreadPoolExecutor,在什麼地方建立的線程,如何控制的線程個數。
        一、ThreadPoolExecutor有一個任務隊列,一個保存線程對象的set.
        二、在init方法中能夠看出,線程池默認最大線程個數爲( cpu個數*5 )
            if max_workers is None:
                max_workers = (os.cpu_count() or 1) * 5
        三、在submit方法中將一個future對象,和一個操做綁定到一個_WorkItem任務中,在其run方法中會把fn操做的結果和異常放到對應的future中,每有一個fn對應一個future,submit返回這個future。因此能夠在線程池外經過futrue對於fn的狀態進行查詢,並獲取fn的結果或其異常。以後將_WorkItem的對象丟到任務隊列中。
        四、在submit中判斷線程數,若是線程數未達到最大線程數,就新建線程。新建的線程target爲_worker,_worker的任務就是取出任務queue中的_WorkItem而後run()。網絡

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)
def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

 3、爲何Executor.map按可迭代的順序返回參數,而as_completed會先返回完成的future?async

        一、Executor實現了map方法,ThreadPoolExecutor繼承了它。
        二、Executor的map方法先調用submit方法,將對應操做fn,丟到線程池中,並得到一個future List,而後經過迭代這個列表依次獲取對應future中的結果。
        三、as_completed是先查詢全部fs(future)的狀態,而後返回已經完成的future,客戶端代碼會先獲取的已經完成的future,而後不斷檢查獲取已完成的future,而後返回,因此與提交任務的順序無關會先返回完成的任務。函數

        四、文章結尾代碼爲對應的map和as_completed效果對比。測試

class Executor(object):
    def map(self, fn, *iterables, timeout=None, chunksize=1):

        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

4、futures 未完成時爲何會在Executor.map,和as_completed方法阻塞?
    一、self._condition.wait(timeout)這把鎖是阻塞的緣由,調用result的客戶端代碼,和調用set_result、set_exception的線程池代碼不在一個線程中,只有在future對應的任務完成以後,線程池中的線程經過set_result、set_exception中的self._condition.notify_all(),從新喚醒wait的客戶端代碼線程,這個時候阻塞解除,獲取到對應的已完成的future。ui

class Future(object):
    def result(self, timeout=None):
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

附:測試代碼url

    

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import requests
from requests.exceptions import ConnectionError
from functools import partial
from os import cpu_count


def get_url(url):
    try:
        r = requests.get(url)
    except ConnectionError:
        raise ConnectionError('檢查網絡連接!')

    return url, r.status_code

URLS = [
    'https://my.oschina.net/u/2255341/blog',
    'https://github.com/timeline.json',
    'http://www.oschina.net/',
]


if __name__ == '__main__':
    # get_url('https://github.com/timeline.json')
    executor = ThreadPoolExecutor(max_workers=2)
    for res in executor.map(get_url, URLS):
        print(res)

    print('------------------------------------------------')
    for future in as_completed(map(partial(executor.submit, get_url), URLS)):
        res = future.result()
        print(res)

(py3env) ➜  concurrent_futures git:(master) python download.py 

('https://my.oschina.net/u/2255341/blog', 403)

('https://github.com/timeline.json', 410)

('http://www.oschina.net/', 403)

------------------------------------------------

('https://my.oschina.net/u/2255341/blog', 403)

('http://www.oschina.net/', 403)

('https://github.com/timeline.json', 410)

代碼地址:https://github.com/kagxin/recipe/blob/master/concurrent_futures/download.py

相關文章
相關標籤/搜索