Python 「黑魔法」 之 Generator Coroutines

首發於 個人博客 轉載請註明出處python

寫在前面

  • 本文默認讀者對 Python 生成器 有必定的瞭解,不瞭解者請移步至生成器 - 廖雪峯的官方網站git

  • 本文基於 Python 3.5.1,文中全部的例子均可在 Github 上得到。github

學過 Python 的都知道,Python 裏有一個很厲害的概念叫作 生成器(Generators)。一個生成器就像是一個微小的線程,能夠隨處暫停,也能夠隨時恢復執行,還能夠和代碼塊外部進行數據交換。恰當使用生成器,能夠極大地簡化代碼邏輯。web

也許,你能夠熟練地使用生成器完成一些看似不可能的任務,如「無窮斐波那契數列」,並引覺得豪,認爲所謂的生成器也不過如此——那我可要告訴你:這些都過小兒科了,下面我所要介紹的絕對會讓你大開眼界。數據庫

生成器 能夠實現 協程,你相信嗎?編程

什麼是協程

在異步編程盛行的今天,也許你已經對 協程(coroutines) 早有耳聞,但卻不必定了解它。咱們先來看看 Wikipedia 的定義:多線程

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.閉包

也就是說:協程是一種 容許在特定位置暫停或恢復的子程序——這一點和 生成器 類似。但和 生成器 不一樣的是,協程 能夠控制子程序暫停以後代碼的走向,而 生成器 僅能被動地將控制權交還給調用者。app

協程 是一種很實用的技術。和 多進程 與 多線程 相比,協程 能夠只利用一個線程更加輕便地實現 多任務,將任務切換的開銷降至最低。和 回調 等其餘異步技術相比,協程 維持了正常的代碼流程,在保證代碼可讀性的同時最大化地利用了 阻塞 IO 的空閒時間。它的高效與簡潔贏得了開發者們的擁戴。框架

Python 中的協程

早先 Python 是沒有原生協程支持的,所以在 協程 這個領域出現了百家爭鳴的現象。主流的實現由如下兩種:

  • 用 C 實現協程調度。這一派以 gevent 爲表明,在底層實現了協程調度,並將大部分的 阻塞 IO 重寫爲異步。

  • 用 生成器模擬。這一派以 Tornado 爲表明。Tornado 是一個老牌的異步 Web 框架,涵蓋了五花八門的異步編程方式,其中包括 協程。本文部分代碼借鑑於 Tornado。

直至 Python 3.4,Python 第一次將異步編程歸入標準庫中(參見 PEP 3156),其中包括了用生成器模擬的 協程。而在 Python 3.5 中,Guido 總算在語法層面上實現了 協程(參見 PEP 0492)。比起 yield 關鍵字,新關鍵字 asyncawait 具備更好的可讀性。在不久的未來,新的實現將會慢慢統一混亂已久的協程領域。

儘管 生成器協程 已成爲了過去時,但它曾經的輝煌卻不可磨滅。下面,讓咱們一塊兒來探索其中的魔法。

一個簡單的例子

假設有兩個子程序 mainprinterprinter 是一個死循環,等待輸入、加工並輸出結果。main 做爲主程序,不時地向 printer 發送數據。

這應該怎麼實現呢?

傳統方式中,這幾乎不可能在一個線程中實現,由於死循環會阻塞。而協程卻能很好地解決這個問題:

def printer():

    counter = 0
    while True:
        string = (yield)
        print('[{0}] {1}'.format(counter, string))
        counter += 1

if __name__ == '__main__':
    p = printer()
    next(p)
    p.send('Hi')
    p.send('My name is hsfzxjy.')
    p.send('Bye!')

輸出:

[0] Hi
[1] My name is hsfzxjy.
[2] Bye!

這其實就是最簡單的協程。程序由兩個分支組成。主程序經過 send 喚起子程序並傳入數據,子程序處理完後,用 yield 將本身掛起,並返回主程序,如此交替進行。

協程調度

有時,你的手頭上會有多個任務,每一個任務耗時很長,而你又不想同步處理,而是但願能像多線程同樣交替執行。這時,你就須要一個調度器來協調流程了。

做爲例子,咱們假設有這麼一個任務:

def task(name, times):

    for i in range(times):
        print(name, i)

若是你直接執行 task,那它會在遍歷 times 次以後纔會返回。爲了實現咱們的目的,咱們須要將 task 人爲地切割成若干塊,以便並行處理:

def task(name, times):

    for i in range(times):
        yield
        print(name, i)

這裏的 yield 沒有邏輯意義,僅是做爲暫停的標誌點。程序流能夠在此暫停,也能夠在此恢復。而經過實現一個調度器,咱們能夠完成多個任務的並行處理:

