[toc]
```python
In [1]: import asyncio

In [2]: async def f(i):
   ...:     await asyncio.sleep(i)
   ...:     print(i)
   ...:

In [3]: async def func():
   ...:     tasks = []
   ...:     for i in range(10):
   ...:         await asyncio.sleep(0)
   ...:         print('creating coroutine with argument', i)
   ...:         tasks.append(asyncio.create_task(f(i)))
   ...:     _ = [await t for t in tasks]
   ...:

In [4]: asyncio.run(func())
creating coroutine with argument 0
creating coroutine with argument 1
0
creating coroutine with argument 2
creating coroutine with argument 3
creating coroutine with argument 4
creating coroutine with argument 5
creating coroutine with argument 6
creating coroutine with argument 7
creating coroutine with argument 8
creating coroutine with argument 9
1
2
3
4
5
6
7
8
9
```
Let's start reading the source from asyncio.run():
```python
# asyncio/runners.py
def run(main, *, debug=False):
    """Run a coroutine.
    ...
    """
    if events._get_running_loop() is not None:
        # if a loop is already running in this thread, raise
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")
    if not coroutines.iscoroutine(main):
        # main must be a coroutine object, otherwise raise
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()           # create an event loop instance
    try:
        events.set_event_loop(loop)           # set it as the current event loop
        loop.set_debug(debug)                 # turn debug mode on or off
        return loop.run_until_complete(main)  # run main inside the loop until it finishes
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()
```
The process above boils down to creating a loop and then running the function on that loop. Next, let's look at how events.new_event_loop() creates the event loop instance:
```python
# asyncio/events.py
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()   # ask the policy for a new loop instance


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:
        _init_event_loop_policy()   # initialize the policy if it has not been set yet
    return _event_loop_policy       # _event_loop_policy is a global event loop policy, initially None


def _init_event_loop_policy():
    # initialize the global _event_loop_policy
    global _event_loop_policy
    with _lock:                               # take the lock
        if _event_loop_policy is None:        # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()   # instantiate the default policy
```
So the loop instance is created by the new_event_loop() method of a DefaultEventLoopPolicy() instance. DefaultEventLoopPolicy is a different class on Linux and on Windows. My machine runs Linux, so let's look at the Linux one, where DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy:
```python
# asyncio/unix_events.py
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop   # the loop factory
    ...


# asyncio/events.py
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):

    _loop_factory = None

    class _Local(threading.local):
        # thread-local storage: only the current thread sees these values
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        # return the current thread's loop, creating one if necessary
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        # set the current thread's loop and record that it has been set
        """Set the event loop."""
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        # create a new loop
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()   # the loop is built by _loop_factory, i.e. _UnixSelectorEventLoop
```
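As a quick sanity check, you can ask the policy for a loop yourself and inspect its type. This is only a minimal sketch: the concrete class printed depends on your platform and Python version (on Linux it is typically a SelectorEventLoop subclass, on Windows it may be a proactor loop).

```python
import asyncio

policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()                      # same call chain as asyncio.new_event_loop()
print(type(loop))                                   # e.g. <class 'asyncio.unix_events._UnixSelectorEventLoop'>
print(isinstance(loop, asyncio.SelectorEventLoop))  # usually True on Linux
loop.close()
```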
At this point, loop is an instance of _UnixSelectorEventLoop. With the loop in hand, the next step is run_until_complete. That method is not defined on _UnixSelectorEventLoop itself, so we follow the class hierarchy: unix_events._UnixSelectorEventLoop ---> selector_events.BaseSelectorEventLoop ---> base_events.BaseEventLoop ---> events.AbstractEventLoop. The method is found in BaseEventLoop:
```python
# asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    ...

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()   # raise if the loop has already been closed

        new_task = not futures.isfuture(future)   # is the argument already a Future/Task?
        future = tasks.ensure_future(future, loop=self)
        # if `future` is a coroutine, loop.create_task() wraps it in a Task,
        # schedules that Task on this loop, and returns it
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        # done callback: when the task finishes, it stops the loop
        try:
            self.run_forever()   # start processing events in the loop
        except:
            if new_task and future.done() and not future.cancelled():
                # the task finished and was not cancelled
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()   # retrieve the exception raised inside the task
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)   # remove the done callback
        if not future.done():
            # the task did not finish: report an error
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()   # return the task's result
```
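To see run_until_complete from the outside, here is a minimal sketch that drives a loop by hand instead of going through asyncio.run() (the coroutine add is my own example, not part of asyncio):

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0)   # yield control to the loop once
    return a + b

loop = asyncio.new_event_loop()
try:
    asyncio.set_event_loop(loop)
    result = loop.run_until_complete(add(1, 2))   # the coroutine is wrapped in a Task via ensure_future
    print(result)                                 # 3
finally:
    asyncio.set_event_loop(None)
    loop.close()
```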
Next, let's see how the task is created in tasks.ensure_future(future, loop=self):
```python
# asyncio/tasks.py
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        # coro_or_future is a coroutine object
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)   # delegate to loop.create_task()
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        # already a Future/Task: return it unchanged
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        # any other awaitable: wrap it with _wrap_awaitable and call ourselves again
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')


@coroutine
def _wrap_awaitable(awaitable):
    # wrapping turns the awaitable into a (generator-based) coroutine
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())
```
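The three branches can be exercised from user code. A minimal sketch follows; PlainAwaitable is my own illustrative class (it has __await__ but is not a coroutine object), not something from asyncio:

```python
import asyncio

class PlainAwaitable:
    # hypothetical example: awaitable, but not a coroutine object
    def __await__(self):
        yield                      # bare yield: give control back to the loop once
        return 'from __await__'

async def coro():
    return 'from coroutine'

async def main():
    loop = asyncio.get_running_loop()

    t1 = asyncio.ensure_future(coro())             # coroutine object  -> loop.create_task()
    fut = loop.create_future()
    fut.set_result('already a future')
    t2 = asyncio.ensure_future(fut)                # Future            -> returned as-is
    t3 = asyncio.ensure_future(PlainAwaitable())   # other awaitable   -> _wrap_awaitable + Task

    print(t2 is fut)                               # True
    print(await t1, await t2, await t3)

asyncio.run(main())
```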
Next, loop.create_task(coro_or_future): how the task is actually built. The method is defined in BaseEventLoop:
```python
# asyncio/base_events.py
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    self._check_closed()
    if self._task_factory is None:
        # no custom task factory configured (one can be set with set_task_factory)
        task = tasks.Task(coro, loop=self)   # build the task with tasks.Task
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        # a task factory is configured: let the factory build the task
        task = self._task_factory(self, coro)

    return task
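loop.set_task_factory() is the hook that fills _task_factory. Below is a minimal sketch of a factory that just logs the coroutine before delegating to the default Task class; the logging and the **kwargs pass-through are my own additions (the exact factory signature the loop calls has grown extra keyword arguments in newer Python versions), so treat this as illustrative rather than canonical:

```python
import asyncio

def logging_task_factory(loop, coro, **kwargs):
    # A task factory must return a Future-compatible object.
    # We simply log and fall back to the default Task class.
    print('factory building a task for', getattr(coro, '__name__', coro))
    return asyncio.Task(coro, loop=loop, **kwargs)

async def work():
    await asyncio.sleep(0)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(logging_task_factory)
    task = asyncio.create_task(work())   # now routed through our factory
    print(await task)                    # 42

asyncio.run(main())
```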
Then let's look at how the Task class is initialized:
```python
# asyncio/tasks.py
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.
                                # i.e. a Task is a Future subclass
    """A coroutine wrapped in a Future."""

    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        # return the currently running task in an event loop, or None
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro                            # coro is the wrapped coroutine
        self._context = contextvars.copy_context()   # contextvars stores per-task context; copy the current one

        self._loop.call_soon(self.__step, context=self._context)
        # schedule self.__step to run on the next loop iteration, with self._context as its context
        _register_task(self)   # record this task in the global registry

    def set_result(self, result):
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.
        ...
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.
        ...
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.
        ...
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        if self.done():
            # calling __step on an already finished task is an error
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro              # the body of the coroutine
        self._fut_waiter = None

        _enter_task(self._loop, self)  # mark this task as the currently running one
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # no pending exception
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)   # drive the coroutine with send() and collect the yielded result
            else:
                result = coro.throw(exc)   # throw the exception into the coroutine
                                           # (it may handle it, or it propagates / ends in StopIteration)
        except StopIteration as exc:
            # the coroutine finished; its return value, if any, is in exc.value
            if self._must_cancel:
                # the task was cancelled right before the coroutine stopped
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
                # record CancelledError on the underlying Future
            else:
                super().set_result(exc.value)
                # record the result on the underlying Future and mark it done
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            # the remaining branches record the exception on the Future
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise   # re-raise BaseException
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            # does result carry an '_asyncio_future_blocking' attribute?
            # if so, result is a Future (cf. asyncio.base_futures.isfuture)
            if blocking is not None:
                # blocking is either True or False here
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    # the Future belongs to a different loop: feed the error back into the coroutine
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    # True: result is a Future that is being awaited
                    if result is self:
                        # a task cannot await itself: feed the error back into the coroutine
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False   # reset the flag
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        # when the awaited Future finishes, __wakeup resumes this task
                        self._fut_waiter = result
                        # while _fut_waiter is not None, this task is suspended, waiting on it
                        if self._must_cancel:
                            # the task is being cancelled, so cancel the awaited Future too
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # blocking is False: the Future was yielded directly instead of
                    # being delegated to with yield from / await
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                    # feed the error back into the coroutine
            elif result is None:
                # send() produced nothing: schedule __step again on the next loop iteration
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # yielding a bare generator is wrong: it should be delegated with yield from
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # yielding anything else is an error
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)   # this task is no longer the currently running one
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # done callback installed on the awaited Future
        try:
            future.result()   # fetch the awaited Future's result
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)  # on error, feed the exception into the coroutine
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
            # the awaited Future is done, so resume this task, i.e. call send() on the coroutine again
        self = None  # Needed to break cycles when an exception occurs.
```
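The cancel() path above can be observed from user code. A minimal sketch: cancelling a task that is suspended on asyncio.sleep() cancels its _fut_waiter, and the CancelledError is thrown into the coroutine on the next __step:

```python
import asyncio

async def sleeper():
    try:
        await asyncio.sleep(10)            # the task suspends on the sleep future (_fut_waiter)
    except asyncio.CancelledError:
        print('cancelled while sleeping')
        raise                              # re-raise so the task ends up cancelled

async def main():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)                 # let the task start and reach the sleep
    print(task.cancel())                   # True: cancellation was requested
    try:
        await task
    except asyncio.CancelledError:
        print('task cancelled:', task.cancelled())   # True

asyncio.run(main())
```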
P.S.: the _asyncio_future_blocking flag is set to True inside the future's __await__, and __await__ is exactly what the _wrap_awaitable function above delegates to with yield from.
```python
# asyncio/futures.py  class Future
def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.
```
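To see this flag-driven handshake from user code, here is a minimal sketch: awaiting a plain Future suspends the awaiting task until someone sets the future's result, which is precisely the result.add_done_callback(self.__wakeup) branch in __step:

```python
import asyncio

async def waiter(fut):
    print('waiting...')
    value = await fut              # Future.__await__ sets _asyncio_future_blocking and yields the future;
                                   # the driving Task then installs __wakeup as a done callback on it
    print('got', value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_later(0.1, fut.set_result, 'hello')   # complete the future a bit later
    await waiter(fut)

asyncio.run(main())
```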
Next, let's look at how the loop runs the tasks it holds, iteration after iteration:
```python
# asyncio/base_events.py  class BaseEventLoop
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)   # delegate to _call_soon
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle   # return the handle

def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)   # wrap the callback in a Handle
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)   # append the handle to self._ready
    return handle

...

def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()   # remember the id of the thread running the loop

    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    # these two lines install hooks used to track and finalize asynchronous generators
    try:
        events._set_running_loop(self)   # mark this loop as the running loop
        while True:                      # start looping
            self._run_once()             # run one full iteration
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)

...

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """

    sched_count = len(self._scheduled)   # number of scheduled (timer) handles
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # too many cancelled timer handles: rebuild the heap without them
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = max(0, when - self.time())

    if self._debug and timeout != 0:
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        event_list = self._selector.select(timeout)
        # the selector (epoll on Linux, via selectors.DefaultSelector) picks the fds that have events
    self._process_events(event_list)
    # for every fd with a read or write event, the event's handle is appended to self._ready

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            # the earliest timer is still in the future: stop collecting
            break
        handle = heapq.heappop(self._scheduled)   # pop the due timer handle
        handle._scheduled = False                 # it is no longer in the scheduled heap
        self._ready.append(handle)                # move the due handle into self._ready

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        # process the handles in _ready one by one
        handle = self._ready.popleft()
        if handle._cancelled:
            # skip handles that were cancelled
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()   # run the callback this handle wraps, e.g. a task's __step
    handle = None  # Needed to break cycles when an exception occurs.
```
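The _ready FIFO and the _scheduled heap can be observed from user code. A minimal sketch: call_soon callbacks run in registration order on the next iteration, while a call_later callback is moved into _ready only once its deadline has passed:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(print, 'ready 1')           # appended to loop._ready (FIFO)
    loop.call_soon(print, 'ready 2')
    loop.call_later(0.05, print, 'scheduled')  # pushed onto loop._scheduled (a heap keyed by deadline)
    await asyncio.sleep(0.1)                   # give the loop time to run everything

asyncio.run(main())
# expected output: ready 1, ready 2, scheduled
```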
Having read through the source, let's work out step by step how the example code switches between and runs its coroutines. await + an awaitable essentially builds a Task and adds it to the loop. asyncio.sleep, for instance, is a function defined with async def, and we have just seen in the source how the Task gets created: a coroutine defined with the async def compound statement is, once wrapped in a Task, driven through yield from delegation. So

await asyncio.sleep(n)

can be written as

yield from asyncio.sleep(n).__await__()  # .__await__() can be omitted

However, yield from is not allowed inside an async def body, so we switch to generator-based coroutine functions instead (rather than native coroutine objects), and the example at the top can be rewritten as:
```python
In [2]: @asyncio.coroutine
   ...: def f(i):
   ...:     yield from asyncio.sleep(i).__await__()
   ...:     print(i)
   ...:

In [3]: @asyncio.coroutine
   ...: def func():
   ...:     tasks = []
   ...:     for i in range(10):
   ...:         yield from asyncio.sleep(0)
   ...:         print('creating coroutine with argument', i)
   ...:         tasks.append(asyncio.create_task(f(i)))
   ...:     for t in tasks:
   ...:         yield from t
   ...:

In [4]: asyncio.run(func())
```
Note that a coroutine object and a generic awaitable are handled differently: in ensure_future, anything that coroutines.iscoroutine() accepts (native coroutine objects, and generator-based coroutines such as the @asyncio.coroutine functions above) is wrapped in a Task directly, whereas any other awaitable is first wrapped by _wrap_awaitable and only then turned into a Task.
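A quick way to see the distinction, as a minimal sketch using the standard inspection helpers:

```python
import asyncio
import inspect

async def f():
    return 1

print(asyncio.iscoroutinefunction(f))   # True  - f is a coroutine *function*
coro = f()                              # calling it gives a coroutine *object*
print(asyncio.iscoroutine(coro))        # True  - this is what ensure_future wraps in a Task directly
print(inspect.isawaitable(coro))        # True  - coroutine objects are awaitable too
coro.close()                            # avoid the "coroutine was never awaited" warning
```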
With await rewritten as yield from, the flow is easier to analyze. Let's walk through the example step by step:
- loop=[]: create the Task for func(), call it m, and run its __step --> send(None), which returns the first result;
- loop=[m]: the first result is the delayed future created by asyncio.sleep(0), call it sm; its __step is put on the loop to be executed on the next iteration, sleeping for 0 seconds;
- loop=[sm]: sm completes; func resumes from where it was suspended, prints "creating coroutine with argument 0", then creates task(f(0)) (this task's __step is added to the loop right away), call it f1. The next for iteration then returns the second result, another sleep future --> sm.
- loop=[f1, sm]: f1 runs and hits its sleep --> sf1. Switch to sm: func resumes from where it was suspended, prints "creating coroutine with argument 1", creates coroutine --> f2, and the next for iteration sleeps --> sm.
- loop=[sf1, f2, sm]: sf1 resumes f(0) from where it was suspended and prints 0. Switch to f2, which suspends on its sleep --> sf2. Switch to sm: func resumes from where it was suspended, prints "creating coroutine with argument 2", creates coroutine --> f3, and the next for iteration sleeps --> sm.
- loop=[sf2, f3, sm]: ...