先來看個例子:自己實現的模擬耗時操作。
例1
import collections
import functools
import heapq
import itertools
import select
import socket
import time
import types

# The Loop currently inside run_until_complete(). sleep() and Future use it
# when no loop is passed explicitly, instead of relying on a global variable
# named `loop` that only exists when this file runs as __main__.
_running_loop = None


def _get_running_loop():
    """Return the Loop that is currently running, or raise RuntimeError."""
    if _running_loop is None:
        raise RuntimeError('no running Loop')
    return _running_loop


class Future:
    """A result that will be set later.

    A coroutine suspends on a Future (``await future``). The Task driving the
    coroutine registers its ``_wakeup`` as a callback, and ``set_result()``
    schedules that callback on the loop so the coroutine resumes (part2).
    """

    def __init__(self, *, loop=None):
        self._result = None
        self._callbacks = []
        self._loop = loop
        self._done = False

    def _get_loop(self):
        # Fall back to the running loop when the future was built without one.
        return self._loop if self._loop is not None else _get_running_loop()

    def set_result(self, result):
        """Store *result* and schedule every registered callback on the loop."""
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        loop = self._get_loop()
        for cb in callbacks:
            loop.call_soon(cb)

    def add_callback(self, callback):
        """Register *callback*; if the result is already set, schedule it now."""
        if self._done:
            self._get_loop().call_soon(callback)
        else:
            self._callbacks.append(callback)

    def __iter__(self):
        print('enter Future ...')
        if not self._done:
            print('foo 掛起在yield處 ')
            # Hand the future out to the Task: the coroutine is suspended
            # here, which ends part1.
            yield self
            # Resumed by Task._wakeup after set_result(): start of part2.
            print('foo 恢復執行')
        print('exit Future ...')
        return self._result

    __await__ = __iter__


class Task:
    """Drive a coroutine with ``send(None)`` from the loop.

    Each call to ``_step`` runs the coroutine up to its next suspension point.
    """

    def __init__(self, cor, *, loop=None):
        self.cor = cor
        self._loop = loop
        self._done = False
        self._result = None

    def done(self):
        """Return True once the coroutine has finished (or failed)."""
        return self._done

    def result(self):
        """Return the coroutine's return value (None until it finishes)."""
        return self._result

    def _step(self):
        try:
            result = self.cor.send(None)
        except StopIteration as e:
            # 1. The coroutine finished; record its return value.
            self._done = True
            self._result = e.value
        except Exception:
            # 2. The coroutine raised: propagate instead of swallowing it.
            self._done = True
            raise
        else:
            if isinstance(result, Future):
                # 3. Suspended on a Future: resume when it gets a result.
                result.add_callback(self._wakeup)
            elif result is None:
                # 4. Bare yield (sleep(0)): run again in the next iteration.
                self._loop.call_soon(self._step)
            else:
                self._done = True
                raise RuntimeError(f'Task got bad yield: {result!r}')

    def _wakeup(self):
        self._step()


class Loop:
    """A minimal event loop: a FIFO of ready callbacks plus a timer heap."""

    def __init__(self):
        self._stop = False
        self._ready = collections.deque()
        # Min-heap of (deadline, seq, callback, args). seq breaks ties between
        # equal deadlines so callbacks themselves are never compared.
        self._scheduled = []
        self._seq = itertools.count()
        # Monotonic clock: deadlines are unaffected by wall-clock changes.
        self._time = time.monotonic
        # select() needs a descriptor that never becomes readable on its own,
        # so that select(timeout) really blocks for `timeout` seconds. An
        # unconnected TCP socket may be reported readable immediately (e.g.
        # on Linux), so use one end of a socketpair; nothing is ever written
        # to self._wsock.
        self._rsock, self._wsock = socket.socketpair()
        self._select = functools.partial(select.select, [self._rsock], [], [])

    def call_soon(self, callback, *args):
        """Queue ``callback(*args)`` to run in the next loop iteration."""
        self._ready.append(functools.partial(callback, *args))

    def create_task(self, cor):
        task = Task(cor, loop=self)
        self.call_soon(task._step)
        return task

    def call_later(self, delay, callback, *args):
        """Run ``callback(*args)`` once *delay* seconds have elapsed."""
        when = self._time() + delay
        heapq.heappush(self._scheduled, (when, next(self._seq), callback, args))

    def run_until_complete(self, task):
        """Run the loop until *task* finishes; return its coroutine's result.

        The loop is closed afterwards. Exceptions raised by the coroutine
        propagate to the caller.
        """
        assert isinstance(task, Task)
        global _running_loop
        previous, _running_loop = _running_loop, self
        try:
            while not self._stop and not task.done():
                self._run_once()
        finally:
            _running_loop = previous
            self.close()
        return task.result()

    def _run_once(self):
        # Use select(timeout) to control how long this iteration blocks:
        # 0 if something is ready, otherwise until the earliest timer is due.
        if self._ready:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0][0] - self._time())
        else:
            raise RuntimeError(
                'deadlock: nothing is ready or scheduled, '
                'but the task has not finished')
        self._select(timeout)

        # Move every timer whose deadline has passed into the ready queue.
        now = self._time()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append(functools.partial(callback, *args))

        # Run, in FIFO order, only what was ready at this point; callbacks
        # queued while running wait for the next iteration.
        for _ in range(len(self._ready)):
            self._ready.popleft()()

    def close(self):
        """Stop the loop and release its sockets (safe to call twice)."""
        self._stop = True
        self._rsock.close()
        self._wsock.close()


