asyncio系列之sleep()實現

先來看個例子,本身實現的模擬耗時操做express

例1api

import types
import select
import time
import socket
import functools


class Future:

    def __init__(self, *, loop=None):
        self._result = None
        self._callbacks = []
        self._loop = loop

    def set_result(self, result):
        self._result = result
        callbacks = self._callbacks[:]
        self._callbacks = []
        for callback in callbacks:
            loop._ready.append(callback)

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def __iter__(self):
        print('enter Future ...')
        print('foo 掛起在yield處 ')
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        return 'future'

    __await__ = __iter__


class Task:

    def __init__(self, cor, *, loop=None):
        self.cor = cor
        self._loop = loop

    def _step(self):
        cor = self.cor
        try:
            result = cor.send(None)
        # 1. cor 協程執行完畢時,會拋出StopIteration,說明cor執行完畢了,這是關閉loop
        except StopIteration as e:
            self._loop.close()
        # 2. 有異常時
        except Exception as e:
            """處理異常邏輯"""
        # 3. result爲Future對象時
        else:
            if isinstance(result, Future):
                result.add_callback(self._wakeup)

    def _wakeup(self):
        self._step()


class Loop:

    def __init__(self):
        self._stop = False
        self._ready = []
        self._scheduled = []
        self._time = lambda: time.time()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        self._select = functools.partial(select.select, [sock], [], [])

    def create_task(self, cor):
        task = Task(cor, loop=self)
        self._ready.append(task._step)
        return task

    def call_later(self, delay, callback, *args):
        callback._when = delay
        self._scheduled.append((callback, *args))

    def run_until_complete(self, task):
        assert isinstance(task, Task)
        timeout = None

        while not self._stop:

            if self._ready:
                timeout = 0

            if self._scheduled:
                callback, *args = self._scheduled.pop()
                timeout = callback._when
                self._ready.append(functools.partial(callback, *args))
                
                # 經過select(timeout)來控制阻塞時間
                self._select(timeout)

            n = len(self._ready)
            for i in range(n):
                step = self._ready.pop()
                step()

    def close(self):
        self._stop = True


@types.coroutine
def _sleep():
    yield

# 本身實現一個sleep協程
async def sleep(s, result=None):
    if s <= 0:
        await _sleep()
        return result
    else:
        future = Future(loop=loop)
        future._loop.call_later(s, callback, future)
        await future
        return result

# 延遲迴調函數
def callback(future):
    # 時間到了就回調此函數
    future.set_result(None)


async def foo():
    print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    await sleep(3)
    print(f'exit foo  at {time.strftime("%Y-%m-%d %H:%M:%S")}')


if __name__ == '__main__':
    f = foo()
    loop = Loop()
    task = loop.create_task(f)
    loop.run_until_complete(task)

執行結果:app

enter foo at 2019-07-08 21:09:43
enter Future ...
foo 掛起在yield處 
foo 恢復執行
exit Future ...
exit foo  at 2019-07-08 21:09:46

在上一篇文章經過Loop, Task, Future三個類基本上實現了對協程的調度,在此基礎上作了一些修改實現了對協程中耗時操做的模擬。less

首先咱們分析一下async def foo協程中的await sleep(3),這裏其實會進入到sleep中 await future這裏,再進入到future對象的__await__方法中的yield self,foo協程此時被掛起,上一篇文章中咱們分析知道,最終foo仍是被這個future對象給分紅了part1和part2兩部分邏輯。socket

- foo print('enter foo at ...')
    - sleep
        - future print('enter Future ...')   # 以上是第一次f.send(None)執行的邏輯,命名爲part1
        - future yield  self  ---------------------------------------------------------------
        - print('exit Future ...')        #如下是第二次f.send(None)執行的邏輯,命名爲part2
    - sleep
- foo print('exit foo at ...')

part1 在 loop 循環的開始就執行了,返回一個 future 對象,把 part2 註冊到 future 中,而後掛起了,下半部分 part2 在何時執行呢?由於在 sleep 中咱們經過註冊了一個3秒以後執行的回調函數 callback 到 loop 對象中,loop 對象在執行完 part1 後,會在下一輪的循環中執行 callback 回調函數,因爲 loop._scheduled 不爲空,timeout 被賦值成3,所以 select(3) 阻塞3秒後就繼續往下執行。也就是說 callback 函數的執行時機就是在 select(3) 阻塞3秒後執行,callback 回調函數中又會調用 future.set_result() ,在 set_result 中會把 part2 註冊到 loop 中,因此最終又在 loop 的下一輪循環中調用 part2 的邏輯,回到上次 foo 掛起的地方,繼續 foo 的流程,直到協程退出。async