from collections import deque

class Runner(object):

    def __init__(self, tasks):
        self.tasks = deque(tasks)

    def next(self):
        return self.tasks.pop()

    def run(self):
        while len(self.tasks):
            task = self.next()
            try:
                next(task)
            except StopIteration:
                pass
            else:
                self.tasks.appendleft(task)

這裏咱們用一個隊列(deque)儲存任務列表。其中的 run 是一個重要的方法: 它經過輪轉隊列依次喚起任務,並將已經完成的任務清出隊列,簡潔地模擬了任務調度的過程。

而如今,咱們只需調用:

Runner([
    task('hsfzxjy', 5),
    task('Jack', 4),
    task('Bob', 6)
]).run()

就能夠獲得預想中的效果了:

Bob 0
Jack 0
hsfzxjy 0
Bob 1
Jack 1
hsfzxjy 1
Bob 2
Jack 2
hsfzxjy 2
Bob 3
Jack 3
hsfzxjy 3
Bob 4
hsfzxjy 4
Bob 5

簡直完美!答案和醜陋的多線程別無二樣,代碼卻簡單了不止一個數量級。

異步 IO 模擬

你絕對有過這樣的煩惱:程序經常被時滯嚴重的 IO 操做(數據庫查詢、大文件讀取、越過長城拿數據)阻塞,在等待 IO 返回期間,線程就像死了同樣,空耗着時間。爲此,你不得不用多線程甚至是多進程來解決問題。

而事實上,在等待 IO 的時候,你徹底能夠作一些與數據無關的操做,最大化地利用時間。Node.js 在這點作得不錯——它將一切異步化,壓榨性能。只惋惜它的異步是基於事件回調機制的,稍有不慎,你就有可能陷入 Callback Hell 的深淵。

而協程並不使用回調,相比之下可讀性會好不少。其思路大體以下:

  • 維護一個消息隊列,用於儲存 IO 記錄。

  • 協程函數 IO 時,自身掛起,同時向消息隊列插入一個記錄。

  • 經過輪詢或是 epoll 等事件框架,捕獲 IO 返回的事件。

  • 從消息隊列中取出記錄,恢復協程函數。

如今假設有這麼一個耗時任務:

def task(name):
    print(name, 1)
    sleep(1)
    print(name, 2)
    sleep(2)
    print(name, 3)

正常狀況下,這個任務執行完須要 3 秒,假若多個同步任務同步執行,執行時間會成倍增加。而若是利用協程,咱們就能夠在接近 3 秒的時間內完成多個任務。

首先咱們要實現消息隊列:

events_list = []


class Event(object):

    def __init__(self, *args, **kwargs):
        self.callback = lambda: None
        events_list.append(self)

    def set_callback(self, callback):
        self.callback = callback

    def is_ready(self):
        result = self._is_ready()

        if result:
            self.callback()

        return result

Event 是消息的基類,其在初始化時會將本身放入消息隊列 events_list 中。Event 和 調度器 使用回調進行交互。

接着咱們要 hack 掉 sleep 函數,這是由於原生的 time.sleep() 會阻塞線程。經過自定義 sleep 咱們能夠模擬異步延時操做:

# sleep.py

from event import Event
from time import time


class SleepEvent(Event):

    def __init__(self, timeout):
        super(SleepEvent, self).__init__(timeout)
        self.timeout = timeout
        self.start_time = time()

    def _is_ready(self):
        return time() - self.start_time >= self.timeout


def sleep(timeout):
    return SleepEvent(timeout)

能夠看出:sleep 在調用後就會當即返回,同時一個 SleepEvent 對象會被放入消息隊列,通過timeout 秒後執行回調。

再接下來即是協程調度了:

# runner.py

from event import events_list


def run(tasks):
    for task in tasks:
        _next(task)

    while len(events_list):
        for event in events_list:
            if event.is_ready():
                events_list.remove(event)
                break


def _next(task):

    try:
        event = next(task)
        event.set_callback(lambda: _next(task)) # 1
    except StopIteration:
        pass

run 啓動了全部的子程序,並開始消息循環。每遇到一處掛起,調度器自動設置回調,並在回調中從新恢復代碼流。「1」 處巧妙地利用閉包保存狀態。

最後是主代碼:

from sleep import sleep
import runner


def task(name):
    print(name, 1)
    yield sleep(1)
    print(name, 2)
    yield sleep(2)
    print(name, 3)

if __name__ == '__main__':
    runner.run((task('hsfzxjy'), task('Jack')))

輸出:

