理解 Python 中的異步編程

讓咱們來寫一些 Python 代碼

你能夠在這個 GitHub 倉庫 下載全部的示例代碼。python

這篇文章中的全部例子都已經在 Python 3.6.1 環境下測試過,並且在代碼示例中的這個 requirements.txt 文件包含了運行全部這些測試所須要的模塊。react

我強烈建議建立一個 Python 虛擬環境來運行這些代碼,這樣就不會和系統級別的 Python 產生耦合。git

示例 1:同步編程

第一個例子展現的是一種有些刻意設計的方式,即有一個任務先從隊列中拉取"工做"以後再執行這個工做。在這種狀況下,這個工做的內容只是獲取一個數字,而後任務會把這個數字疊加起來。在每一個計數步驟中,它還打印了字符串代表該任務正在運行,而且在循環的最後還打印出了總的計數。咱們設計的部分即這個程序爲多任務處理在隊列中的工做提供了很天然的基礎。程序員

"""
example_1.py

Just a short example showing synchronous running of 'tasks'
"""

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f'Task {name} nothing to do')
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            for x in range(count):
                print(f'Task {name} running')
                total += 1
            print(f'Task {name} total: {total}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        (task, 'One', work_queue),
        (task, 'Two', work_queue)
    ]

    # run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == '__main__':
    main()

該程序中的"任務"就是一個函數,該函數能夠接收一個字符串和一個隊列做爲參數。在執行時,它會去看隊列裏是否有任何須要處理的工做,若是有,它就會把值從隊列中取出來,開啓一個 for 循環來疊加這個計數值而且在最後打印出總數。它會一直這樣運行直到隊列裏什麼都沒剩了纔會結束離開。github

當咱們在執行這個任務時,咱們會獲得一個列表代表任務一(即代碼中的 task One)作了全部的工做。它內部的循環消費了隊列裏的所有工做,而且執行這些工做。當退出任務一的循環後,任務二(即代碼中的 task Two)有機會運行,可是它會發現隊列是空的,由於這個影響,該任務會打印一段語句以後退出。代碼中並無任何地方可讓任務一和任務二協做的很好而且能夠在它們之間切換。web

示例 2: 簡單的協做併發

程序(example_2.py)的下個版本經過使用生成器增長了兩個任務能夠跟好相互協做的能力。在任務函數中添加 yield 語句意味着循環會在執行到這個語句時退出,可是仍然保留當時的上下文,這樣以後就能夠恢復先前的循環。在程序後面 "run the tasks" 的循壞中當 t.next() 被調用時就能夠利用這個。這條語句會在以前生成(即調用 yield 的語句處)的地方從新開始以前的任務。編程

這是一種協做併發的方式。這個程序會讓出對它當前上下文的控制,這樣其它的任務就能夠運行。在這種狀況下,它容許咱們主要的 "run the tasks" 調度器能夠運行任務函數的兩個實例,每個實例都從相同的隊列中消費工做。這種作法雖然聰明一些,可是爲了和第一個示例達成一樣結果的同時作了更多的工做。併發

"""
example_2.py

Just a short example demonstrating a simple state machine in Python
"""

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        for x in range(count):
            print(f'Task {name} running')
            total += 1
            yield
        print(f'Task {name} total: {total}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True


if __name__ == '__main__':
    main()

當程序運行時,輸出代表任務一和任務二都在運行,它們都從隊列裏消耗工做而且處理它。這就是咱們想要的,兩個任務都在處理工做,並且都是以處理從隊列中的兩個項目結束。可是再一次,須要作一點工做來實現這個結果。異步

這裏的技巧在於使用 yield 語句,它將任務函數轉變爲生成器,來實現一個 "上下文切換"。這個程序使用這個上下文切換來運行任務的兩個實例。async

示例 3:經過阻塞調用來協做併發

程序(example_3.py)的下個版本和上一個版本幾乎徹底同樣,除了在咱們任務循環體內添加了一個 time.sleep(1) 調用。這使任務循環中的每次迭代都添加了一秒的延遲。這個添加的延遲是爲了模擬在咱們任務中出現緩慢 IO 操做的影響。

我還導入了一個簡單的 Elapsed Time 類來處理報告中使用的開始時間/已用時間功能。

"""
example_3.py

Just a short example demonstraing a simple state machine in Python
However, this one has delays that affect it
"""

import time
import queue
from lib.elapsed_time import ET


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
            yield
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)


    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print('Total elapsed time: {}'.format(et()))


if __name__ == '__main__':
    main()

當該程序運行時,輸出代表任務一和任務二都在運行,消費從隊列裏來的工做並像以前那樣處理它們。隨着增長的模擬 IO 操做延遲,咱們發現咱們協做式的併發並無爲咱們作任何事,延遲會中止整個程序的運行,而 CPU 就只會等待這個 IO 延遲的結束。