@types.coroutine
def _sleep():
    # Bare yield: the Task sees None and reschedules itself (sleep(0)).
    yield


async def sleep(s, result=None, *, loop=None):
    """A self-made sleep coroutine: resume after *s* seconds, return *result*."""
    if s <= 0:
        await _sleep()
        return result
    if loop is None:
        loop = _get_running_loop()
    future = Future(loop=loop)
    loop.call_later(s, callback, future)
    await future
    return result


def callback(future):
    """Delayed callback: when the time is up, complete *future* (drives part2)."""
    future.set_result(None)


async def foo():
    print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    await sleep(3)
    print(f'exit foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')


if __name__ == '__main__':
    f = foo()
    loop = Loop()
    task = loop.create_task(f)
    loop.run_until_complete(task)
執行結果:
enter foo at 2019-07-08 21:09:43
enter Future ...
foo 掛起在yield處
foo 恢復執行
exit Future ...
exit foo at 2019-07-08 21:09:46
在上一篇文章中,我們通過 Loop、Task、Future 三個類基本實現了對協程的調度;本文在此基礎上做了一些修改,實現了對協程中耗時操作的模擬。
首先我們分析一下 async def foo 協程中的 await sleep(3):這裡其實會進入 sleep 中的 await future,再進入 future 對象的 __await__ 方法中的 yield self,此時 foo 協程被掛起。上一篇文章中我們分析過,最終 foo 還是被這個 future 對象分成了 part1 和 part2 兩部分邏輯。
- foo      print('enter foo at ...')
  - sleep
    - future   print('enter Future ...')   # 以上是第一次 f.send(None) 執行的邏輯,命名爲 part1
    - future   yield self
---------------------------------------------------------------
    - future   print('exit Future ...')    # 以下是第二次 f.send(None) 執行的邏輯,命名爲 part2
  - sleep
