Reading the asyncio source code



In [1]: import asyncio

In [2]: async def f(i):
   ...:      await asyncio.sleep(i)
   ...:      print(i)

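In [3]: async def func():
   ...:      tasks = []
   ...:      for i in range(10):
   ...:          await asyncio.sleep(0)
   ...:          print('creating coroutine, arg', i)
   ...:          tasks.append(asyncio.create_task(f(i)))
   ...:      _ = [await t for t in tasks]

In [4]: asyncio.run(func())
creating coroutine, arg 0
creating coroutine, arg 1
creating coroutine, arg 2
creating coroutine, arg 3
creating coroutine, arg 4
creating coroutine, arg 5
creating coroutine, arg 6
creating coroutine, arg 7
creating coroutine, arg 8
creating coroutine, arg 9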


Let's start reading the source at asyncio.run():

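# asyncio.runners.py
def run(main, *, debug=False):
    """Run a coroutine."""
    if events._get_running_loop() is not None:         # a loop is already running in this thread: raise
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):                # main must be a coroutine object, otherwise raise
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()                     # create the event loop instance
    try:
        events.set_event_loop(loop)                    # bind it as the current event loop
        loop.set_debug(debug)                          # set debug mode
        return loop.run_until_complete(main)           # run main in the loop until it finishes
    finally:
        try:
            _cancel_all_tasks(loop)                    # cancel whatever tasks are still pending
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()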
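
The two guard checks at the top of run() are easy to observe from user code. The following is a minimal sketch, not part of the asyncio source: the helper names (answer, try_nested_run, try_non_coroutine) are made up for illustration, and the exception type raised for a non-coroutine argument is assumed to vary between Python versions, so both ValueError and TypeError are accepted.

```python
import asyncio


async def answer():
    return 42


async def try_nested_run():
    # We are already inside a running loop here, so asyncio.run() must refuse.
    coro = answer()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
    return None


def try_non_coroutine():
    # Passing the function itself (not the coroutine object it returns) is rejected.
    try:
        asyncio.run(answer)
    except (ValueError, TypeError) as exc:  # assumption: type differs by version
        return type(exc).__name__
    return None
```

Calling asyncio.run(try_nested_run()) returns the error message of the first check, and try_non_coroutine() returns the name of the exception raised by the second.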

The steps above simply create a loop and then run the function inside it. Next, let's see how events.new_event_loop() creates the event loop instance:

# asyncio.events.py
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()  # ask the policy's new_event_loop() for a loop instance

def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:
        _init_event_loop_policy()   # not set yet: initialise the policy (not the loop itself)
    return _event_loop_policy   # module-level global holding the event loop policy, initially None

def _init_event_loop_policy():   # initialise the global _event_loop_policy
    global _event_loop_policy
    with _lock:             # take the lock
        if _event_loop_policy is None: # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()   # instantiate the default policy

The loop instance is therefore created by the new_event_loop() method of a DefaultEventLoopPolicy() instance.
DefaultEventLoopPolicy is a different class on Linux and on Windows.

My machine runs Linux, so let's look at the Linux version: DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy

# asyncio.unix_events.py
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop      # the loop factory


# asyncio.events.py
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    _loop_factory = None

    class _Local(threading.local):    # thread-local storage: each thread only sees its own values
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):   # get the current thread's loop; only the main thread gets one created on demand
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):            # set the current thread's loop and record that it was set
        """Set the event loop."""
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):                # create a loop
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()    # the loop comes from _loop_factory, i.e. _UnixSelectorEventLoop
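
To make the chain above concrete, here is a small sketch that performs by hand the steps asyncio.run() performs: create a loop with asyncio.new_event_loop(), run a coroutine on it, and close it. The names add and run_manually are illustrative only, and the sketch skips the task cancellation and async-generator shutdown that the real run() also does.

```python
import asyncio


async def add(a, b):
    await asyncio.sleep(0)  # give control back to the loop once
    return a + b


def run_manually(coro):
    # new_event_loop() goes through the policy's new_event_loop(), as traced above.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

run_manually(add(1, 2)) returns the coroutine's result; on Linux the loop object it creates is the selector-based loop discussed in the text.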

At this point loop is an instance of _UnixSelectorEventLoop. With the loop in hand, the next step is run_until_complete. That method is not defined in _UnixSelectorEventLoop itself, so we look through its parent classes:
unix_events._UnixSelectorEventLoop ---> selector_events.BaseSelectorEventLoop ---> base_events.BaseEventLoop --> events.AbstractEventLoop
The method is found in BaseEventLoop:

# asyncio.base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()                    # raise if this loop has already been closed

        new_task = not futures.isfuture(future) # True when the argument is not already a Future/Task
        future = tasks.ensure_future(future, loop=self)
        # if future is a coroutine object, loop.create_task() wraps it in a Task, schedules it on the loop and returns the Task
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)  # when the task is done, this callback stops the loop
        try:
            self.run_forever()       # start running the events in the loop
        except:
            if new_task and future.done() and not future.cancelled():   # the task finished and was not cancelled
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()                    # retrieve the exception so that it counts as handled
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)  # remove the stop callback
        if not future.done():                                        # the loop stopped before the task finished
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()  # return the task's result
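
The final check in run_until_complete() can be triggered deliberately. The sketch below is an illustration, with stop_before_done being a made-up name: it schedules loop.stop() ahead of a future that nobody ever completes, so run_forever() returns while the future is still pending.

```python
import asyncio


def stop_before_done():
    loop = asyncio.new_event_loop()
    never_done = loop.create_future()   # nobody will ever set a result on this
    loop.call_soon(loop.stop)           # the first iteration stops the loop
    try:
        loop.run_until_complete(never_done)
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop.close()
    return None
```

The returned message is the one raised by the `if not future.done()` branch shown above.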

Next, let's see how the task is created by the tasks.ensure_future(future, loop=self) function:

# asyncio.tasks.py
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):      # coro_or_future is a coroutine object
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)      # call the loop's create_task method
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):  # already a Future: return it as is
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future): # any other awaitable (has __await__): wrap it with _wrap_awaitable, then call this function again
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

@coroutine
def _wrap_awaitable(awaitable):         # after wrapping, the awaitable is driven by a generator-based coroutine
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())
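
The three branches of ensure_future() can be exercised side by side. This is a sketch under assumptions: Countdown is a hypothetical awaitable invented for the example (an object with __await__ that is neither a coroutine nor a Future), and plain and classify are likewise illustrative names.

```python
import asyncio


class Countdown:
    """Hypothetical awaitable that is neither a coroutine nor a Future."""

    def __await__(self):
        yield from asyncio.sleep(0).__await__()
        return "custom awaitable"


async def plain():
    return "coroutine"


async def classify():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.set_result("future")
    same_object = asyncio.ensure_future(fut) is fut   # Future branch: returned unchanged
    task_a = asyncio.ensure_future(plain())           # coroutine branch: wrapped in a Task
    task_b = asyncio.ensure_future(Countdown())       # awaitable branch: wrapped, then a Task
    kinds = (isinstance(task_a, asyncio.Task), isinstance(task_b, asyncio.Task))
    return same_object, kinds, await task_a, await task_b, await fut
```

Running asyncio.run(classify()) shows that only the Future comes back as the same object, while both the coroutine and the custom awaitable end up as Task instances.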

Next, let's look at loop.create_task(coro_or_future) to see how the task is created. The method lives in BaseEventLoop:

# asyncio.base_events
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    if self._task_factory is None:  # no task factory installed (one can be set with set_task_factory)
        task = tasks.Task(coro, loop=self)  # create the task with tasks.Task
        if task._source_traceback:
            del task._source_traceback[-1]
    else:   # a task factory is installed: let it create the task
        task = self._task_factory(self, coro)
    return task
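
The else branch above is reachable from user code through loop.set_task_factory(). The following sketch installs a factory that records every task it builds; recording_factory, created, work and use_factory are names made up for the example. The `**kwargs` parameter is an assumption to cope with newer Python versions, which may pass extra keyword arguments (such as a context) to the factory, whereas the 3.7 code shown above calls it with just (loop, coro).

```python
import asyncio

created = []  # tasks built by our factory


def recording_factory(loop, coro, **kwargs):
    # Build an ordinary Task, but remember that we made it.
    task = asyncio.Task(coro, loop=loop, **kwargs)
    created.append(task)
    return task


async def work():
    return "done"


async def use_factory():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(recording_factory)
    try:
        task = loop.create_task(work())
        result = await task
    finally:
        loop.set_task_factory(None)  # restore the default behaviour
    return result, task in created
```

asyncio.run(use_factory()) returns the task's result together with a flag confirming the task came from the custom factory.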

Then let's see how the Task class is initialised:

# asyncio.tasks.py
class Task(futures._PyFuture):  # Inherit Python Task implementation   # a Task is a subclass of Future
                                # from a Python Future implementation.
    """A coroutine wrapped in a Future."""
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):     # return the task currently running in the loop, or None
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro     # coro is the coroutine object
        self._context = contextvars.copy_context()    # contextvars manages context state; this copies the current context

        self._loop.call_soon(self.__step, context=self._context) # call self.__step on the next loop iteration, running in self._context
        _register_task(self)          # record this task in the registry of all tasks

    def set_result(self, result):
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine."""
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine."""
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself."""
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        if self.done():   # the task is already done: stepping it again is an error
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:   # a cancellation was requested: deliver CancelledError to the coroutine
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro        # the coroutine object being driven
        self._fut_waiter = None

        _enter_task(self._loop, self)    # mark this task as the one currently running
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:     # no exception to deliver
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)     # resume the coroutine; result is whatever it yields
            else:
                result = coro.throw(exc)      # throw the exception into the coroutine (it may handle it, and may then finish with StopIteration)
        except StopIteration as exc:   # the coroutine finished; its return value is in exc.value
            if self._must_cancel:   # the task was cancelled
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())   # store CancelledError() on the Future (the task)
            else:
                super().set_result(exc.value)   # store the result on the Future (the task) and mark it done
        except futures.CancelledError:
            super().cancel() # I.e., Future.cancel(self).
        except Exception as exc:        # the branches below store the exception on the Future
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise                                # re-raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            # if result carries an '_asyncio_future_blocking' attribute that is not None, it is Future-like; see asyncio.base_futures.isfuture
            if blocking is not None:  # blocking is not None, so it is either False or True
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:  # the Future belongs to another loop: report the error to the coroutine
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)  # deliver the error to the coroutine
                elif blocking:    # True: result is a pending Future yielded from its __await__
                    if result is self:   # the task is awaiting itself: hand the error to the coroutine
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)    # deliver the error to the coroutine
                    else:
                        result._asyncio_future_blocking = False # reset the flag on result
                        result.add_done_callback(
                            self.__wakeup, context=self._context)  # wake this task when result is done, in the same context
                        self._fut_waiter = result   # remember what we are waiting on; while it is not None the task is suspended
                        if self._must_cancel:    # a cancellation is pending: cancel the awaited future too
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:  # False: the Future was yielded directly rather than delegated to with yield from / await
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context) # hand the error to the coroutine

            elif result is None:   # the coroutine yielded nothing: run the next send() on the following loop iteration
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):   # the coroutine yielded a generator
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(                     # a generator must be delegated to with yield from
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:                                  # yielding anything else is an error
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)            # this task is no longer the running one
            self = None # Needed to break cycles when an exception occurs.

    def __wakeup(self, future): # done-callback attached to the awaited future
        try:
            future.result()                 # fetch the awaited future's result (raises if it failed)
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)              # on failure, hand the exception to the coroutine
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()     # the awaited future is done: continue this task's __step, i.e. the coroutine's send()
        self = None # Needed to break cycles when an exception occurs.
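
Stripped of the event loop, the core of __step is "call send(None), look at what came out, repeat until StopIteration". The sketch below imitates that with a plain while loop; drive, job and bare_yield are hypothetical helpers, and bare_yield mirrors the bare `yield` that makes __step re-queue itself in the `result is None` branch.

```python
import types


@types.coroutine
def bare_yield():
    # Yield nothing: the driver sees None, just like Task.__step does.
    yield


async def job():
    await bare_yield()
    await bare_yield()
    return "finished"


def drive(coro):
    """Drive a coroutine by hand, collecting everything it yields."""
    yielded = []
    while True:
        try:
            result = coro.send(None)
        except StopIteration as stop:
            return yielded, stop.value  # the return value travels in StopIteration
        yielded.append(result)
```

drive(job()) returns the two None values yielded by the bare yields, followed by the coroutine's return value. A real Task does the same thing, except that each send() is a separate callback queued with call_soon.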

P.S. The _asyncio_future_blocking flag is set to True inside Future.__await__, which is what an await on a Future (or a yield from, as in the _wrap_awaitable function above) delegates to.

# asyncio.futures.Future
    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result() # May raise too.
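
The flag can be watched directly by stepping a Future's __await__ iterator by hand. This is only a sketch: _asyncio_future_blocking is a private attribute, so reading it is an assumption about the current implementation rather than a supported API, and inspect_future_await is a made-up helper name.

```python
import asyncio


def inspect_future_await():
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        before = fut._asyncio_future_blocking   # private attribute, read for illustration only
        it = fut.__await__()
        yielded = next(it)                      # a pending future yields itself...
        during = fut._asyncio_future_blocking   # ...after raising the flag
        fut.set_result(7)
        try:
            next(it)                            # resuming after completion ends the iteration
        except StopIteration as stop:
            value = stop.value
        return before, yielded is fut, during, value
    finally:
        loop.close()
```

The returned tuple shows the flag going from False to True once the future has yielded itself, which is the signal Task.__step checks in its `elif blocking:` branch.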

Next, let's see how the loop repeatedly runs the tasks it holds:

# asyncio.base_events.BaseEventLoop
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered. Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        if self._debug:
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)  # delegate to _call_soon
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle   # return the handle

    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)    # create the handle
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)                     # append the handle to self._ready
        return handle

    def run_forever(self):
        """Run until stop() is called."""
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        self._thread_id = threading.get_ident()    # id of the thread running the loop

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)   # install hooks so the loop can track and finalize async generators
        try:
            events._set_running_loop(self)    # register this loop as the running loop
            while True:            # start looping
                self._run_once()        # run one iteration
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            sys.set_asyncgen_hooks(*old_agen_hooks)   # restore the previous hooks

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        sched_count = len(self._scheduled)  # number of timer handles
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and        # too many cancelled timer handles: rebuild the heap without them
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        if self._debug and timeout != 0:
            t0 = self.time()
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)     # poll the selector (selectors.DefaultSelector, typically EpollSelector on Linux) for fds with pending events
        self._process_events(event_list)           # for each read or write event, append the event's handle to self._ready

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:        # the earliest timer is not due yet: stop
                break
            handle = heapq.heappop(self._scheduled)   # pop the due timer handle
            handle._scheduled = False     # it is no longer in the timer heap
            self._ready.append(handle)    # move the due handle to self._ready

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):                   # process the handles in _ready one by one
            handle = self._ready.popleft()
            if handle._cancelled:            # skip handles that were cancelled
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()      # run the callback held by the handle, e.g. a task's __step
        handle = None # Needed to break cycles when an exception occurs.
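
Two properties of _run_once() are worth seeing in action: the ready queue is FIFO, and a callback scheduled by another callback only runs in the following iteration, because ntodo is computed before the for loop starts. The sketch below is illustrative (ready_queue_order and first are made-up names) and uses a short call_later timer to stop the loop, which also exercises the timer heap.

```python
import asyncio


def ready_queue_order():
    loop = asyncio.new_event_loop()
    order = []

    def first():
        order.append("first")
        # Queued during this iteration, so it runs in the next one.
        loop.call_soon(order.append, "scheduled by first")

    loop.call_soon(first)
    loop.call_soon(order.append, "second")
    loop.call_later(0.01, loop.stop)   # a timer handle, moved to _ready once due
    try:
        loop.run_forever()
    finally:
        loop.close()
    return order
```

The returned list has "second" ahead of "scheduled by first", even though the latter was queued while "first" was running.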


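Having read the source, let's work out step by step how the example switches between coroutines and runs them:
An await on an awaitable suspends the current Task until that awaitable finishes. A new Task is created only by create_task() / ensure_future(); what reaches Task.__step is whatever the innermost awaitable yields. asyncio.sleep, for example, is a function defined with async def. As the source shows, a coroutine built from async def is driven through the same send() protocol as a generator that delegates with yield from, so

await asyncio.sleep(n)

is equivalent to

yield from asyncio.sleep(n).__await__() # .__await__() may be omitted inside an @asyncio.coroutine function

However, yield from cannot be written inside an async def function, so we switch to generator-based coroutines decorated with @asyncio.coroutine (deprecated since Python 3.8 and removed in 3.11, so this needs an older interpreter). The example above can then be written as: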

In [2]: @asyncio.coroutine 
   ...: def f(i):
   ...:      yield from asyncio.sleep(i).__await__()
   ...:      print(i)

In [3]: @asyncio.coroutine
   ...: def func():
   ...:      tasks = []
   ...:      for i in range(10):
   ...:          yield from asyncio.sleep(0)
   ...:          print('creating coroutine, arg', i)
   ...:          tasks.append(asyncio.create_task(f(i)))
   ...:      for t in tasks:
   ...:         yield from t
In [4]: asyncio.run(func())
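
Since @asyncio.coroutine is gone from recent Python versions, the same equivalence can still be tried with types.coroutine, which marks a generator function as awaitable. This is a sketch rather than the author's code: sleep_with_yield_from, sleep_with_await and compare are illustrative names.

```python
import asyncio
import types


@types.coroutine
def sleep_with_yield_from(delay, result):
    # Generator-based version: delegate to the coroutine's __await__ iterator.
    return (yield from asyncio.sleep(delay, result).__await__())


async def sleep_with_await(delay, result):
    # Native version: the await expression does the same delegation.
    return await asyncio.sleep(delay, result)


async def compare():
    a = await sleep_with_await(0, "await")
    b = await sleep_with_yield_from(0, "yield from")
    return a, b
```

asyncio.run(compare()) returns both results, showing that the two spellings behave the same from the Task's point of view.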

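A coroutine object and a general awaitable are handled differently: in ensure_future a coroutine object is turned into a Task directly, whereas any other awaitable (an object with __await__ that is neither a coroutine nor a Future) is first wrapped by _wrap_awaitable and then turned into a Task.
Rewriting await as yield from makes the flow easier to analyse. Now let's walk through the example step by step: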

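  • The loop is created and asyncio.run() wraps func() in a Task, call it m, whose __step is queued: ready queue = [m]. The first iteration runs m.__step --> send(None), which runs func() up to its first await and returns the first result.
  • That first result comes from asyncio.sleep(0). With a delay of 0, sleep performs a bare yield, so the result is None and __step simply re-queues itself with call_soon; call this re-queued step sm. (With a positive delay, sleep instead yields a Future that a call_later timer completes.)
  • Next iteration, ready queue = [sm]: sm resumes func() where it was suspended, prints the "creating coroutine" line for 0, then creates the Task for f(0), whose __step is queued immediately; call it f1. The for loop goes round again, reaches sleep(0) and m is re-queued --> sm.
  • Next iteration, ready queue = [f1, sm]: f1 runs, reaches its sleep and suspends --> sf1. Then sm resumes, prints the "creating coroutine" line for 1, creates the next Task --> f2, and the next sleep(0) re-queues it --> sm.
  • Next iteration, ready queue = [sf1, f2, sm]: sf1 resumes f(0) where it was suspended, which prints 0 and finishes. f2 runs, reaches sleep(1) and suspends on a timer Future --> sf2. Then sm resumes, prints the "creating coroutine" line for 2, creates the next Task --> f3, and the next sleep(0) re-queues it --> sm.
  • Next iteration, ready queue = [f3, sm], while sf2 waits in the timer heap (self._scheduled) until its delay has elapsed: ...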
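
The walkthrough can be checked by recording events instead of printing them. The sketch below is a variant of the original example, not the original itself: it uses shorter delays (i * 0.05 seconds, an arbitrary choice to keep it fast), only four tasks by default, and a hypothetical events list and trace helper. Only the first few events are deterministic enough to state with confidence, so the usage note limits itself to those.

```python
import asyncio


async def f(i, events):
    await asyncio.sleep(i * 0.05)   # f(0) sleeps for 0, i.e. a bare yield
    events.append(f"ran {i}")


async def func(events, n=4):
    tasks = []
    for i in range(n):
        await asyncio.sleep(0)
        events.append(f"create {i}")
        tasks.append(asyncio.create_task(f(i, events)))
    for t in tasks:
        await t


def trace(n=4):
    events = []
    asyncio.run(func(events, n))
    return events
```

The first four entries of trace() are "create 0", "create 1", "ran 0", "create 2": f(0) gets its turn between the second and third creation, exactly where the walkthrough places the print of 0.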