Python3 asyncio 簡介

概述

一、Terms

併發指的是同時啓動任務,並行指的是同時運行人物。依賴時間切片和多核,併發也能夠是並行。下文中統稱爲併發,都指的是並行的併發。html

現實中須要解決的問題有兩類:python

  • CPU bound
  • IO bound

CPU bound 指的是須要密集 CPU 運行的任務,IO bound 指的是有大量等待 IO 的任務。CPU bound 只能經過多核並行來解決,而 IO bound 則是本文的重點,也是 asyncio 大顯身手的地方。git

二、併發

單核 CPU 的性能有其極限,因此咱們須要併發來提升性能。可是併發又會致使資源的競爭,因此須要引用鎖來保護敏感區。可是鎖又會下降併發度,因此咱們須要探索無鎖的併發方案。github

可使用線程模型來提升併發度,可是每個線程都須要獨立的棧空間(64-bit JVM 中是 1024 KB),這還只是不包含任何資源的初始棧空間,並且棧空間的大小和線程切換的開銷成正比。因此咱們須要尋找比線程更輕量的解決方案。golang

爲了減小線程的切換,咱們能夠建立一個等於 CPU 核數的線程池,把須要運算的邏輯放進線程池,不須要時就拿出來換成其餘的任務,保持線程的持續運算而不是切換。web

爲了更好的使用 CPU 的性能,咱們應該在任務不在須要 CPU 資源時讓其從線程池裏退出來(好比等待 IO 時),這就須要有一種機制,讓任務能夠在阻塞時退出,在資源就緒時恢復運行。因此咱們將任務抽象爲一種用戶態的線程(協程,greenthread、coroutine),當其須要調用阻塞資源時,就在 IO 調度器裏註冊一個事件,並讓出線程資源給其餘協程,當資源就緒時,IO 調度器會在有空餘線程資源時,從新運行這個協程。django

用戶態線程(下文稱之爲協程)的設計方案通常有三種(按照用戶態線程和系統線程的比例):編程

  • 1:1:直接使用系統線程,能夠利用多核,可是上下文開銷大;
  • N:1:多協程對應一個線程,節省了上下文開銷,缺點是不能利用多核,asyncio 就是這個方案;
  • M:N:多協程對應多線程,golang 的方案。

協程的優勢在於,這是一種用戶態的機制,避免的內核態用戶態切換的成本,並且初始棧空間能夠設置的很小(Golang 中的 goroutine 僅爲 2 KB),這樣能夠建立比線程更大數量的協程。flask


3、Python 異步的歷史

簡單聊幾句。後端

我最先據說的異步庫就是 twisted,不過聽說使用極其複雜,因此望而卻步了。

後來在 GoogleAppEngine 上用 web.py 開發後端,接着不久就趕上了 Aaron 不幸被逼自殺, 在選擇新的後端框架時據說了 tornado, 被其簡單的用法所折服,一直用到如今,這個博客也是用 tornado 開發的,我甚至還本身擼了一整套 RESTful 的框架。

不過其實用了 tornado 一兩年後才真正的看懂了它 ioloop 的設計,不得不說 tornado 的註釋寫的真的很好,強烈推薦學習 tornado 的源碼。

tornado 中最難解決的問題就是如何去調度嵌套的異步任務,由於 tornado 是經過 yield 和 decorator 相結合的方式來實現異步任務, 因此致使異步函數很難返回值,在 tornado 裏你只能經過 raise 的方式來返回值,這又致使 coroutine 很難正確的捕獲到異常,爲了解決這個問題我本身寫了一個 decorator, 而後每次寫 tornado 時都是一大堆的:

@tornado.gen.coroutine
@debug_wrapper
def xxx():
    # ...
    raise tornado.gen.Return(xxx)

 

挺煩。

不過 Python 界的主流後端框架除了 tornado 外還有 flask 和 django,那麼使用這兩種框架的人在遇到密集的 IO 時該怎麼辦呢? 還好有神奇的 gevent!gevent 經過 patch 的方式將各類常見的 IO 調用封裝爲協程,而且將整個調度過程徹底封裝,用戶能夠用近乎黑盒的方式來使用, 你惟一須要作的就是先手動 patch 一下,而後用 gevent.spawn 去發起任務,若是須要同步的話就再 joinall 一下。 能夠說 gevent 選擇了和 golang 同樣的思路,gevent.spawn 就像是 golang 裏的 goroutine,gevent 再繼續優化升級下去,終極目標就是實現 golang 的 runtime 吧。