這就是異步文檔中 」阻塞代碼「的確切含義。注意運行整個程序所須要的時間,你會發現這就是全部 IO 延遲的累積時間。這再次意味着經過這種方式運行程序並非勝利了。

示例 4:使用非阻塞調用來協做併發

程序(example_4.py)的下一個版本已經修改了很多代碼。它在程序一開始就使用了 gevent 異步編程模塊。該 模塊以及另外一個叫作 monkey 的模塊被導入了。

以後 monkey 模塊一個叫作 patch_all() 的方法被調用。這個方法是用來幹嗎的呢?簡單來講它配置了這個應用程序,使其它全部包含阻塞(同步)代碼的模塊都會被打上"補丁",這樣這些同步代碼就會變成異步的。

就像大多數簡單的解釋同樣,這個解釋對你並無很大的幫助。在咱們示例代碼中與之相關的就是 time.sleep(1)(咱們模擬的 IO 延遲)不會再"阻塞"整個程序。取而代之的是它讓出程序的控制返回給系統。請注意,"example_3.py" 中的 "yield" 語句再也不存在,它如今已是 time.sleep(1) 函數調用內的一部分。

因此,若是 time.sleep(1) 已經被 gevent 打補丁來讓出控制,那麼這個控制又到哪裏去了?使用 gevent 的一個做用是它會在程序中運行一個事件循環的線程。對於咱們的目的來講,這個事件循環就像在 example_3.py 中 "run the tasks" 的循環。當 time.sleep(1) 的延遲結束時,它就會把控制返回給 time.sleep(1) 語句的下一條可執行語句。這樣作的優勢是 CPU 不會由於延遲被阻塞,而是能夠有空閒去執行其它代碼。

咱們 "run the tasks" 的循環已經再也不存在了,取而代之的是咱們的任務隊列包含了兩個對 gevent.spawn(...) 的調用。這兩個調用會啓動兩個 gevent 線程(叫作 greenlet),它們是相互協做進行上下文切換的輕量級微線程,而不是像普通線程同樣由系統切換上下文。

注意在咱們任務生成以後的 gevent.joinall(tasks) 調用。這條語句會讓咱們的程序會一直等待任務一和任務二都完成。若是沒有這個的話,咱們的程序將會繼續執行後面打印的語句,可是實際上沒有作任何事。

"""
example_4.py

Just a short example demonstrating a simple state machine in Python
However, this one has delays that affect it
"""

import gevent
from gevent import monkey
monkey.patch_all()

import time
import queue
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the programWhen
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

當這個程序運行的時候,請注意任務一和任務二都在一樣的時間開始,而後等待模擬的 IO 調用結束。這代表 time.sleep(1) 調用已經再也不阻塞,其它的工做也正在被作。

在程序結束時,看下總的運行時間你就會發現它其實是 example_3.py 運行時間的一半。如今咱們開始看到異步程序的優點了。

在併發運行兩個或者多個事件能夠經過非阻塞的方式來執行 IO 操做。經過使用 gevent greenlets 和控制上下文切換,咱們就能夠在多個任務之間實現多路複用,這個實現並不會遇到太多麻煩。

示例 5:異步(阻塞)HTTP 下載

程序(example_5.py)的下一個版本有一點進步也有一點退步。這個程序如今處理的是有真正 IO 操做的工做,即向一個 URL 列表發起 HTTP 請求來獲取頁面內容,可是它仍然是以阻塞(同步)的方式運行的。

咱們修改了這個程序導入了很是棒的 requests 模塊 來建立真實的 HTTP 請求,並且咱們把一份 URL 列表加入到隊列中,而不是像以前同樣只是數字。在這個任務中,咱們也沒有再用計數器,而是使用 requests 模塊來獲取從隊列裏獲得 URL 頁面的內容,而且咱們打印了執行這個操做的時間。

"""
example_5.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue
"""

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')
        yield


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

和這個程序以前版本同樣,咱們使用一個 yield 關鍵字來把咱們的任務函數轉換成生成器,而且爲了讓其餘任務實例能夠執行,咱們執行了一次上下文切換。

每一個任務都會從工做隊列中獲取到一個 URL,獲取這個 URL 指向頁面的內容而且報告獲取這些內容花了多長時間。

和以前同樣,這個 yield 關鍵字讓咱們兩個任務都能運行,可是由於這個程序是以同步的方式運行的,每一個 requests.get() 調用在獲取到頁面以前都會阻塞 CPU。注意在最後運行整個程序的總時間,這對於下一個示例會頗有意義。

示例 6:使用 gevent 實現異步(非阻塞)HTTP 下載

