The code in this article comes from: snarky.ca/how-the-hec…python
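The previous article covered the history of Python's async and await keywords and made the point that async and await are an API, not an implementation. Quite a few event loops are built on async/await, including asyncio and curio. Under the hood, asyncio is based on future objects, while curio is based on tuples.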
In this article we will use a min-heap to implement a simple event loop.
Heaps are arrays for which a[k] <= a[2k+1] and a[k] <= a[2k+2] for all k, counting elements from 0. For the sake of comparison, non-existing elements are considered to be infinite. The interesting property of a heap is that a[0] is always its smallest element. (Source: the source code of Python's built-in heapq module)
Simply put, a heap is a Python list with a special property: a[k] <= a[2*k+1] and a[k] <= a[2*k+2], so the first element is always the smallest.
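As you have surely noticed, this is a binary tree: the element at index k is the parent of the elements at indices 2*k+1 and 2*k+2.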
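The invariant quoted above can be checked directly. The following is a minimal sketch, not part of the original article; the helper name is_min_heap and the sample data are assumptions made for illustration.

```python
import heapq

def is_min_heap(a):
    """Return True if a[k] <= a[2k+1] and a[k] <= a[2k+2] for every index k."""
    n = len(a)
    for k in range(n):
        # Children of index k live at 2k+1 and 2k+2 (if they exist).
        for child in (2 * k + 1, 2 * k + 2):
            if child < n and a[k] > a[child]:
                return False
    return True

data = [9, 4, 7, 1, 8, 2]      # arbitrary sample values
heap = list(data)
heapq.heapify(heap)            # rearrange in place so the invariant holds

print(is_min_heap(data))       # the unsorted input violates the invariant
print(is_min_heap(heap))       # the heapified copy satisfies it
print(heap[0] == min(data))    # the first element is the smallest
```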
The heapq module mainly provides the following APIs:
Usage:
heap = [] # creates an empty heap
heappush(heap, item) # pushes a new item on the heap
item = heappop(heap) # pops the smallest item from the heap
item = heap[0] # smallest item on the heap without popping it
heapify(x) # transforms list into a heap, in-place, in linear time
item = heapreplace(heap, item) # pops and returns smallest item, and adds new item; the heap size is unchanged
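As a quick illustration of these calls, here is a small sketch with arbitrary sample values (not from the original article). Popping repeatedly yields the items in ascending order, which is exactly the behaviour the event loop relies on later.

```python
import heapq

heap = []                          # an empty heap is just an empty list
for item in [5, 1, 4, 2]:
    heapq.heappush(heap, item)     # push keeps the heap invariant

smallest = heap[0]                 # peek at the smallest item without popping

# Replace the smallest item (1) with 3; the heap size stays the same.
replaced = heapq.heapreplace(heap, 3)

# Pop everything: items come out smallest first.
popped = [heapq.heappop(heap) for _ in range(len(heap))]
print(smallest, replaced, popped)
```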
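Let's review generator.send() once more, since it can be a little hard to understand.
next_value = generator.send(value)
Three things happen: the generator resumes from where it last paused; value becomes the value of the yield expression it was paused on; and the generator runs until the next yield, whose value is returned as next_value. Take a look at this example: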
>>> def double_inputs():
...     while True:
...         x = yield
...         yield x * 2
...
>>> gen = double_inputs()
>>> next(gen)       # run up to the first yield
>>> gen.send(10)    # goes into 'x' variable
20
>>> next(gen)       # run up to the next yield
>>> gen.send(6)     # goes into 'x' again
12
>>> next(gen)       # run up to the next yield
>>> gen.send(94.3)  # goes into 'x' again
188.6
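Here is what happens when gen.send(10) runs: the generator resumes, and 10 is assigned to x in x = yield. x * 2 then evaluates to 20. At that point the generator hits yield again and pauses again, handing the value of x * 2 back as the return value of send(), which is why the statement prints 20.
next(g) is equivalent to g.send(None). It is commonly used to run a generator up to a yield and stop there.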
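Two details of send() matter for the event loop built later: send(None) primes a generator just like next(), and when a generator returns, the return value travels on the StopIteration exception. The sketch below is an illustration only; the generator name echo_once is an assumption.

```python
def echo_once():
    # Pause here, handing "ready" to whoever called send()/next().
    received = yield "ready"
    # Returning ends the generator; the value rides on StopIteration.
    return received * 2

gen = echo_once()
first = gen.send(None)        # same effect as next(gen): run to the first yield

try:
    gen.send(21)              # 21 becomes the value of the yield expression
except StopIteration as stop:
    result = stop.value       # the generator's return value

print(first, result)
```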
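The event loop we are going to implement is very simple. Its core pieces are covered one by one below.
First comes the Task class. You can think of it as the counterpart of asyncio.Task/curio.Task.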
class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        # The point in time at which the coroutine should resume.
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until
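Two special methods are defined here, __eq__ and __lt__, so that Task instances can be compared with < and ==. Because we are using the heapq min-heap, the "smallest" item sits at the front. Task instances are ordered by their waiting_until (the point in time at which they should next resume).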
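The docstring of Task in the complete listing explains why plain two-item tuples are not used: when two datetime values are equal, comparison falls through to the coroutine objects, which do not support <. A minimal sketch of that failure, assuming a placeholder coroutine noop and an arbitrary fixed timestamp:

```python
import datetime
import heapq

async def noop():
    """Placeholder coroutine used only as a heap payload."""

when = datetime.datetime(2024, 1, 1)   # arbitrary, identical for both entries
c1, c2 = noop(), noop()

heap = []
heapq.heappush(heap, (when, c1))
try:
    # Equal datetimes force a comparison of the coroutine objects themselves.
    heapq.heappush(heap, (when, c2))
    failed = False
except TypeError:
    failed = True

# Close the never-started coroutines to avoid "never awaited" warnings.
c1.close()
c2.close()
print(failed)
```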
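Suppose that at some moment the min-heap holds Task A and Task B. Task A is due to resume in 0 seconds, so its resume time (waiting_until) is the "smallest" and it is popped and run first. Task B then takes its place as the "smallest" element.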
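The ordering can be seen by pushing a few Task instances onto a heap and popping them. In this sketch the labels, the delays and the fixed start time are assumptions, and plain strings stand in for the coroutine objects.

```python
import datetime
import heapq

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

start = datetime.datetime(2024, 1, 1, 12, 0, 0)   # arbitrary fixed start time
heap = []
for label, delay in [("B", 2), ("A", 0), ("C", 1)]:
    # A string label stands in for a real coroutine object here.
    resume_at = start + datetime.timedelta(seconds=delay)
    heapq.heappush(heap, Task(resume_at, label))

order = []
while heap:
    order.append(heapq.heappop(heap).coro)   # earliest resume time first
print(order)
```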
import datetime
import types

@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Pause here and hand the wake-up time to the event loop.
    actual = yield wait_until
    # The event loop sends back the time at which it actually resumed us.
    return actual - now

async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')
countdown() waits delay seconds and then runs a task that takes length seconds. A brief analysis of the code:
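One thing needs to be clear: countdown() returns a coroutine object. Unless you await it (or call send() on it), nothing actually runs. Note that next() does not work on a coroutine object, so send(None) is what starts it.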
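The statement delta = await sleep(delay) enters the sleep() coroutine and pauses at its first yield. To resume it, something has to "send stuff back" (see the previous article), that is, call send() on this generator. As we will see later, this is the event loop's job.
In addition, each task should first resume after delay seconds, so the event loop should call send() delay seconds after the program starts.
The while loop that follows repeats the run-and-pause cycle, one second at a time, until length seconds have passed and the task is finished.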
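The points above can be observed by driving a coroutine by hand, playing the role of the event loop. This is a sketch for illustration: sleep() is copied from the article, while the waiter coroutine and the log list are assumptions.

```python
import datetime
import types

@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now

async def waiter(log):
    log.append("started")
    delta = await sleep(0)
    log.append(delta)

log = []
coro = waiter(log)
created_only = list(log)          # snapshot: creating the coroutine ran nothing

# First send(None): run until the yield inside sleep(); the yielded
# wake-up time propagates through `await` to the caller of send().
wait_until = coro.send(None)

try:
    # "Send stuff back": the current time becomes `actual` inside sleep().
    coro.send(datetime.datetime.now())
    finished = False
except StopIteration:
    finished = True               # waiter() ran to its end

print(created_only, wait_until, log, finished)
```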
import datetime
import heapq
import time

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()
That is all the code there is. Simple, isn't it? Let's walk through it:
for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))
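wait_for = coro.send(None) is the first call to send() on each coroutine object. As described earlier, this runs the coroutine until it stops at actual = yield wait_until inside sleep(). The value of wait_until is handed out to wait_for; this is the time at which the task should first resume. The resulting Task objects are then pushed onto the min-heap.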
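Next comes a while loop. Each iteration pops the "smallest" element from the min-heap, that is, the task whose next resume time is soonest. If that time has not arrived yet, the loop calls the blocking time.sleep(). (Blocking is fine here because this event loop is so simple: we can be sure that no other task needs to resume in the meantime.)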
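Then send() is called on the coroutine. If StopIteration is not raised, a new Task is pushed onto the min-heap. (In other words, a task is taken off the heap, and if it has not finished, its next resume time is updated and it is pushed back on.)
So when is StopIteration raised? When the while loop in the countdown() coroutine ends and the coroutine returns, that is, when there are no more yields.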
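The scheduling order described above can be checked without any real waiting by replacing wall-clock time with a virtual clock. This is a sketch under stated assumptions, not the article's implementation: sleep_for yields a relative delay instead of a datetime, run() is a hypothetical helper, and (due, order, coro) tuples with a unique tie-breaker replace the Task class.

```python
import heapq
import types

@types.coroutine
def sleep_for(seconds):
    # Hypothetical variant of sleep(): yield a relative delay in seconds.
    yield seconds

async def countdown(label, length, log, *, delay=0):
    await sleep_for(delay)
    while length:
        log.append((label, length))
        await sleep_for(1)
        length -= 1
    log.append((label, "lift-off"))

def run(*coros):
    """Run coroutines on a virtual clock; return the final virtual time."""
    clock = 0
    waiting = []
    for order, coro in enumerate(coros):
        delay = coro.send(None)                      # start each coroutine
        # `order` is unique, so two coroutines are never compared directly.
        heapq.heappush(waiting, (clock + delay, order, coro))
    while waiting:
        due, order, coro = heapq.heappop(waiting)    # soonest resume time
        clock = max(clock, due)                      # "sleep" by jumping ahead
        try:
            delay = coro.send(None)                  # resume the coroutine
            heapq.heappush(waiting, (clock + delay, order, coro))
        except StopIteration:
            pass                                     # the coroutine is done
    return clock

log = []
total = run(countdown("A", 2, log), countdown("B", 1, log, delay=1))
print(log, total)
```

A and B interleave by due time: A counts 2 at time 0, both act at time 1, and both lift off at time 2.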
The complete code:
import datetime
import heapq
import types
import time


class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now


async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')


def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)


if __name__ == '__main__':
    main()
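Here is how the pieces of this example map onto asyncio:
The Task class corresponds to asyncio.Task. The Task in this article uses waiting_until to decide when to resume. asyncio.Task is a future object: when asyncio's event loop detects that the state of this future has changed, it runs the corresponding logic.
The sleep() function corresponds to asyncio.sleep(). It does not block; it pauses the coroutine and hands control back to the event loop.
SleepingLoop corresponds to asyncio.BaseEventLoop. SleepingLoop uses a min-heap, while asyncio.BaseEventLoop is more complex and builds on future objects, the selectors module, and more.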
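For comparison, here is roughly what the same countdown looks like on top of asyncio itself. This is a sketch, not code from the original article: the log parameter and the very short delays are assumptions chosen so the example finishes quickly, and asyncio.gather stands in for passing several coroutines to SleepingLoop.

```python
import asyncio

async def countdown(label, length, log, *, delay=0):
    await asyncio.sleep(delay)           # counterpart of the article's sleep()
    while length:
        log.append((label, length))
        await asyncio.sleep(0.01)        # short tick to keep the demo fast
        length -= 1
    log.append((label, "lift-off"))

async def main(log):
    # Run both countdowns concurrently on asyncio's event loop.
    await asyncio.gather(
        countdown("A", 2, log),
        countdown("B", 1, log, delay=0.005),
    )

log = []
asyncio.run(main(log))
print(log)
```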