Asyncio 源碼分析


python 中的協程

從我我的的理解來講一說 python中的協程,咱們知道 linux 中的線程比進程輕量級,切換成本低,協程比線程更輕量級。因此切換成本耕地,是基於生成器來實現的也就是 yield 語句,後來又有 yeild from 子協程的語法出現,生成器是迭代器,迭代器不是生成器,生成器可以輸出值,也能夠接收值,能夠 hang/resume。固然在 python3.5 使用了新的語法 async/await, 本質沒啥變化,僅僅是防止在語法上的混淆。能夠進行隱式切換或者顯式切換,在一個線程中實現多協程切換,asyncio 就是顯式的來切換協程。python


Asyncio 異步框架

asyncio 框架是創建在 epoll、poll、seledct等功能基之上的,下文統一用 epoll 代替,固然使用那種事件機制取決於操做系統,在使用asyncio時,大部分操做是用asyncio運行任務,運行任務時 asyncio 並無使用epoll 機制,由於咱們知道 epoll 是須要註冊文件描述符的,是在使用協程,至於協程和 epoll 怎麼結合運行的,下文會細說。epoll 是用來實現異步 web 框架用的。協程使用來運行用戶的 task。linux


Asyncio 的運行流程

簡單寫一個異步任務,這個任務簡單點,由於本篇文章主要講的是 asyncio 的運行機制而不是 asyncio 的使用git

import asyncio
async def print_hello_after3second():
    await asyncio.sleep(3)
    print("hello")
asyncio.run(print_hello_after3second)

這裏使用的 run 這個接口,使用 asyncio 運行異步任務有不少種方式,run 我以爲更像是一個命令行,從外面看接口簡單,其實內部幫忙作了不少事情.爲了節省篇幅以及使得文章看起來清晰每一個代碼片斷只截取重要部分,其他的省略。github

asyncio/runners.pygolang

#run 第一個參數要是個協程
def run(main, *, debug=False):
    # loop 理解成 epoll 就好
    events.set_event_loop(loop)
    #重點在這裏
    loop.run_until_complete(loop.shutdown_asyncgens())
    ....

asyncio/base_events.pyweb

def run_until_complete(self, future):
        ....
        # asyncio 會把咱們傳進來的任務封裝成 task,也能夠說是 future,task 是 future 的子類
        future = tasks.ensure_future(future, loop=self
        # 裏面有 _run_once 是用來調度事件循環的
        self.run_forever()
        ....

asyncio/task.py編程

# ensure_future 也是一個傳遞任務的接口
def ensure_future(coro_or_future, *, loop=None):
        ....
        # 在調用 Task 類中的__init__方法進行初始化,同時將 Task 類中的 _step方法做爲回掉函數註冊到了事件循環中
        task = loop.create_task(coro_or_future)
        ....

asyncio/base_events.pyapi

#這個方法很重要因此在這裏所有列出,裏面包含了asyncio的調用思想,調度 task 和 epoll
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        print("run once")
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []

            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        if self._debug and timeout != 0:
            t0 = self.time()
            # 這裏是在檢查 epoll 事件
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)
        #這裏將從epoll中獲取到可讀可寫的事件後,添加回掉函數到self._ready這個列表中,這個列表同時也包含了用戶添加的異步任務,那麼在何時添加進來的呢?
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # 定時任務,能夠使得異步任務在將來的某個事件點運行,用堆實現了優先級隊列,按照時間排序
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    #咱們的異步任務對應的回掉函數被封裝成了 handler 實例了,這個實例是協程安全的,
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                print("hanle %s", handle)
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

總結一下,asyncio 將異步任務和epoll獲取來的可讀可寫的回掉事件都放到了 self._ready 這個列表中統一運行。那麼異步任務何時被放到 self._ready 這個列表中來的呢
asyncio/base_events.py安全

#誰在調用這個函數呢,前文說過咱們的異步任務都會被封裝成 asyncio 中的 Task 類的 task 類的 __init__  中的這個方法調用了 call_soon , 那麼 call_at 這種將來執行的任務呢?固然最終也會調用 call_soon 的,在運行時間到的時候。
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle
 # 沒錯就是在這裏進行添加和封裝成 handler 的
 def _call_soon(self, callback, args, context):
        print("register")
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

以上的一些說明只是講解了 asyncio 如何運行用戶側 task 以及異步事件的,其實用戶側異步task,被隱藏在了epoll的概念中,這也是 asyncio 很高明之處。
到此僅僅說明了 asyncio 是如何調度 task 和 epoll 事件的回掉的執行。可是異步task的回掉在這裏很重要,也就是上文提到的 _step 這個方法,這個方法在 Task 類中。這也關係到了 aio-libs 中這些python的異步庫如何改造,下文會說如何本身實現python異步庫的改造和編寫。
asyncio/tasks.pyapp

#方法很重要,不進行刪減
    def __step(self, exc=None):
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        # 這個coro 是咱們添加進來的異步任務
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                #觸發異步任務運行
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # 協程運行結束會拋出這個異常
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                # future(不打算翻譯成中文啦,也不解釋在編程語言或者python中什麼意思),拋出 StopIteration 異常表明異步任務運行結束,設置結果給 future,很重要,不設置結果異步任務就停不下來了(哈哈!)
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            #result 是一個future實例
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        #註冊wakeup到future的callback中,這個wakeup是用來提取future中的結果用的
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
          #小面除了 result 爲none的分以外都是出現異常
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    print("call soon")
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

_run_once 方法是調度,調度 task 和 epoll 事件,_step 是在處理 await something() 這種語句,_step方法不是很好理解,用一段話總結一下 _step 作的事情。固然描述起來也是很抽象,
用戶寫的協程函數,會被 asyncio 封裝成 task,協程函數做爲 Task 類中 _step方法中的一個屬性,_step又會被封裝成 handler 做爲異步事件被調用,每個協程函數都有一個 future 和 wakeup 與之綁定,函數運行結果會設置到 future 中,wakeup 做爲 future 的回掉被調用(真正調用的仍是事件循環),當設置好結果後 wakeup 喚醒協程函數來提取結果,

async def a(): # 協程函數 a 有個future 和 wakeup 與之綁定
    await b() 

async def b(): # 協程函數 b 有個 future 和 wakeup 與之綁定
    await c()
    
async def c(): # 協程函數 c 有個 future 和 wakeup 與之綁定
    await asyncio.sleep(2)

asyncio 不管是使用仍是理解原理都是很難的,不像 golang 這種原生支持協程,python的協程經歷了很慢長的步伐,不去理解背後的原理在使用過程會出現不少問題。asyncio 的生態也還不完善,有時須要本身去實現異步改造。因此理解 asyncio 背後的原理很重要,只有知道原理後才知道如何本身去改造或者寫出與 asyncio 配套的工具。

參考

深刻理解asyncio(二)

相關文章
相關標籤/搜索