這個程序(example_6.py)的版本修改了先前的版本再次使用了 gevent 模塊。記得 gevent 模塊的 monkey.patch_all() 調用會修改以後的全部模塊,這樣這些模塊的同步代碼就會變成異步的,其中也包括 requests 模塊。

如今的任務已經改爲移除了對 yield 的調用,由於 requests.get(url) 調用已經不會再阻塞了,反而是執行一次上下文切換讓出控制給 gevent 的事件循環。在 「run the task」 部分咱們使用 gevent 來產生兩個任務生成器,以後使用 joinall() 來等待它們完成。

"""
example_6.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. It's also using gevent to get the
URL's in an asynchronous manner.
"""

import gevent
from gevent import monkey
monkey.patch_all()

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')

if __name__ == '__main__':
    main()

在程序運行的最後,你能夠看下總共的時間和獲取每一個 URL 分別的時間。你將會看到總時間會少於 requests.get() 函數調用的累計時間。

這是由於這些函數調用是異步運行的,因此咱們能夠同一時間發送多個請求,從而更好地發揮出 CPU的優點。

示例 7:使用 Twisted 實現異步(非阻塞)HTTP 下載

程序(example_7.py)的版本使用了 Twisted 模塊 ,該模塊本所作的質上和 gevent 模塊同樣,即以非阻塞的方式下載 URL 對應的內容。

Twisted是一個很是強大的系統,採用了和 gevent 根本上不同的方式來建立異步程序。gevent 模塊是修改其模塊使它們的同步代碼變成異步,Twisted 提供了它本身的函數和方法來達到一樣的結果。

以前在 example_6.py 中使用被打補丁的 requests.get(url) 調用來獲取 URL 內容的位置,如今咱們使用 Twisted 函數 getPage(url)

在這個版本中,@defer.inlineCallbacks 函數裝飾器和語句 yield getPage(url) 一塊兒實現把上下文切換到 Twisted 的事件循環。

在 gevent 中這個事件循環是隱含的,可是在 Twisted 中,事件循環由位於程序底部的 reactor.run() 明確提供。

"""
example_7.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a work_queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


@defer.inlineCallbacks
def my_task(name, work_queue):
    try:
        while not work_queue.empty():
            url = work_queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            yield getPage(url)
            print(f'Task {name} got URL: {url}')
            print(f'Task {name} total elapsed time: {et():.1f}')
    except Exception as e:
        print(str(e))


def main():
    """
    This is the main entry point for the program
    """
    # create the work_queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the work_queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    defer.DeferredList([
        task.deferLater(reactor, 0, my_task, 'One', work_queue),
        task.deferLater(reactor, 0, my_task, 'Two', work_queue)
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

注意最後的結果和 gevent 版本同樣,整個程序運行的時間會小於獲取每一個 URL 內容的累計時間。

示例8:使用 Twisted 回調函數實現異步(非阻塞)HTTP 下載

程序 (example_8.py)的這個版本也是使用 Twisted 庫,可是是以更傳統的方式使用 Twisted。

這裏個人意思是再也不使用 @defer.inlineCallbacks / yield 這種代碼風格,這個版本會使用明確的回調函數。一個"回調函數"是一個被傳遞給系統的函數,該函數能夠在以後的事件響應中被調用。在下面的例子中,success_callback() 被提供給 Twisted,用來在 getPage(url) 調用完成後被調用。

注意在這個程序中 @defer.inlineCallbacks 裝飾器並無在 my_task() 函數中使用。除此以外,這個函數產出一個叫作 d 的變量,該變量是延後調用的縮寫,是調用函數 getPage(url) 獲得的返回值。

延後是 Twisted 處理異步編程的方式,回調函數就附加在其之上。當這個延後"觸發"(即當 getPage(url) 完成時),會以回調函數被附加時定義的變量做爲參數,來調用這個回調函數。

"""
example_8.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


def success_callback(results, name, url, et):
    print(f'Task {name} got URL: {url}')
    print(f'Task {name} total elapsed time: {et():.1f}')


def my_task(name, queue):
    if not queue.empty():
        while not queue.empty():
            url = queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            d = getPage(url)
            d.addCallback(success_callback, name, url, et)
            yield d


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()

    # create cooperator
    coop = task.Cooperator()

    defer.DeferredList([
        coop.coiterate(my_task('One', work_queue)),
        coop.coiterate(my_task('Two', work_queue)),
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

運行這個程序的最終結果和先前的兩個示例同樣,運行程序的總時間小於獲取 URLs 內容的總時間。

不管你使用 gevent 仍是 Twisted,這只是我的的喜愛和代碼風格問題。這兩個都是強大的庫,提供了讓程序員能夠編寫異步代碼的機制

相關文章
相關標籤/搜索