其實所謂的模擬耗時3秒,其實就是在執行完part1後經過 select 函數阻塞3秒,而後再次執行 part2 ,這樣就實現了所謂的等待3秒的操做。函數

要實現這個sleep協程的耗時模擬,主要是有2個關鍵點:oop

  • 1.經過 select(timeout) 的 timeout來控制 select 函數的阻塞時間。ui

    timeout=None    一直阻塞,直到有真實的IO事件到來,如socket的可讀可寫事件
      timeout=0       不管此時是否有IO事件到來,都立馬返回
      timeout=n       阻塞n秒,在這n秒內,只要有IO事件到來,就立馬返回,不然阻塞n秒才返回
  • 2.當延遲時間到來時,經過 callback 函數中調用 future.set_result() 方法,來驅動 part2 的執行。debug

瞭解到這裏以後,咱們再來看一下 asyncio 的源碼

Loop類

class BaseEventLoop(events.AbstractEventLoop):

    ...

    def __init__(self):
        ...
        # 用來保存包裹task.step方法的handle對象的對端隊列
        self._ready = collections.deque()
        # 用來保存包裹延遲迴調函數的handle對象的二叉堆,是一個最小二叉堆
        self._scheduled = []
        ...

    def create_task(self, coro):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        # self._task_factory 默認是None
        if self._task_factory is None:
            # 建立一個task對象
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        # 返回這個task對象
        return task

    def call_soon(self, callback, *args):

        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        # 關鍵代碼callback就是task._step方法,args是task._step的參數
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        # 1 handle是一個包裹了task._step方法和args參數的對象
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # 2 關鍵代碼,把handle添加到列表self._ready中
        self._ready.append(handle)
        return handle

    def run_until_complete(self, future):
        ...
        
        # future就是task對象,下面2句是爲了確保future是一個Future類實例對象
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
            
        # 添加回調方法_run_until_complete_cb到當前的task對象的callbacks列表中,_run_until_complete_cb就是最後
        # 把loop的_stop屬性設置爲ture的,用來結束loop循環的
        future.add_done_callback(_run_until_complete_cb)
        try:
            # 開啓無線循環
            self.run_forever()
        except:
            ...
            raise
        finally:
            ...
        # 執行完畢返回cor的返回值
        return future.result()

    def run_forever(self):

        ...

        try:
            events._set_running_loop(self)
            while True:
                # 每次運行一次循環,判斷下_stopping是否爲true,也就是是否結束循環
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

    def _run_once(self):

        # loop的_scheduled是一個最小二叉堆,用來存放延遲執行的回調函數,根據延遲的大小,把這些回調函數構成一個最小堆,而後再每次從對頂彈出延遲最小的回調函數放入_ready雙端隊列中,
        # loop的_ready是雙端隊列,全部註冊到loop的回調函數,最終是要放入到這個隊列中,依次取出而後執行的
        # 1. self._scheduled是否爲空
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
        
        # 2. 給timeout賦值,self._scheduled爲空,timeout就爲None
        timeout = None
        # 只要self._ready和self._scheduled中有一個不爲空,timeout就爲0
        if self._ready or self._stopping:
            timeout = 0
        # 只要self._scheduled不爲空
        elif self._scheduled:
            # Compute the desired timeout.
            # 用堆頂的回調函數的延遲時間做爲timeout的等待時間,也就是說用等待時間最短的回調函數的時間做爲timeout的等待時間
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())
        、
        
        if self._debug and timeout != 0:
            ...
        # 3. 關注else分支,這是關鍵代碼
        else:
            # timeout=None --> 一直阻塞,只要有io事件產生,立馬返回event_list事件列表,不然一直阻塞着
            # timeout=0 --> 不阻塞,有io事件產生,就立馬返回event_list事件列表,沒有也返空列表
            # timeout=2 --> 阻塞等待2s,在這2秒內只要有io事件產生,立馬返回event_list事件列表,沒有io事件就阻塞2s,而後返回空列表
            event_list = self._selector.select(timeout)
            
        #  用來處理真正的io事件的函數
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        # 4. 依次取出堆頂的回調函數handle添加到_ready隊列中
        while self._scheduled:
            handle = self._scheduled[0]
            # 當_scheduled[]中有多個延遲迴調時,經過handle._when >= end_time來阻止沒有到時間的延遲函數被彈出,
            # 也就是說,當有n個延遲迴調時,會產生n個timeout,對應n次run_once循環的調用
            if handle._when >= end_time:
                break
            # 從堆中彈出堆頂最小的回調函數,放入 _ready 隊列中
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
        
        # 5. 執行self._ready隊列中全部的回調函數handle對象
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                # handle._run()實際上就是執行task._step(),也就是執行cor.send(None)
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