- foo      print('exit foo at ...')
part1 在 loop 循環的開始就執行了:它返回一個 future 對象,把 part2(即 task._wakeup)註冊到 future 中,然後掛起。那麼下半部分 part2 在什麼時候執行呢?因為在 sleep 中,我們通過 call_later 向 loop 對象註冊了一個 3 秒之後執行的回調函數 callback。loop 對象在執行完 part1 後,會在下一輪循環中處理這個 callback:由於 loop._scheduled 不爲空,timeout 被賦值成 3,所以 select(3) 阻塞 3 秒後才繼續往下執行。也就是說,callback 函數的執行時機就是 select(3) 阻塞 3 秒之後。callback 回調函數中又會調用 future.set_result(),而 set_result 會把 part2 註冊到 loop 中,所以最終在 loop 的下一輪循環中執行 part2 的邏輯,回到上次 foo 掛起的地方繼續 foo 的流程,直到協程退出。
所謂的模擬耗時 3 秒,其實就是在執行完 part1 後通過 select 函數阻塞 3 秒,然後再執行 part2,這樣就實現了所謂等待 3 秒的操作。
要實現這個 sleep 協程的耗時模擬,主要有 2 個關鍵點:
1. 通過 select(timeout) 的 timeout 參數來控制 select 函數的阻塞時間:
- timeout=None:一直阻塞,直到有真實的 IO 事件到來,如 socket 的可讀、可寫事件;
- timeout=0:不管此時是否有 IO 事件到來,都立即返回;
- timeout=n:最多阻塞 n 秒,在這 n 秒內只要有 IO 事件到來就立即返回,否則阻塞 n 秒後才返回。
2. 當延遲時間到來時,在 callback 函數中調用 future.set_result() 方法,來驅動 part2 的執行。
了解到這裡之後,我們再來看一下 asyncio 的源碼。
Loop類
class BaseEventLoop(events.AbstractEventLoop):
    ...

    def __init__(self):
        ...
        # Double-ended queue of Handle objects (wrapping callbacks such as
        # task._step) that are ready to run.
        self._ready = collections.deque()
        # Min-heap of Handle objects wrapping delayed callbacks, ordered by
        # their due time (handle._when).
        self._scheduled = []
        ...

    def create_task(self, coro):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        # self._task_factory is None by default.
        if self._task_factory is None:
            # Create a Task wrapping the coroutine.
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        # Return the task object.
        return task

    def call_soon(self, callback, *args):
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        # Key line: when called from a Task, callback is task._step and args
        # are the arguments for task._step.
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        # 1. handle wraps the callback (e.g. task._step) together with args.
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # 2. Key line: append the handle to the self._ready deque.
        self._ready.append(handle)
        return handle

    def run_until_complete(self, future):
        ...
        # `future` is the task here; the next two lines make sure it is a
        # Future instance (ensure_future wraps a non-future argument).
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # Register _run_until_complete_cb on the task's callback list; it is
        # what ends the loop once the task is done (presumably via
        # loop.stop(), which makes run_forever see _stopping).
        future.add_done_callback(_run_until_complete_cb)
        try:
            # Enter the (infinite) event-loop cycle.
            self.run_forever()
        except:
            ...
            raise
        finally:
            ...
        # The loop has finished; return the coroutine's return value.
        return future.result()

    def run_forever(self):
        ...
        try:
            events._set_running_loop(self)
            while True:
                # One loop iteration per _run_once(); after each, check
                # whether _stopping was set, i.e. whether to end the loop.
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

    def _run_once(self):
        # self._scheduled is a min-heap of delayed callbacks ordered by due
        # time; each iteration moves the due ones from the top of the heap
        # into self._ready. self._ready is the deque from which every
        # callback registered with the loop is eventually taken and run.

        # 1. Purge cancelled timer handles from self._scheduled.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # 2. Choose the select() timeout. Default None: nothing is ready and
        # no timer exists, so block until an I/O event arrives.
        timeout = None
        # 0 (do not block) if something is ready or the loop is stopping.
        if self._ready or self._stopping:
            timeout = 0
        # Otherwise, if any timer exists...
        elif self._scheduled:
            # Compute the desired timeout.
            # ...wait until the earliest timer (top of the heap) is due.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        if self._debug and timeout != 0:
            ...
        # 3. The else branch is the key part.
        else:
            # timeout=None -> block until an I/O event occurs, then return
            #                 the event list.
            # timeout=0    -> do not block; return pending events, possibly
            #                 an empty list.
            # timeout=n    -> block up to n seconds; return as soon as an I/O
            #                 event occurs, else an empty list after n seconds.
            event_list = self._selector.select(timeout)
        # Dispatch the real I/O events.
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        # 4. Move due handles from the top of the heap into self._ready.
        while self._scheduled:
            handle = self._scheduled[0]
            # Stop at the first handle not yet due (handle._when >= end_time);
            # it and later ones stay in the heap for a later iteration.
            if handle._when >= end_time:
                break
            # Pop the earliest handle off the heap into the _ready queue.
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # 5. Run the handles that are in self._ready at this point; anything
        # appended while they run waits for the next iteration.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                # handle._run() invokes the wrapped callback; for a Task that
                # is task._step(), i.e. cor.send(None).
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
Task類
class Task(futures.Future):
    ...

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        Resume the coroutine (send None, or throw *exc*) and run it until its
        next suspension point, or until it finishes.
        """
        ...
        try:
            if exc is None:
                # 1. Key line: resume the coroutine.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # 2. The coroutine finished: it raised StopIteration.
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                # Store the coroutine's return value; set_result() schedules
                # the task's done callbacks, e.g. the _run_until_complete_cb
                # that run_until_complete registered to end the loop.
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        # 3. send() raised nothing: the coroutine yielded and is suspended.
        else:
            # 4. Check whether result has an _asyncio_future_blocking attribute.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    # 4.1 result is another Future whose __iter__ set
                    # _asyncio_future_blocking to True before yielding.
                    else:
                        result._asyncio_future_blocking = False
                        # Register _wakeup on that future; when the future's
                        # set_result() runs, _wakeup is called and the
                        # coroutine resumes.
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            # 5. result is None (bare yield): register task._step with the
            # loop so it runs again in the next _run_once.
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            # -------- The branches below can be ignored for now --------
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        # Done callback registered on the awaited future (see 4.1 in _step).
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Key line: the previous _step() stopped at the coroutine's yield;
            # calling _step() again runs cor.send(None) once more, so the
            # coroutine continues right after that yield (part2).
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
Future類
class Future:
    ...

    def add_done_callback(self, fn, *, context=None):
        # Already done: schedule fn(self) on the loop right away.
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            # Still pending: keep (fn, context) until the result is set.
            self._callbacks.append((fn, context))

    def set_result(self, result):
        # A result may be set only once; otherwise InvalidStateError.
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        # Presumably schedules the stored callbacks on the loop (helper not
        # shown here); for a future awaited by a Task this includes
        # Task._wakeup, which resumes the suspended coroutine.
        self.__schedule_callbacks()

    def __iter__(self):
        # Only suspend while the future is still pending (not done).
        if not self.done():
            # Lets Task._step recognize the yielded object as a Future.
            self._asyncio_future_blocking = True
            # Yield the Future object itself out to the Task.
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression
sleep協程
#延遲迴調函數,裏面調用fut.set_result def _set_result_unless_cancelled(fut, result): if fut.cancelled(): return # 關鍵是這一步,驅動協程從上次掛起的地方繼續執行 fut.set_result(result) @types.coroutine def __sleep0(): yield async def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" if delay <= 0: await __sleep0() return result if loop is None: loop = events.get_event_loop() # 建立一個future對象 future = loop.create_future() # 註冊一個延遲迴調函數到loop對象中 h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result) try: return await future finally: h.cancel()
關鍵地方我都寫了註釋,如果能耐著性子細心看下來,你會發現例1中的實現,就是模仿 asyncio 中的這幾個類去實現的。
asyncio 的 sleep 中的延遲回調函數是 _set_result_unless_cancelled,與我寫的 callback 對應,關鍵都是要調用 future.set_result 方法,這樣才能驅動協程從上次掛起的地方繼續執行。
對於使用asyncio.sleep的例子
import asyncio


async def cor(delay=2):
    """Print on entry, await asyncio.sleep(delay), print on exit; return 'cor'."""
    print('enter cor ...')
    await asyncio.sleep(delay)
    print('exit cor ...')
    return 'cor'


if __name__ == '__main__':
    # asyncio.get_event_loop() with no current loop is deprecated in recent
    # Python versions and raises RuntimeError from 3.14 on, so create the
    # loop explicitly (keeping create_task/run_until_complete visible) and
    # close it when done.
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(cor())
        rst = loop.run_until_complete(task)
        print(rst)
    finally:
        loop.close()
await asyncio.sleep(2) 這句代碼一樣是把cor協程分爲以下兩個部分:
- cor      print('enter cor ...')
  - sleep
    - future   進入 Future.__iter__(asyncio 的 Future 本身不會 print)   # 以上是第一次 cor.send(None) 執行的邏輯,命名爲 part1
    - future   yield self
---------------------------------------------------------------
    - future   離開 Future.__iter__,return self.result()   # 以下是第二次 cor.send(None) 執行的邏輯,命名爲 part2
  - sleep
- cor      print('exit cor ...')
總之,只要有耗時的地方,就必須要有一個 future 用來 await,協程也就被分成了 part1 和 part2:part1 由第一次 task._step 執行;part2 則在 future 完成後,由回調的 task._wakeup 再次調用 task._step 執行。在 loop 循環中先執行 part1,再通過 select 函數阻塞 n 秒,之後執行 part2,最後協程執行完畢。