基於async關鍵字的原生協程python
# 定義一個簡單的原生協程cor async def cor(): print('enter cor') print('exit cor') print(type(cor)) # <class 'function'> print(type(cor())) # <class 'coroutine'>
能夠看到cor
的類型<class 'function'>
函數類型,說明async
關鍵字修飾的函數也是一個函數而已,跟普通函數在定義上沒啥什麼差異,差異在於被調用的時候,cor()
並非執行函數體中的語句,而是生成一個<class 'coroutine'>
類型的對象,即協程對象,跟生成器相似,協程也有send()
方法。express
c = cor() try: c.send(None) except StopIteration as e: print(e.value) # None
當c.send(None)時,纔會執行cor中的語句,也就是執行print('enter cor')
和print('exit cor')
這兩句,就執行完畢了,執行完畢時會拋出StopIteration
異常,這個異常對象的value
屬性中保存着cor
的返回值,這裏core
沒有return
語句,其實默認返回None
。api
原生協程中使用 await 關鍵字app
async def cor(): print('enter cor') rst = await cor1() print('rst -->', rst) print('exit cor') async def cor1(): print('enter cor1') print('exit cor1') return 'cor1' c = cor() try: c.send(None) except StopIteration as e: print('cor -->', e.value) # None
輸出以下:less
enter cor enter cor1 exit cor1 rst --> cor1 exit cor cor --> None
await
關鍵字後面必須是一個實現了__awwit__
方法的對象,不必定是協程對象,只要實現了這個方法,就會進入到此方法中,調用該方法中的邏輯。好比後面要介紹的Future
對象,它也是實現了__await__
方法的。socket
await
關鍵字的語義是表示掛起當前的cor
協程的執行,進入到cor1
協程中執行cor1
的邏輯,直到cor1
執行完畢,而後執行流程又回到cor
掛起的地方,繼續執行cor
中await
後面的語句。直到最後拋出StopIteration
異常。async
基於生成器的協程,也就是非原生協程函數
import types async def cor(): print('enter cor') rst = await cor2() print('rst --> ', rst) print('exit cor') @types.coroutine def cor2(): print('enter cor2') yield print('exit cor2') return 'cor2' c = cor() try: c.send(None) c.send(None) except StopIteration as e: print('cor -->', e.value) # None
輸出以下:oop
enter cor enter cor2 exit cor2 rst --> cor2 exit cor cor --> None
與上面的原生協程的嵌套不一樣的是,調用了兩次c.send(None)
,執行第一次c.send(None)
時,會在cor2
的yield
關鍵字處掛起,第二次c.send(None)
則會在yield
掛起的地方,接着往下執行,而後core2
返回'cor2'
,賦值給rst
變量,繼續執行cor
中await
後面的語句,直到最後拋出StopIteration
異常。ui
總結
async 是一個關鍵字,async def 定義的類型仍是一個function類型,只有當它被調用時才返回一個協程對象
async def
跟def
定義的方法在沒被調用時沒有任何區別,沒必要看得很神祕,它也能夠有return
語句,這點也正常,
由於python
中沒有return
語句的函數實際上默認是返回None
的,因此只是顯式的return
和隱式return None
的區別
對於協程的send(None)
方法,跟生成器的send(None)
相似,不一樣的是,原生協程的send方法被調用的時候,會一直執行到碰
到await
語句,可是不會停下來,會直接再進入到await EXPR
的EXPR
中,其實EXPR
是一個awaitable
對象,會調用該對象的
__await__()
執行該方法的裏面的邏輯,若是該awaitable
對象是一個原生協程對象,那麼它的__await__()
方法中的邏輯就
是在定義此協程時async def
下面的邏輯,執行完畢後,該協程對象就關閉了,執行流程就再次跳轉到當前掛起的協程中,
執行該協程中餘下的邏輯,最後執行完畢,拋出StopIteration
異常。
對於原生生協程來講,調用send()
方法時,會一直執行到出現StopIteration
異常爲止,只有在 __await__()
方法中有yield
語句時才
會掛起在那裏,若是__await__()
方法中沒有yield
語句,不會掛起,會返回__await__的返回值,繼續執行,直到拋出StopIteration
異常。
先拋出一個結論,在asyncio
中協程的流程的掛起操做,實際上仍是是經過yield
關鍵字來實現的,並非await
關鍵字, async
和await
關鍵字只不過是語法糖。
import asyncio async def cor(): print('enter cor ...') await asyncio.sleep(2) print('exit cor ...') return 'cor' loop = asyncio.get_event_loop() task = loop.create_task(cor()) rst = loop.run_until_complete(task) print(rst)
從這個簡單的例子入手,逐步分析協程在事件循環中的執行流程。
第1步 async def cor()
定義了一個cor
協程。
第2步 loop = asyncio.get_event_loop()
獲得一個事件循環對象loop
,這個loop
在一個線程中只有惟一的一個實例,只要在同一個線程中調用此方法,獲得的都是同一個loop
對象。
第3步 task = loop.create_task(cor())
把cor
協程包裝成一個task
對象。
第4步 rst = loop.run_until_complete(task)
把這個task
對象添加到事件循環中去。
首先看第3步的loop.create_task(cor())這個方法
class BaseEventLoop(events.AbstractEventLoop): ... def __init__(self): ... # 用來保存包裹task.step方法的handle對象的對端隊列 self._ready = collections.deque() # 用來保存包裹延遲迴調函數的handle對象的二叉堆,是一個最小二叉堆 self._scheduled = [] ... def create_task(self, coro): """Schedule a coroutine object. Return a task object. """ self._check_closed() # self._task_factory 默認是None if self._task_factory is None: # 建立一個task對象 task = tasks.Task(coro, loop=self) if task._source_traceback: del task._source_traceback[-1] else: task = self._task_factory(self, coro) # 返回這個task對象 return task def call_soon(self, callback, *args): self._check_closed() if self._debug: self._check_thread() self._check_callback(callback, 'call_soon') # 關鍵代碼callback就是task._step方法,args是task._step的參數 handle = self._call_soon(callback, args) if handle._source_traceback: del handle._source_traceback[-1] return handle def _call_soon(self, callback, args): # 1 handle是一個包裹了task._step方法和args參數的對象 handle = events.Handle(callback, args, self) if handle._source_traceback: del handle._source_traceback[-1] # 2 關鍵代碼,把handle添加到列表self._ready中 self._ready.append(handle) return handle
loop.create_task(cor())
其實是建立了一個Task
類的實例。再來看一下Task
這個類
class Task(futures.Future): ... def __init__(self, coro, *, loop=None): assert coroutines.iscoroutine(coro), repr(coro) # 調用父類的__init__得到線程中惟一的loop對象 super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] self._coro = coro self._fut_waiter = None self._must_cancel = False # 關鍵代碼,把_step方法註冊到loop對象中去 self._loop.call_soon(self._step) self.__class__._all_tasks.add(self) def _step(self, exc=None): ... result = coro.send(None) ...
task
實例化時,調用了self._loop.call_soon(self._step)
--> loop.call_soon(callback, *args)
--> loop._call_soon(callback, args)
,其實是把handle(task._step)
這個對象放到了loop._ready
隊列中,放在這個隊列中有什麼用呢?先告訴你們,_step
方法會在loop
對象的循環中被調用,也就是會執行coro.send(None)
這句。coro.send(None)
實際上就是執行上面定義的cor
協程的裏面的語句。
也就是說到第3步
執行完時,loop
對象已經實例化,task
對象也實例化,而且task
對象的_step
方法被封裝成handle
對象放入了loop
對象的_ready
隊列中去了。
再來看第4步loop.run_until_complete(task)
class BaseEventLoop(events.AbstractEventLoop): def run_until_complete(self, future): ... # future就是task對象,下面2句是爲了確保future是一個Future類實例對象 new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False # 添加回調方法_run_until_complete_cb到當前的task對象的callbacks列表中,_run_until_complete_cb就是最後 # 把loop的_stop屬性設置爲ture的,用來結束loop循環的 future.add_done_callback(_run_until_complete_cb) try: # 開啓無線循環 self.run_forever() except: ... raise finally: ... # 執行完畢返回cor的返回值 return future.result() def run_forever(self): ... try: events._set_running_loop(self) while True: # 每次運行一次循環,判斷下_stopping是否爲true,也就是是否結束循環 self._run_once() if self._stopping: break finally: ... def _run_once(self): # loop的_scheduled是一個最小二叉堆,用來存放延遲執行的回調函數,根據延遲的大小,把這些回調函數構成一個最小堆,而後再每次從對頂彈出延遲最小的回調函數放入_ready雙端隊列中, # loop的_ready是雙端隊列,全部註冊到loop的回調函數,最終是要放入到這個隊列中,依次取出而後執行的 # 1. self._scheduled是否爲空 sched_count = len(self._scheduled) if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # Remove delayed calls that were cancelled if their number # is too high new_scheduled = [] for handle in self._scheduled: if handle._cancelled: handle._scheduled = False else: new_scheduled.append(handle) heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # Remove delayed calls that were cancelled from head of queue. while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False # 2. 給timeout賦值,self._scheduled爲空,timeout就爲None timeout = None # 只要self._ready和self._scheduled中有一個不爲空,timeout就爲0 if self._ready or self._stopping: timeout = 0 # 只要self._scheduled不爲空 elif self._scheduled: # Compute the desired timeout. # 用堆頂的回調函數的延遲時間做爲timeout的等待時間,也就是說用等待時間最短的回調函數的時間做爲timeout的等待時間 when = self._scheduled[0]._when timeout = max(0, when - self.time()) 、 if self._debug and timeout != 0: ... # 3. 關注else分支,這是關鍵代碼 else: # timeout=None --> 一直阻塞,只要有io事件產生,立馬返回event_list事件列表,不然一直阻塞着 # timeout=0 --> 不阻塞,有io事件產生,就立馬返回event_list事件列表,沒有也返空列表 # timeout=2 --> 阻塞等待2s,在這2秒內只要有io事件產生,立馬返回event_list事件列表,沒有io事件就阻塞2s,而後返回空列表 event_list = self._selector.select(timeout) # 用來處理真正的io事件的函數 self._process_events(event_list) # Handle 'later' callbacks that are ready. end_time = self.time() + self._clock_resolution # 4. 依次取出堆頂的回調函數handle添加到_ready隊列中 while self._scheduled: handle = self._scheduled[0] # 當_scheduled[]中有多個延遲迴調時,經過handle._when >= end_time來阻止沒有到時間的延遲函數被彈出, # 也就是說,當有n個延遲迴調時,會產生n個timeout,對應n次run_once循環的調用 if handle._when >= end_time: break # 從堆中彈出堆頂最小的回調函數,放入 _ready 隊列中 handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) # 5. 執行self._ready隊列中全部的回調函數handle對象 ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) finally: self._current_handle = None else: # handle._run()實際上就是執行task._step(),也就是執行cor.send(None) handle._run() handle = None # Needed to break cycles when an exception occurs.
執行run_until_complete
方法時future.add_done_callback(_run_until_complete_cb)
這一步實際上是task.add_done_callback(_run_until_complete_cb)
,也就是把_run_until_complete_cb
回調函數註冊到task
對象中去了,這個回調函數的做用是當cor協程執行完畢時,回調_run_until_complete_cb
把loop對象的 _stopping
屬性設爲True
,而後_run_once
執行完畢時,判斷_stopping
爲True
就跳出while
循環,run_until_complete
才能返回task.result
從上面的函數調用流程
run_until_complete()
--> run_forever()
--> _run_once()
,重點看_run_once
這個方法的執行流程。
此時:
cor
協程還未開始執行。
loop._ready = [handle(task._step)]
,loop._scheduled = []
。
_run_once()
的調用執行開始注意這裏的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5註釋
第1,2步的邏輯判斷,timeout = 0
。
第3步 event_list = self._selector.select(0)
,此時立馬返回空[]。
第4步 因爲loop._scheduled = []
,不執行第4步中的語句。
第5步 依次從_ready
隊列中取出回調函數handle,執行handle._run()
。
執行handle._run()
方法,也就是調用task._step()
,來看看task._step()
的執行邏輯:
class Task(futures.Future): ... def _step(self, exc=None): """ _step方法能夠看作是task包裝的coroutine對象中的代碼的直到yield的前半部分邏輯 """ ... try: if exc is None: # 1.關鍵代碼 result = coro.send(None) else: result = coro.throw(exc) # 2. coro執行完畢會拋出StopIteration異常 except StopIteration as exc: if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False self.set_exception(futures.CancelledError()) else: # result爲None時,調用task的callbasks列表中的回調方法,在調用loop.run_until_complite,結束loop循環 self.set_result(exc.value) except futures.CancelledError: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) except BaseException as exc: self.set_exception(exc) raise # 3. result = coro.send(None)不拋出異常 else: # 4. 查看result是否含有_asyncio_future_blocking屬性 blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # Yielded Future must come from Future.__iter__(). if result._loop is not self._loop: self._loop.call_soon( self._step, RuntimeError( 'Task {!r} got Future {!r} attached to a ' 'different loop'.format(self, result))) elif blocking: if result is self: self._loop.call_soon( self._step, RuntimeError( 'Task cannot await on itself: {!r}'.format( self))) # 4.1. 若是result是一個future對象時,blocking會被設置成true else: result._asyncio_future_blocking = False # 把_wakeup回調函數設置到此future對象中,當此future對象調用set_result()方法時,就會調用_wakeup方法 result.add_done_callback(self._wakeup) self._fut_waiter = result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False else: self._loop.call_soon( self._step, RuntimeError( 'yield was used instead of yield from ' 'in task {!r} with {!r}'.format(self, result))) # 5. 若是result是None,則註冊task._step到loop對象中去,在下一輪_run_once中被回調 elif result is None: # Bare yield relinquishes control for one event loop iteration. self._loop.call_soon(self._step) # --------下面的代碼能夠暫時不關注了-------- elif inspect.isgenerator(result): # Yielding a generator is just wrong. self._loop.call_soon( self._step, RuntimeError( 'yield was used instead of yield from for ' 'generator in task {!r} with {}'.format( self, result))) else: # Yielding something else is an error. self._loop.call_soon( self._step, RuntimeError( 'Task got bad yield: {!r}'.format(result))) finally: self.__class__._current_tasks.pop(self._loop) self = None # Needed to break cycles when an exception occurs. def _wakeup(self, future): try: future.result() except Exception as exc: # This may also be a cancellation. self._step(exc) else: # 這裏是關鍵代碼,上次的_step()執行到第一次碰到yield的地方掛住了,此時再次執行_step(), # 也就是再次執行 result = coro.send(None) 這句代碼,也就是從上次yield的地方繼續執行yield後面的邏輯 self._step() self = None # Needed to break cycles when an exception occurs.
當task._step()執行時,調用core.send(None),即調用:
async def cor(): # 1. print('enter cor ...') # 2. await asyncio.sleep(2) # 3. print('exit cor ...') # 4. return 'cor'
注意這裏的第1,2,3,4步是在cor
上標記的1,2,3,4註釋
第1步 print('enter cor ...')
第2步 await asyncio.sleep(2)
,sleep
是一個非原生協程,前面介紹過,await
語句掛起當前的協程也就是cor
,而後會進入到sleep
協程中的。注意,此時執行流程已經在sleep
協程中了,咱們來看一下sleep
協程的代碼邏輯。
看一下sleep
協程實現
@coroutine def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" if delay == 0: yield return result if loop is None: loop = events.get_event_loop() # 1. 建立一個future對象,用來銜接前一個task future = loop.create_future() # 2. 添加一個延遲執行的回調函數futures._set_result_unless_cancelled 到當前loop對象的_scheduled二叉堆中,這個堆中的回調函數按照delay的大小,造成一個最小堆 h = future._loop.call_later(delay, futures._set_result_unless_cancelled, future, result) try: # 3. 執行 yield from future 語句,此時會調用future的 __iter__() 方法,而後在 yield future 處掛住,返回future自己,self._asyncio_future_blocking = True return (yield from future) finally: h.cancel()
sleep
是一個非原生協程,delay=2
注意這裏的第1,2,3步是在sleep
上標記的1,2,3註釋
第1步 生成一個新Future
對象,這個對象不一樣於跟task
是不一樣的對象,他們都是Future
類型的對象,由於Task
類繼承自Future
類。
第2步 loop
對象中註冊了一個futures._set_result_unless_cancelled
的延遲迴調函數handle
對象,前面介紹過,延遲迴調函數handle
對象是放在loop._scheduled
這個最小二叉堆
中的,此時,loop
對象的_scheduled
最小堆中只有一個延遲迴調函數handle
。到sleep
中的第2步完成爲止,loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
,loop._ready=[]
,注意正在執行的handle._run()
的流程還沒走完,如今是進入到了sleep
協程中的第2步中。
第3步 執行(yield from future)
會調用future
的__iter__
方法,進入到__iter__
方法中,首先把self._asyncio_future_blocking
賦值爲 True
了,,而後yield self
,注意,此時cor
協程的執行流程就掛起在了yield
處,返回self
也就是Future
對象本身,也就是說執行result= core.send(None)
最終掛起在新的Future
對象的yield self
處,返回獲得了一個Future
對象賦值給result
。即result
就是在sleep()
協程中新生成的一個Future
對象了。
咱們看一下Future
對象的這個方法。
class Future: ... def __iter__(self): # self.done()返回False, if not self.done(): self._asyncio_future_blocking = True # 把Future對象本身返回出去 yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" return self.result() # May raise too. if compat.PY35: __await__ = __iter__ # make compatible with 'await' expression
到此爲止,result = core.send(None)
的調用獲得了一個Future
對象,而後執行流程繼續往下走也就是因爲result是Future對象,所以進入到_step
方法的第4.1步,這裏看一下代碼片斷
# 4.1. 若是result是一個future對象時,blocking會被設置成true else: result._asyncio_future_blocking = False # 把_wakeup回調函數設置到此future對象中,當此future對象調用set_result()方法時,就會調用_wakeup方法 result.add_done_callback(self._wakeup)
result.add_done_callback(self._wakeup)
實際上就是把task._wakeup
方法註冊到了新Futrue
對象的回調方法列表_callbacks = [task._wakeup,]
中,到此爲止,task._step
方法才完全執行完畢。第一輪_run_once()
的調用執行結束了。此時 loop._stopping = Fasle
,而後繼續執行下一輪的_run_once()
。
此時:
cor
協程的執行流程掛起在sleep
協程的中產生的新Future
對象的__iter__
方法的yield
處。cor
協程的執行了cor
中標記的第1,2步,第3,4步未執行。
新Future
對象的_callbacks = [task._wakeup,]
。
loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
,loop._ready=[]
。
_run_once()
的調用執行開始進入到_run_once()
方法中,因爲loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
注意這裏的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5註釋
第1,2步的邏輯判斷,timeout = 2
第3步 event_list = self._selector.select(2)
,也就是說阻塞2s中,注意,此時由於咱們編寫的那個cor
協程是沒有io事件的,是一個經過sleep
協程模擬耗時操做的,不涉及到真正的io事件,因此這個時候,selector.select(2)
會完整的阻塞2秒鐘。
第4步 依次取出 _scheduled
的延遲迴調函數handle
,放入到 _ready
隊列中。
第5步 依次從_ready
隊列中取出延遲迴調函數handle
,執行handle._run()
。
第5步中的回調函數就是sleep
協程中註冊到loop
對象的futures._set_result_unless_cancelled
函數
def _set_result_unless_cancelled(fut, result): """Helper setting the result only if the future was not cancelled.""" if fut.cancelled(): return # fut就是sleep中新生成的Future對象,調用set_result()方法 fut.set_result(result)
Future
對象的set_result
方法
class Future: ... def set_result(self, result): if self._state != _PENDING: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED # 回調Future對象中添加的全部回調函數 self._schedule_callbacks() def _schedule_callbacks(self): callbacks = self._callbacks[:] if not callbacks: return self._callbacks[:] = [] # 依次取出註冊到Future對象中的全部回調函數,放入到loop._ready隊列中去,等待下一輪 _run_onece的調用時,執行這些回調 for callback in callbacks: self._loop.call_soon(callback, self)
fut.set_result(result)
--> fut._schedule_callbacks()
--> fut._loop.call_soon(callback, self)
, callback
其實是新Future
對象的_callbacks
中的task._wakeup
方法,task._wakeup
又被添加到loop._ready
隊列中去了。到此爲止handle._run()
執行完畢,第二輪的_run_once()
執行完畢。
此時:
cor
協程的執行流程掛起在sleep
協程的中產生的新Future
對象的__iter__
方法的yield
處。
新Future
對象的_callbacks = []
。
loop._ready = [handle(task._wakeup)]
, loop._scheduled=[]
。
_run_once()
的調用執行開始注意這裏的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5註釋
第1,2步的邏輯判斷,timeout = 0
。
第3步 event_list = self._selector.select(0)
,也就是說當即返回。
第4步 因爲loop._scheduled=[]
,所以不執行第4步中的邏輯。
第5步 依次從_ready
隊列中取出回調函數handle
,執行handle._run()
執行handle._run()
就是執行task._wakeup()
,又要回到task._wakeup()
代碼中看看
class Task(futures.Future): def _wakeup(self, future): try: # future爲sleep協程中生成的新的Future對象 future.result() except Exception as exc: # This may also be a cancellation. self._step(exc) else: # 這裏是關鍵代碼,上次的_step()執行到第一次碰到yield的地方掛住了,此時再次執行_step(), # 也就是再次執行 result = coro.send(None) 這句代碼,也就是從上次yield的地方繼續執行yield後面的邏輯 self._step() self = None # Needed to break cycles when an exception occurs.
調用task._wakeup()
實際上又是執行task._step()
也就是再次執行result = core.send(None)
這行代碼,前面提到過,core
協程被掛起在Future
對象的__iter__
方法的yield
處,此時再次執行result = core.send(None)
,就是執行yield
後面的語句
class Future: ... def __iter__(self): # self.done()返回False, if not self.done(): self._asyncio_future_blocking = True # 把Future對象本身返回出去 yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" # 調用task._wakeup再次進入到core掛起的地方執行yield後面的語句 return self.result() # May raise too. if compat.PY35: __await__ = __iter__ # make compatible with 'await' expression
self.result()
返回的是None
,因此 cor
協程的await asyncio.sleep(2)
返回的是None
,到此爲止cor
協程的第三步await asyncio.sleep(2)
才真正的執行完畢,也就是說sleep
協程執行完畢了,而後繼續執行cor
協程await下面的語句print('exit cor ...')
最後返回'cor'
,到此爲止cor
協程就徹底執行完畢了。
async def cor(): print('enter cor ...') await asyncio.sleep(2) # 上次在這裏掛起 print('exit cor ...') return 'cor'
前面介紹了,原生協程在執行結束時會拋出StopIteration
異常,而且把返回值存放在異常的的value
屬性中,所以在task._step()
的第2步捕捉到StopIteration
異常
# 2. coro執行完畢會拋出StopIteration異常 except StopIteration as exc: if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False self.set_exception(futures.CancelledError()) else: # exc.value是'cor' # 調用task.set_result('cor') self.set_result(exc.value)
task.set_result('cor')
其實就是把task
中的存放的回調函數又放到loop._ready
隊列中去,task
中的回調函數就是前面介紹的_run_until_complete_cb
函數。到此爲止第3輪的_run_once()
執行完畢。
此時:
cor
協程的執行完畢。
新Future
對象的_callbacks = []
。
loop._ready = [handle(_run_until_complete_cb)]
, loop._scheduled=[]
。
_run_once()
開始執行注意這裏的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5註釋
第1,2步的邏輯判斷,timeout = 0
。
第3步 event_list = self._selector.select(0)
,也就是說當即返回。
第4步 因爲loop._scheduled=[]
,所以不執行第4步中的邏輯。
第5步 依次從_ready
隊列中取出回調函數handle
,執行handle._run()
執行handle._run()
就是執行_run_until_complete_cb(task)
def _run_until_complete_cb(fut): exc = fut._exception if (isinstance(exc, BaseException) and not isinstance(exc, Exception)): # Issue #22429: run_forever() already finished, no need to # stop it. return # fut就是task對象,_loop.stop()就是把loop._stopping賦值爲Ture fut._loop.stop()
loop._stopping
爲True
。第4輪_run_once()
執行完畢。
def run_forever(self): ... try: events._set_running_loop(self) while True: # 第4輪_run_once()結束 self._run_once() # _stopping爲true,跳出循環,run_forever()執行結束 if self._stopping: break finally: ...
跳出while
循環, run_forever()
執行結束,run_until_complete()
也就執行完畢了,最後把cor
協程的返回值'cor'返回出來賦值給rst
變量。
到此爲止全部整個task
任務執行完畢,loop
循環關閉。
總結
loop._ready
和loop._scheduled
是loop
對象中很是重要的兩個屬性
loop._ready
是一個雙端隊列deque
,用來存放調用loop.call_soon
方法時中放入的回調函數。
loop._scheduled
是一個最小堆,根據延遲時間的大小構建的最小堆,用來存放調用loop.call_later
時存放的延遲迴調函數。loop._scheduled
中有延遲調用函數是,timeout
被賦值爲堆頂的延遲函數的等待時間,這樣會使得select(timeout)
阻塞等待timeout
秒。到時間後loop._scheduled
中的回調函數最終仍是會被轉移到loop._ready
隊列中去執行。
sleep
協程模擬耗時IO
操做,經過向loop中註冊一個延遲迴調函數,明確的控制select(timeout)
中的timeout
超時時間 + future
對象的延遲timeout
秒調用future.set_result()
函數來實現一個模擬耗時的操做。
這個其實每個真實的耗時的IO
操做都會對應一個future
對象。只不過sleep
中的回調明確的傳入了延遲迴調的時間,而真實的IO
操做時的future.set_result()
的調用則是由真實的IO
事件,也就是select(timeout)
返回的socket
對象的可讀或者可寫事件來觸發的,一旦有相應的事件產生,就會回調對應的可讀可寫事件的回調函數,而在這些回調函數中又會去觸發future.set_result()
方法。