hsfzxjy 1
Jack 1
hsfzxjy 2
Jack 2
hsfzxjy 3
Jack 3
# [Finished in 3.0s]

協程函數的層級調用

上面的代碼有一個不足之處,即協程函數返回的是一個 Event 對象。然而事實上只有直接操縱 IO 的協程函數纔有可能接觸到這個對象。那麼,對於調用了 IO 的函數的調用者,它們應該如何實現呢?

設想以下任務:

def long_add(x, y, duration=1):
    yield sleep(duration)
    return x + y


def task(duration):
    print('start:', time())
    print((yield long_add(1, 2, duration)))
    print((yield long_add(3, 4, duration)))

long_add 是 IO 的一級調用者,task 調用 long_add,並利用其返回值進行後續操做。

簡而言之,咱們遇到的問題是:一個被喚起的協程函數如何喚起它的調用者?

正如在上個例子中,協程函數經過 Event 的回調與調度器交互。同理,咱們也可使用一個相似的對象,在這裏咱們稱其爲 Future

Future 保存在被調用者的閉包中,並由被調用者返回。而調用者經過在其上面設置回調函數,實現兩個協程函數之間的交互。

Future 的代碼以下,看起來有點像 Event

# future.py

class Future(object):
    def __init__(self):
        super(Future, self).__init__()
        self.callback = lambda *args: None
        self._done = False

    def set_callback(self, callback):
        self.callback = callback

    def done(self, value=None):
        self._done = True
        self.callback(value)

Future 的回調函數容許接受一個參數做爲返回值,以儘量地模擬通常函數。

但這樣一來,協程函數就會有些複雜了。它們不只要負責喚醒被調用者,還要負責與調用者之間的交互。這會產生許多重複代碼。爲了 D.R.Y,咱們用裝飾器封裝這一邏輯:

# co.py

from functools import wraps
from future import Future


def _next(gen, future, value=None):

    try:
        try:
            yielded_future = gen.send(value)
        except TypeError:
            yielded_future = next(gen)

        yielded_future.set_callback(lambda value: _next(gen, future, value))
    except StopIteration as e:
        future.done(e.value)


def coroutine(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()

        gen = func(*args, **kwargs)
        _next(gen, future)
        return future

    return wrapper

coroutine 包裝過的生成器成爲了一個普通函數,返回一個 Future 對象。_next 爲喚醒的核心邏輯,經過一個相似遞歸的回調設置簡潔地實現自我喚醒。當本身執行完時,會將本身閉包內的Future對象標記爲done,從而喚醒調用者。

爲了適應新變化,sleep 也要作相應的更改:

from event import Event
from future import Future
from time import time


class SleepEvent(Event):

    def __init__(self, timeout):
        super(SleepEvent, self).__init__()
        self.start_time = time()
        self.timeout = timeout

    def _is_ready(self):
        return time() - self.start_time >= self.timeout


def sleep(timeout):
    future = Future()
    event = SleepEvent(timeout)
    event.set_callback(lambda: future.done())
    return future

sleep 再也不返回 Event 對象,而是一致地返回 Future,並做爲 EventFuture 之間的代理者。

基於以上更改,調度器能夠更加簡潔——這是由於協程函數可以自我喚醒:

# runner.py

from event import events_list

def run():
    while len(events_list):
        for event in events_list:
            if event.is_ready():
                events_list.remove(event)
                break

主程序:

from co import coroutine
from sleep import sleep
import runner
from time import time


@coroutine
def long_add(x, y, duration=1):
    yield sleep(duration)
    return x + y


@coroutine
def task(duration):
    print('start:', time())
    print((yield long_add(1, 2, duration)), time())
    print((yield long_add(3, 4, duration)), time())

task(2)
task(1)
runner.run()

因爲咱們使用了一個糟糕的事件輪詢機制,密集的計算會阻塞通往 stdout 的輸出,於是看起來全部的結果都是一塊兒打印出來的。爲此,我在打印時特意加上了時間戳,以演示協程的效果。輸出以下:

start: 1459609512.263156
start: 1459609512.263212
3 1459609513.2632613
3 1459609514.2632234
7 1459609514.263319
7 1459609516.2633028

這事實上是 tornado.gen.coroutine 的簡化版本,爲了敘述方便我略去了許多細節,如異常處理以及調度優化,目的是讓你們能較清晰地瞭解 生成器協程 背後的機制。所以,這段代碼並不能用於實際生產中

小結

  • 這,才叫精通生成器。

  • 學習編程,不只要知其然,亦要知其因此然。

  • Python 是有魔法的,只有想不到,沒有作不到。

References

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息