gevent 的一個例子:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
cost 0.7102580070495605s for url http://httpbin.org/user-agent
cost 0.7106029987335205s for url http://httpbin.org/get
cost 0.7245540618896484s for url http://httpbin.org/headers
cost 0.7327840328216553s for url http://httpbin.org/
cost 1.073429822921753s for url http://httpbin.org/ip
total cost 1.0802628993988037s
"""
import time

import gevent
import gevent.monkey


gevent.monkey.patch_socket()

try:
    import urllib2
except ImportError:
    import urllib.request as urllib2


TARGET_URLS = (
    'http://httpbin.org/',
    'http://httpbin.org/ip',
    'http://httpbin.org/user-agent',
    'http://httpbin.org/headers',
    'http://httpbin.org/get',
)


def demo_task(url):
    start_ts = time.time()
    r = urllib2.urlopen(url)
    print('cost {}s for url {}'.format(time.time() - start_ts, url))


def demo_handler():
    start_ts = time.time()
    tasks = [gevent.spawn(demo_task, url) for url in TARGET_URLS]
    gevent.joinall(tasks)
    print('total cost {}s'.format(time.time() - start_ts))


def main():
    demo_handler()


if __name__ == '__main__':
    main()

 

Python 3 的官方的解決方案 asyncio 選擇了更爲白盒的調用方式, 該方案極大的吸取了 tornado 的優勢,而且爲了解決 tornado 的協程返回,增長了新語法 yield from, 因此在 Python 3.4 的時代,你能夠用近乎和 tornado 徹底相同的方法寫 asyncio:

# python 3.4
# 注意:Python 3.6 已經不這麼寫了

import asyncio


@asyncio.coroutine
def coroutine_demo():
    r = yield from coroutine_child_demo()
    print(r)


@asyncio.coroutine
def coroutine_child_demo():
    asyncio.sleep(1)
    return 2

 

不過這麼寫仍是太醜陋,並且總讓人以爲 coroutine 只是一個第三方包提供的功能,好在反正 asyncio 包被聲明爲一個不穩定的開發狀態的包, 因此咱們能夠繼續大改,因此 asyncio 的大幅修改一直到了 Python3.6 纔算正式結束。

Python 3.6 做爲 asyncio 的第一個穩定版,新的語法已經變成了這樣:

import asyncio


async def coroutine_demo():
    r = awiat coroutine_child_demo()
    print(r)


async def coroutine_child_demo():
    asyncio.sleep(1)
    return 2


if __name__ == '__main__':
    ioloop = asyncio.get_event_loop()
    ioloop.run_until_complete(coroutine_demo())

 

下面會稍微詳細的講解 asyncio 包的用法。


4、asyncio

後面的例子裏,我都會用 asyncio.sleep 來表示一個耗時的阻塞操做, 你能夠將其理解爲實際操做中的網絡請求或文件讀寫等 IO 操做。

一、cotouine

首先,你要會建立協程:

async def coroutine_demo():
    await asyncio.sleep(2)


print(coroutine_demo)
# <function coroutine_demo at 0x7fd35c4c89d8>

print(coroutine_demo())
# <coroutine object coroutine_demo at 0x7fd35c523ca8>

 

協程都是非阻塞的,當你調用一個協程時(形如 coroutine_demo()), 這個協程程序就被執行了,直到執行到另外一個協程(asyncio.sleep), 這時會在 ioloop 裏掛起一個事件,而後馬上返回。

此時你須要作的,就是繼續幹你的事情,而且確保你給了這個協程足夠的時間執行完成, 因此繼續寫完這個簡短的腳本:

if __name__ == '__main__':
    ioloop = asyncio.get_event_loop()  # 建立事件循環 ioloop
    coroutine = coroutine_demo()  # 啓動協程
    future = asyncio.ensure_future(coroutine)  # 將其封裝爲 Future 對象

    # 而後就只須要將 future 提交給 ioloop,讓其等待該 future 對象完成就好了
    ioloop.run_untile_complete(future)
    print('all done')

 

二、Task & Future

Future 有點像是一個 lazy object,當你調用一個協程時,這個協程會被註冊到 ioloop, 同時該協程會馬上返回一個 coroutine 對象,而後你能夠用 asyncio.ensure_future 將其封裝爲一個 Future 對象。

當協程任務結束時,這個 future 對象的狀態也會變化,能夠經過這個 future 對象來獲取該任務的結果值(或異常):

future = asyncio.ensure_future(coroutine_demo())
future.done()
# 任務是否結束
# True or False

future.result(timeout=None)
# 獲取任務的結果
# 默認會阻塞等待到任務結束

 

目前提到了 coroutine、Task 和 future,對於這三者的關係,個人理解以下:

  • coroutine 是一種函數,能夠用來定義協程;
  • Task 就是 future,是 asyncio 裏最小的任務單位,asyncio 裏的各類調度都是基於 future 來就進行的;

下面舉一些用例

三、調度

先簡單的說一下 asyncio 的使用,首先你須要啓動一個主函數,在主函數裏你實例化 ioloop, 而後在這個 ioloop 裏註冊任意多的 task,task 也能夠註冊子 task,以後你能夠選擇讓 ioloop 永久的運行下去, 或者運行到最後一個 task 完成爲止。

首先看一個最簡單的案例,請求多個 URL:

urls = [
    'https://httpbin.org/',
    'https://httpbin.org/get',
    'https://httpbin.org/ip',
    'https://httpbin.org/headers',
]

async def crawler():
    async with aiohttp.ClientSession() as session:
        futures = map(asyncio.ensure_future, map(session.get, urls))
        for f in asyncio.as_completed(futures):
            print(await f)


if __name__ == '__main__':
    ioloop = asyncio.get_event_loop()
    ioloop.run_untile_complete(asyncio.ensure_future(crawler()))

 

上面的例子裏能夠看到,咱們啓動了不少了 session.get 的子協程,而後用 asyncio.ensure_future 將其封裝爲 future, 而後調用 as_completed 方法監聽這一堆的子任務,每當有子任務完成時,就會觸發 for 循環對結果進行處理。

asyncio 裏除了 as_completed 外,經常使用的還有 asyncio.wait(fs, timeout=None, when=ALL_COMPLETED)。 方法就是能夠等待多個 futureswhen 參數能夠設定等待的模式,可接受的參數有:

  • FIRST_COMPLETED:等到第一個完成;
  • FIRST_EXCEPTION:等到一個出錯;
  • ALL_COMPLETED:等待所有完成。

因此上面的函數,as_completed 那段還能夠寫成:

await asyncio.wait(futures)
for f in futures:
    print(f.result())

 

四、定時任務

除了上面舉的那些事件觸發的任務外,asyncio 還能夠依靠時間進行觸發。

ioloop = asyncio.get_event_loop()

# 一段時間後運行
ioloop.call_later(delay_in_seconds, callback, args)

# 指定時間運行
ioloop.call_at(when, callback, *args)

 

這裏須要注意的是,ioloop 使用的是本身的時間,你能夠經過 ioloop.time() 獲取到 ioloop 當前的時間,因此若是你要用 call_at,你須要計算出相對於 ioloop 的時間。因此其實這個方法沒什麼意義,通常用 ioloop.call_later 這個方法用的更多。


5、鎖

一、併發控制

攜程帶來的性能提高很是的顯著,以致於你須要考慮一個你之前可能從未考慮過的問題:併發控制。 對資源的控制也是異步編程的難點所在。

舉個例子,你須要下載 100 萬 張圖片,過去你開了 20 個 線程來下載,那麼在同一時間最大的併發量就是 20, 對於服務器而言,最多須要處理 20 qps 的請求,對於客戶端而言,最多須要在內存裏放 20 張 圖片的數據,僅此而已。

可是進入協程時代,全部的東西都是非阻塞的,你能夠在很短的時間內向遠程發起 100 萬 的請求, 也可能在內存裏掛起 100 萬 次請求的數據,這不管對於服務端仍是客戶端都是難以接受的。

asyncio 裏提供了四種鎖:

  • Lock
  • Semaphore
  • Event
  • Condition

下面先介紹一個最經常使用的案例,而後再逐一介紹這幾個鎖的區別。

首先講一下協程任務的併發控制,asyncio 提供了信號量方法 asyncio.Semaphore(value=1) , 這個方法會返回一個信號量,你能夠初始化一個信號量後,而後在每次發起請求時都去請求這個信號量, 來實現對攜程任務數量的控制,好比咱們能夠經過信號量來控制對服務器的請求併發數:

# initiallize semaphore
concurrency_sem = asyncio.Semaphore(50)

async with aiohttp.ClientSession() as session:
while 1:  # 即便這樣寫也不用擔憂併發數會爆炸啦
    # require semaphore
    # will be blocked when accesses to 50 concurrency
    async with concurrency_sem:
        async with session.get(url, timeout=10) as resp:
            assert resp.status == 200

 

若是不知道信號量是什麼,能夠參閱《並行編程中的各類鎖》

信號量能夠有效的控制同一時間任務的併發數,可是有時候一些協程任務的執行很是迅速, 致使任務執行返回的數據大量堆積,也就是所咱們須要限制任務的處理總量,而不是併發量, 這時候就能夠採用 asyncio.Queue(maxsize=0) 來進行控制, 咱們能夠經過設定 maxsize 來設定隊列的總長度,當隊列滿時,put 操做就會被掛起, 直到後續邏輯逐漸消化掉了隊列裏的任務後,才能繼續添加,這樣就實現了對任務堆積總量的控制。

好比咱們能夠用 Queue 來限制我讀取大文件時,不要一會兒把整個文件都讀進來, 而是讀幾行,處理幾行:

task_q = asyncio.Queue(maxsize=1000)


async def worker_to_process_each_line():
    while not task_q.empty():
        line = await task_q.get()
        # do something with this line


with open('huge_file_with_many_lines.txt', 'r') as f:
    worker_to_process_each_line()
    for line in f:
        await task_q.put(line)

 

活用 Semaphore 和 Queue,基本就能夠解決絕大部分的併發控制問題了。

二、Lock

最簡單的互斥鎖,其實會用 Semaphore 的話徹底不須要用 Lock 了,畢竟 mutex 只是 Semaphore 爲 1 時的特例。

lock = Lock()
async with lock():
    # ...

 

三、Event

事件鎖,這個鎖有兩個狀態:set 和 unset,能夠調用 evt.wait() 掛起等待,直到這個事件被 set()

evt = Event()

async def demo():
    await evt.wait()  # wait for set
    print('done)


demo()
print(evt.is_set())
# False


evt.set()  # release evt
# done

 

四、Condition

就像 Semaphore 能夠簡單理解爲帶計數器的 Lock,Condition 也能夠簡單理解爲帶計數器的 Event。

一個 Condition 能夠被多個協程等待,而後能夠按照需求喚醒指定數量的協程。

其實 Condition 是 threading 模塊裏一直存在的鎖,簡單介紹一下使用方法, 使用 condition 前須要先獲取鎖(async with cond),這是一個互斥鎖,調用 wait()時會自動的釋放鎖, ,針對 condition 的 notifynotify_all、wait必須在獲取鎖後才能操做,不然會拋出RuntimeError` 錯誤。

因此當你 notify 後若是須要當即生效的話,須要退出這個 mutex,而且掛起當前協程等待調度, 其餘協程才能順利的獲取 mutex,而且獲取到 condition 的信號,執行後續的任務,並在完成後釋放鎖。

from asyncio import Condition, sleep, get_event_loop, wait, ensure_future


async def workers(cond, i):
    async with cond:  # require lock
        print('worker {} is waiting'.format(i))
        await cond.wait()  # wait for notify and release lock

    print('worker {} done, released'.format(i))


async def main():
    cond = Condition()
    fs = list([ensure_future(workers(cond, i)) for i in range(5)])  # run workers

    await sleep(0.1)
    for i in range(3):
        print('notify {} workers'.format(i))
        async with cond:  # require lock
            cond.notify(i)  # notify

        await sleep(0.1)  # let another coroutine run

    async with cond:
        await sleep(0.5)
        print('notify all')
        cond.notify_all()

    await wait(fs)  # wait all workers done



get_event_loop().run_until_complete(main())

# Output:
# worker 0 is waiting
# worker 1 is waiting
# worker 2 is waiting
# worker 3 is waiting
# worker 4 is waiting
# notify 0 workers
# notify 1 workers
# worker 0 done, released
# notify 2 workers
# worker 1 done, released
# worker 2 done, released
# notify all
# worker 3 done, released
# worker 4 done, released

 


6、多進程

上面提到了,python asyncio 的實現方案是 N:1,因此協程是不能跨核的。爲了利用多核,你須要建立多進程程序,而且爲每個進程初始化一個 ioloop。

咱們可使用 concurrent.futures 裏提供的 ProcessPoolExecutor 來輕鬆的實現多進程。

from concurrent.futures import ProcessPoolExecutor, as_completed
from asyncio import get_event_loop, sleep, ensure_future


async def coroutine_demo():
    await sleep(1)

def runner():
    ioloop = get_event_loop()
    future = ensure_future(coroutine_demo())
    ioloop.run_until_complete(future)


def main():
    executor = ProcessPoolExecutor(max_workers=7)  # CPU 數 - 1
    for futu in as_completed([executor.submit(runner) for _ in range(7)]):
        result = futu.result()
        # ...

 

一、多線程

順便提一下多線程,有時候須要兼容舊代碼,你須要調用過去用線程寫的程序,或者有些阻塞無法用 asyncio 解決,你只能包一層線程,可是你又但願用 asyncio 的方式來調用,這時候就須要用到 run_in_executor

代碼片斷示例:

from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=10)
ioloop = get_event_loop()

def something_blocking():
    time.sleep(5)

# 關鍵代碼
ioloop.run_in_executor(executor, something_blocking, *args)

 

你能夠經過 ioloop.set_default_executor(executor) 設置好經常使用的 executor,以後再調用 run_in_executor(None, somthing_blocking, *args) 的時候,第一個參數就能夠傳 None 了。


7、社區

由於 asyncio 幾乎顛覆了過去 python 的寫法邏輯,若是你要使用 asyncio,你幾乎須要重構全部的阻塞庫,不過感謝活躍的社區,目前各類第三方庫發展的速度很是快。

好比你能夠在下面這個頁面找到各式各樣的支持 asyncio 的第三方庫:

並且由於 asyncio 已經做爲官方的事實標準,因此包括 tornado 在內的第三方異步解決方案目前也開始對 asyncio 提供了支持。我稍後會另寫一篇介紹如何將過去的 tornado 項目無縫的遷移到 asyncio 來。


知識點差很少就這些,瞭解了這些,就能夠上手開動了。


8、實用案例

爲了方便,我寫過一個基於 asyncio 的腳本框架,能夠按時執行各個任務:https://github.com/Laisky/ramjet

再貼一個給同事寫的批量下載 s3 圖片的腳本,這個腳本須要讀取一個有一千萬行的圖片文件地址文件, 而後按照每一行的地址去請求服務器下載文件,因此我作了一次最多讀取 1000 行,最多發起 10 個 鏈接的併發控制:

import os
import asyncio
import datetime

import aiohttp
import aiofiles


async def image_downloader(task_q):
    async with aiohttp.ClientSession() as session:
        while not task_q.empty():
            url = await task_q.get()
            try:
                async with session.get(url, timeout=5) as resp:
                    assert resp.status == 200
                    content = await resp.read()
            except Exception as err:
                print('Error for url {}: {}'.format(url, err))
            else:
                fname = split_fname(url)
                print('{} is ok'.format(fname))
                await save_file(fname, content)


def split_fname(url):
    # do something
    return 'FILENAME_AFTER_PROCESSED'


async def save_file(fname, content):
    async with aiofiles.open(fname, mode='wb') as f:
        await f.write(content)


async def produce_tasks(task_q):
    with open('images.txt', 'r') as f:
        for count, image_url in enumerate(f):
            image_url = image_url.strip()

            if os.path.isfile(split_fname(image_url)):
                continue

            await task_q.put(image_url)


async def run():
    task_q = asyncio.Queue(maxsize=1000)
    task_producer = asyncio.ensure_future(produce_tasks(task_q))
    workers = [asyncio.ensure_future(image_downloader(task_q)) for _ in range(10)]
    try:
        await asyncio.wait(workers+[task_producer])
    except Exception as err:
        print(err.msg)


def main():
    print('start at', datetime.datetime.utcnow())
    ioloop = asyncio.get_event_loop()
    ioloop.run_until_complete(asyncio.ensure_future(run()))
    print('end at', datetime.datetime.utcnow())


if __name__ == '__main__':
    main()
相關文章
相關標籤/搜索