Task類

class Task(futures.Future):
    
    ...
    
    def _step(self, exc=None):
        """
        _step方法能夠看作是task包裝的coroutine對象中的代碼的直到yield的前半部分邏輯
        """
        ...
        try:
            if exc is None:
                
                # 1.關鍵代碼,調用協程
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # 2. coro執行完畢會拋出StopIteration異常
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                # result爲None時,調用task的callbasks列表中的回調方法,在調用loop.run_until_complite,結束loop循環
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        # 3. result = coro.send(None)不拋出異常,說明協程被yield掛起
        else:
            # 4. 查看result是否含有_asyncio_future_blocking屬性
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    # 4.1. 若是result是一個future對象時,blocking會被設置成true
                    else:
                        result._asyncio_future_blocking = False
                        # 把_wakeup回調函數設置到此future對象中,當此future對象調用set_result()方法時,就會調用_wakeup方法
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            # 5. 若是result是None,則註冊task._step到loop對象中去,在下一輪_run_once中被回調
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)

            # --------下面的代碼能夠暫時不關注了--------
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            
            # 這裏是關鍵代碼,上次的_step()執行到第一次碰到yield的地方掛住了,此時再次執行_step(),
            # 也就是再次執行 result = coro.send(None) 這句代碼,也就是從上次yield的地方繼續執行yield後面的邏輯
            self._step()
        self = None  # Needed to break cycles when an exception occurs.

Future類

class Future:

    ...

    def add_done_callback(self, fn, *, context=None):
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    def set_result(self, result):

        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
    self.__schedule_callbacks()

    def __iter__(self):
        # self.done()返回False,
        if not self.done():
            self._asyncio_future_blocking = True
            # 把Future對象本身返回出去
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

sleep協程

#延遲迴調函數,裏面調用fut.set_result
def _set_result_unless_cancelled(fut, result):
    if fut.cancelled():
        return
    # 關鍵是這一步,驅動協程從上次掛起的地方繼續執行
    fut.set_result(result)

@types.coroutine
def __sleep0():

    yield

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_event_loop()
    # 建立一個future對象
    future = loop.create_future()
    # 註冊一個延遲迴調函數到loop對象中
    h = loop.call_later(delay, futures._set_result_unless_cancelled, future,  result)
    try:
        return await future
    finally:
        h.cancel()

關鍵地方我都寫了註釋,若是能耐着性子細心看下來,你會發現例1中的實現,就是模仿asyncio中的這幾個類去實現的。

asyncio的sleep中的延遲迴調函數是_set_result_unless_cancelled與我寫的callback對應,關鍵都是要回調future.set_result方法,這樣才能驅動協程從上次掛起的地方開始繼續執行。

對於使用asyncio.sleep的例子

import asyncio


async def cor():
    print('enter cor ...')
    await asyncio.sleep(2)
    print('exit cor ...')
    
    return 'cor'

loop = asyncio.get_event_loop()
task = loop.create_task(cor())
rst = loop.run_until_complete(task)
print(rst)

await asyncio.sleep(2) 這句代碼一樣是把cor協程分爲以下兩個部分:

- cor print('enter cor ...')
    - sleep 
        - future print('enter Future ...')   # 以上是第一次cor.send(None)執行的邏輯,命名爲part1
        - future yield  self  ---------------------------------------------------------------
        - future print('exit Future ...')    # 如下是第二次cor.send(None)執行的邏輯,命名爲part2
    - sleep 
- cor print('exit foo ...')

總之,只要有要耗時的地方,就必需要有一個 future 用來 await future,而後協程就被分紅了part1和part2,part1和part2就被分別封裝到了task._step和task._wakeup中,而後在loop循環中先調用part1,再經過select函數阻塞n秒以後,再執行part2,最後,協程執行完畢。

相關文章
相關標籤/搜索