Asynchronous Programming 101: Writing an Event Loop

The code in this article comes from: snarky.ca/how-the-hec…python

Recap

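The previous article covered the history of Python's async and await keywords and noted that async and await are an API, not an implementation. Many event loops are built on async/await, including asyncio and curio. Under the hood, asyncio is based on future objects, while curio is based on tuples.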

In this article we implement a simple event loop using a min-heap.

The heapq module

Heaps are arrays for which a[k] <= a[2k+1] and a[k] <= a[2k+2] for all k, counting elements from 0. For the sake of comparison, non-existing elements are considered to be infinite. The interesting property of a heap is that a[0] is always its smallest element. (Source: the source code of Python's built-in heapq module)

Simply put, a heap is a Python list with a special property: a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for every k, so the first element is always the smallest.

As you have surely noticed, this is a binary tree stored in a list: the children of a[k] are a[2*k+1] and a[2*k+2].
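To make the invariant concrete, here is a minimal sketch that checks it directly. The helper name is_min_heap is made up for this illustration and is not part of heapq.

```python
import heapq


def is_min_heap(a):
    """Return True if every parent is <= both of its children (hypothetical helper)."""
    n = len(a)
    for k in range(n):
        for child in (2 * k + 1, 2 * k + 2):
            # Children past the end of the list are treated as "infinite".
            if child < n and a[k] > a[child]:
                return False
    return True


data = [5, 1, 8, 3, 2]
heapq.heapify(data)          # rearrange the list in place into heap order
heap_ok = is_min_heap(data)  # the invariant holds after heapify
smallest = data[0]           # the root is the smallest element
unordered_ok = is_min_heap([3, 1, 2])  # 3 > 1, so this list is not a heap
```

Note that a heap is not a sorted list: only the parent/child relation is guaranteed, which is enough to keep the smallest element at index 0.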

The heapq module mainly provides the following API:

Usage:

heap = []            # creates an empty heap
heappush(heap, item) # pushes a new item on the heap
item = heappop(heap) # pops the smallest item from the heap
item = heap[0]       # smallest item on the heap without popping it
heapify(x)           # transforms list into a heap, in-place, in linear time
item = heapreplace(heap, item) # pops and returns smallest item, and adds
                               # new item; the heap size is unchanged
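  • Create a heap: heap = []
  • Push an item onto the heap: heappush(heap, item)
  • Pop the smallest item off the heap: item = heappop(heap)
  • Read the smallest item without removing it: item = heap[0]
  • Turn a list into a heap: heapify(x)
  • Pop the smallest item and push a new one: item = heapreplace(heap, item)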
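The calls listed above can be tried out in a few lines. This is only a small sketch with made-up sample values; the variable names are chosen for illustration.

```python
import heapq

heap = []                          # an empty list is already a valid heap
for item in (5, 1, 3):
    heapq.heappush(heap, item)     # push items in arbitrary order

peeked = heap[0]                   # smallest item, still on the heap
popped = heapq.heappop(heap)       # removes and returns the smallest item
next_smallest = heap[0]            # the next smallest item moves to the front

data = [9, 4, 7]
heapq.heapify(data)                # turn an existing list into a heap in place
replaced = heapq.heapreplace(data, 6)  # pops 4, then pushes 6; size unchanged
```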

The generator send() method

One more recap, since this part can be a little hard to follow.

next_value = generator.send(value)

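Three things happen:

  • The generator resumes execution.
  • value becomes the value of the yield expression the generator is currently paused at.
  • The value of the next yield expression the generator reaches is returned as next_value.

Take a look at this example: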

>>> def double_inputs():
...     while True:
...         x = yield
...         yield x * 2
...
>>> gen = double_inputs()
>>> next(gen)       # run up to the first yield
>>> gen.send(10)    # goes into 'x' variable
20
>>> next(gen)       # run up to the next yield
>>> gen.send(6)     # goes into 'x' again
12
>>> next(gen)       # run up to the next yield
>>> gen.send(94.3)  # goes into 'x' again
188.6

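Here is what happens when gen.send(10) runs:

  • The generator resumes.
  • 10 is assigned to x in x = yield.
  • x * 2 evaluates to 20. The generator then hits yield again and pauses, handing x * 2 back as the return value of send(), which is why this statement prints 20.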

next(g) is equivalent to g.send(None). It is commonly used to advance a generator to its next yield, where it pauses.
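A short sketch of this equivalence, using a made-up generator named echo. It also shows why the first call has to send None: a generator that has not started yet has no yield expression waiting to receive a value.

```python
def echo():
    """Hypothetical generator: yields a marker, then yields back what it was sent."""
    received = yield "ready"
    yield received


gen = echo()
first = gen.send(None)    # same effect as next(gen): run up to the first yield
second = gen.send("hi")   # "hi" becomes the value of the paused yield expression

fresh = echo()
try:
    fresh.send("too early")   # not started yet, so only None may be sent
except TypeError as exc:
    error = exc
```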

Event loop design

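The event loop we are going to implement is very simple. Its core features are:

  • Handle many delayed tasks.
  • Run the task with the earliest scheduled time first.
  • If an earlier task takes a long time to finish, it does not block later tasks (in other words, they can run concurrently).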

Code

The Task class

You can think of this as asyncio.Task/curio.Task.

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

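Two special methods are defined here, __eq__ and __lt__, so that Task instances can be compared with == and <. Because we use heapq, a min-heap, the 'smallest' item sits at the front. Task instances are ordered by their waiting_until (the time at which they should next resume).

So at some moment, the min-heap might look like this: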

Task A will resume after 0 seconds. Its resume time (wait_until) is the 'smallest', so it is popped and run first, and Task B then takes its place as the 'smallest' element.
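The ordering can be checked without any real coroutines. In this sketch the Task class is copied from the listing above, while the fixed base time and the string labels standing in for coroutines are assumptions made purely for illustration.

```python
import datetime
import heapq


class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


base = datetime.datetime(2020, 1, 1)   # arbitrary fixed time for the demo
heap = []
# Push in scrambled order; strings stand in for coroutine objects.
for label, delay in (("C", 2), ("A", 0), ("B", 1)):
    heapq.heappush(heap, Task(base + datetime.timedelta(seconds=delay), label))

front = heap[0].coro                                    # soonest task sits at index 0
order = [heapq.heappop(heap).coro for _ in range(3)]    # pop in resume-time order
```

Whatever order the tasks are pushed in, they come back out sorted by waiting_until, because heapq only relies on the < comparison that Task defines.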

The actual tasks

import datetime
import types

@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now

async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')

countdown() waits delay seconds and then runs a task that takes length seconds. A brief analysis of the code:

One thing must be clear: countdown() returns a coroutine object. Unless you await it (or call send() on it), nothing actually runs.
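A tiny sketch of this point, with a made-up coroutine named greet that records when its body runs:

```python
async def greet(log):
    """Hypothetical coroutine used only to show when the body executes."""
    log.append("ran")
    return "done"


log = []
coro = greet(log)        # creates a coroutine object; the body has not run
before = list(log)       # snapshot: still empty at this point

try:
    coro.send(None)      # drive the coroutine by hand
except StopIteration as stop:
    result = stop.value  # the return value travels on StopIteration
```

Calling greet(log) only builds the coroutine object. The body runs once something drives it, which in this article is the event loop calling send().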

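The line delta = await sleep(delay) enters the sleep() coroutine and pauses at its first yield. To resume it, something has to "send stuff back" (see the previous article), that is, call send() on the generator. As we will see, this is the event loop's job.

Also, each task should first resume after delay seconds, so the event loop should call send() delay seconds after the program starts.

The while loop that follows repeats the run-and-pause cycle until length seconds have passed, at which point the task is done.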
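Before looking at the loop itself, it may help to play the role of the event loop by hand. In the sketch below, sleep() is copied from the listing above; wait_once is a made-up coroutine, and instead of really waiting we simply send back the requested resume time.

```python
import datetime
import types


@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now


async def wait_once(delay):
    """Hypothetical coroutine: a single await, nothing else."""
    return await sleep(delay)


coro = wait_once(5)
wait_until = coro.send(None)   # runs until the yield inside sleep()

try:
    # Pretend the requested time has arrived; no real sleeping happens here.
    coro.send(wait_until)
except StopIteration as stop:
    delta = stop.value         # what `await sleep(delay)` evaluated to
```

The first send(None) hands us the time at which the coroutine wants to resume, and the second send() delivers the "current time" back into sleep(). A real loop would wait until wait_until before making that second call.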

The event loop code

import datetime
import heapq
import time

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()

That is all the code there is. Simple, isn't it? Let's walk through it:

for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))

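wait_for = coro.send(None) is the first call to send() on each coroutine object. As described earlier, this step pauses at actual = yield wait_until inside sleep(). The value of wait_until is passed out as wait_for, which is the time at which the task should first start running. The Task objects are then pushed onto the min-heap.

Next comes a while loop. Each iteration pops the 'smallest' element from the min-heap, that is, the task whose next resume time is soonest. If its resume time has not arrived yet, the loop calls the blocking time.sleep(). (Blocking is fine here: the event loop is so simple that we can be sure no other task needs to resume during that interval.)

Then send() is called on the coroutine. If StopIteration is not raised, a new Task is pushed onto the min-heap (in other words, if the task popped earlier has not finished, its next resume time is updated and it goes back onto the heap).

So when is StopIteration raised? When the while loop in the countdown() coroutine ends, that is, when there are no more yields.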
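To see exactly when StopIteration shows up, here is a stripped-down sketch. pause() and this simplified countdown() are stand-ins written for the illustration (no timing involved), not the article's versions.

```python
import types


@types.coroutine
def pause():
    """Hypothetical stand-in for sleep(): yield once, with no timing."""
    yield


async def countdown(length):
    while length:
        await pause()
        length -= 1
    return "lift-off"


coro = countdown(3)
pauses = 0
try:
    while True:
        coro.send(None)   # each successful send() ends at another yield
        pauses += 1
except StopIteration as stop:
    result = stop.value   # raised once the while loop has finished
```

The coroutine pauses once per loop iteration. The send() after the last pause runs off the end of the function, so it raises StopIteration instead of returning a value.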

The final code

import datetime
import heapq
import types
import time


class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now


async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')


def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)



if __name__ == '__main__':
    main()

Summary

Here is how the pieces of this example map onto asyncio:

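  • The Task class corresponds to asyncio.Task. The Task in this article uses waiting_until to decide when to resume; asyncio.Task is a future object, and when asyncio's event loop detects a change in that future's state, it runs the corresponding logic.
  • The sleep() function corresponds to asyncio.sleep(). It does not block.
  • SleepingLoop corresponds to asyncio.BaseEventLoop. SleepingLoop uses a min-heap; asyncio.BaseEventLoop is more complex and builds on future objects, the selectors module, and more.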
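For comparison, here is a rough sketch of the same countdown written against asyncio itself. The tick parameter, the shortened delays and the finished list are assumptions added so that the example runs quickly and its result can be inspected; they are not part of the original code.

```python
import asyncio


async def countdown(label, length, *, delay=0.0, tick=0.01, finished=None):
    """Scaled-down countdown: `tick` seconds per step instead of 1 second."""
    await asyncio.sleep(delay)        # plays the role of our sleep()
    while length:
        await asyncio.sleep(tick)
        length -= 1
    finished.append(label)            # record the lift-off


async def main():
    finished = []
    # gather() schedules all three coroutines on the running event loop.
    await asyncio.gather(
        countdown("A", 5, delay=0.00, finished=finished),
        countdown("B", 3, delay=0.02, finished=finished),
        countdown("C", 4, delay=0.01, finished=finished),
    )
    return finished


finished = asyncio.run(main())        # plays the role of run_until_complete()
```

All three countdowns wait concurrently, just as in SleepingLoop. Since each one needs the same total time here (delay plus length ticks), the order of the lift-offs is not something to rely on.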
