Python併發編程之協程/異步IO

引言

隨着node.js的盛行,相信你們今年多多少少都聽到了異步編程這個概念。Python社區雖然對於異步編程的支持相比其餘語言稍顯遲緩,可是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await語法層面的支持,剛正式發佈的Python3.6中asyncio也已經由臨時版改成了穩定版。下面咱們就基於Python3.4+來了解一下異步編程的概念以及asyncio的用法。html

什麼是協程

一般在Python中咱們進行併發編程通常都是使用多線程或者多進程來實現的,對於計算型任務因爲GIL的存在咱們一般使用多進程來實現,而對與IO型任務咱們能夠經過線程調度來讓線程在執行IO任務時讓出GIL,從而實現表面上的併發。node

其實對於IO型任務咱們還有一種選擇就是協程,協程是運行在單線程當中的「併發」,協程相比多線程一大優點就是省去了多線程之間的切換開銷,得到了更大的運行效率。Python中的asyncio也是基於協程來進行實現的。在進入asyncio以前咱們先來了解一下Python中怎麼經過生成器進行協程來實現併發。python

example1

咱們先來看一個簡單的例子來了解一下什麼是協程(coroutine),對生成器不瞭解的朋友建議先看一下Stackoverflow上面的這篇高票回答git

>>> def coroutine():
...     reply = yield 'hello'
...     yield reply
...

>>> c = coroutine()

>>> next(c)
'hello'

>>> c.send('world')
'world'

example2

下面這個程序咱們要實現的功能就是模擬多個學生同時向一個老師提交做業,按照傳統的話咱們或許要採用多線程/多進程,可是這裏咱們能夠採用生成器來實現協程用來模擬併發。github

from collections import deque


def student(name, homeworks):
    for homework in homeworks.items():
        yield (name, homework[0], homework[1])  # 學生"生成"做業給老師


class Teacher(object):
    def __init__(self, students):
        self.students = deque(students)

    def handle(self):
        """老師處理學生做業"""
        while len(self.students):
            student = self.students.pop()
            try:
                homework = next(student)
                print('handling', homework[0], homework[1], homework[2])
            except StopIteration:
                pass
            else:
                self.students.appendleft(student)

下面咱們來調用一下這個程序。web

Teacher([
    student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
    student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
    student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
]).handle()

這是輸出結果,咱們僅僅只用了一個簡單的生成器就實現了併發(concurrence),注意不是並行(parallel),由於咱們的程序僅僅是運行在一個單線程當中。數據庫

handling Student3 cs compiler construction
handling Student2 cs computer graphics
handling Student1 cs operating system
handling Student3 math 3+3=5
handling Student2 math 2+2=4
handling Student1 math 1+1=2

使用asyncio模塊實現協程

從Python3.4開始asyncio模塊加入到了標準庫,經過asyncio咱們能夠輕鬆實現協程來完成異步IO操做。編程

解釋一下下面這段代碼,咱們本身定義了一個協程display_date(num, loop),而後它使用關鍵字yield from來等待協程asyncio.sleep(2)的返回結果。而在這等待的2s之間它會讓出CPU的執行權,直到asyncio.sleep(2)返回結果。gather()或者wait()來返回future的執行結果。json

# coroutine.py
import asyncio
import datetime


@asyncio.coroutine  # 聲明一個協程
def display_date(num, loop):
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(2)  # 阻塞直到協程sleep(2)返回結果


loop = asyncio.get_event_loop()  # 獲取一個event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))  # 阻塞直到全部的tasks完成
loop.close()

下面是運行結果,注意到併發的效果沒有,程序從開始到結束只用大約10s,而在這裏咱們並無使用任何的多線程/多進程代碼。在實際項目中你能夠將asyncio.sleep(secends)替換成相應的IO任務,好比數據庫/磁盤文件讀寫等操做。bash

ziwenxie :: ~ » python coroutine.py
Loop: 1 Time: 2016-12-19 16:06:46.515329
Loop: 2 Time: 2016-12-19 16:06:46.515446
Loop: 1 Time: 2016-12-19 16:06:48.517613
Loop: 2 Time: 2016-12-19 16:06:48.517724
Loop: 1 Time: 2016-12-19 16:06:50.520005
Loop: 2 Time: 2016-12-19 16:06:50.520169
Loop: 1 Time: 2016-12-19 16:06:52.522452
Loop: 2 Time: 2016-12-19 16:06:52.522567
Loop: 1 Time: 2016-12-19 16:06:54.524889
Loop: 2 Time: 2016-12-19 16:06:54.525031
Loop: 1 Time: 2016-12-19 16:06:56.527713
Loop: 2 Time: 2016-12-19 16:06:56.528102

在Python3.5中爲咱們提供更直接的對協程的支持,引入了async/await關鍵字,上面的代碼咱們能夠這樣改寫,使用async代替了@asyncio.coroutine,使用了await代替了yield from,這樣咱們的代碼變得更加簡潔可讀。

import asyncio
import datetime


async def display_date(num, loop):  # 聲明一個協程
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(2)  # 等同於yield from


loop = asyncio.get_event_loop()  # 獲取一個event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))  # 阻塞直到全部的tasks完成
loop.close()

asyncio模塊的其餘方法

開啓事件循環有兩種方法,一種方法就是經過調用run_until_complete,另一種就是調用run_forever。run_until_complete內置add_done_callback,使用run_forever的好處是能夠經過本身自定義add_done_callback,具體差別請看下面兩個例子。

run_until_complete()

import asyncio


async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')


loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
print(loop.is_running())  # False
loop.run_until_complete(future)
print(future.result())
loop.close()

run_forever()

run_forever相比run_until_complete的優點是添加了一個add_done_callback,可讓咱們在task(future)完成的時候調用相應的方法進行後續處理。

import asyncio


async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')


def got_result(future):
    print(future.result())
    loop.stop()


loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

這裏還要注意一點,即便你調用了協程方法,可是若是事件循環沒有開啓,協程也不會執行,參考官方文檔的描述,我剛被坑過。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.

Call

call_soon()

import asyncio


def hello_world(loop):
    print('Hello World')
    loop.stop()


loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

下面是運行結果,咱們能夠經過call_soon提早註冊咱們的task,而且也能夠根據返回的Handle進行cancel。

Hello World

call_later()

import asyncio
import datetime


def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()


loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

改動一下上面的例子咱們來看一下call_later的用法,注意這裏並無像上面那樣使用while循環進行操做,咱們能夠經過call_later來設置每隔1秒去調用display_date()方法。

2016-12-24 19:17:13.421649
2016-12-24 19:17:14.422933
2016-12-24 19:17:15.424315
2016-12-24 19:17:16.425571
2016-12-24 19:17:17.426874

Chain coroutines

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)  # 協程compute不會繼續往下面執行,直到協程sleep返回結果
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)  # 協程print_sum不會繼續往下執行,直到協程compute返回結果
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

下面是輸出結果

ziwenxie :: ~ » python chain.py
Compute 1 + 2 ...
1 + 2 = 3

如何將同步的代碼改爲異步

結合上面提到的內容下面來小結一下如何將同步的代碼改爲異步

同步模型

def handle(id):
    subject = get_subject_from_db(id)  # 1
    buyinfo = get_buyinfo(id)  # 2
    change = process(subject, buyinfo)
    notify_change(change)
    flush_cache(id)

上面是一個典型的同步編程模型,每一個步驟必須創建在上一個步驟完成的前提,可是注意到步驟1步驟2之間並無任何的關係,因此能夠將這兩個IO型改爲異步的,讓二者能夠併發進行。

異步模型

# 先要將get_subject_from_db, get_buyinfo, process, notify_change修改爲協程函數/方法
import asyncio

def handle(id):
    subject = asyncio.ensure_future(get_subject_from_db(id))  # 1
    buyinfo = asyncio.ensure_future(get_buyinfo(id))  # 2
    results = asyncio.gather(subject, buyinfo)
    change = await process(results)
    await notify_change(change)
    loop.call_soon(flush_cache(id))

使用ensure_future, loop.crate_task, Task能夠將協程包裝成一個Future對象,這裏咱們選擇ensure_future。

Queue

在asyncio使用Queue來模擬生產者-消費者模式:

import asyncio
import random


async def produce(queue, n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()

        # process the item
        print('consuming {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())

        # Notify the queue that the item has been processed
        queue.task_done()


async def run(n):
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer is still awaiting for an item, cancel it
    consumer.cancel()


loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()

實戰

by the way: 在asyncio中使用requests沒有任何意義,requests是基於同步實現的,目前也沒有要支持asyncio的動向,若是要充分發回異步的威力,應該使用aiohttp。並且也要合理使用concurrent.futures模塊提供的線程池/進程池。

Asyncio+Aiohttp

import aiohttp
import asyncio
import time


NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


start = time.time()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))

for num, result in zip(NUMBERS, results):
    print('fetch({}) = {}'.format(num, result))

print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))

下面是運行結果:

ziwenxie :: ~ » python example1.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+aiohttp cost: 0.8980867862701416

Requests+Pool

若是使用傳統的Requests和ThreadPool/ProcessPool方式的話,因爲多線程/多進程之間切換的開銷速度會慢了許多。

import requests
import time
from concurrent.futures import ThreadPoolExecutor


NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'


def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']


start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))

print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

線程池的執行結果:

ziwenxie :: ~ » python example2.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ThreadPoolExecutor cost: 3.356502056121826

進程池的執行結果:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ProcessPoolExecutor cost: 3.2979931831359863

Asyncio+Requests+Pool

雖然上面提到requests不支持異步,可是在某些情形須要控制event loop中運行在單獨的線程/進程中的function會阻塞直到這些function返回結果,這個時候能夠結合run_in_executor()和wait()來進行控制。

p.s: 下面這個例子在處理純IO任務的時候並無太多的意義,只是爲了理解如何在不支持異步的模塊中引入異步的概念。

import asyncio
import requests
import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

async def run_scraper_tasks(executor):
    loop = asyncio.get_event_loop()

    blocking_tasks = []
    for num in NUMBERS:
        task = loop.run_in_executor(executor, fetch, num)
        task.__num = num
        blocking_tasks.append(task)

    completed, pending = await asyncio.wait(blocking_tasks)
    results = {t.__num: t.result() for t in completed}
    for num, result in sorted(results.items(), key=lambda x: x[0]):
        print('fetch({}) = {}'.format(num, result))

start = time.time()
executor = ThreadPoolExecutor(3)
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(
    run_scraper_tasks(executor)
)

print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

結果可想而知與requests+ThreadPoolExecutor執行速度上並無太多的差異,由於咱們的IO任務仍是放在對應的子線程中去處理的,只是這裏經過wait引入了異步的概念,可是在某些場景能夠取得更大自由度程度的控制。

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+requests+ThreadPoolExecutor cost: 3.614989995956421

Semaphore

爬蟲一次性的產生過多的請求帳號/IP很快就會被封掉,能夠考慮使用Semaphore控制同時的併發量,與咱們熟悉的threading模塊中的Semaphore(信號量)用法相似。

import aiohttp
import asyncio

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def print_result(a):
    with (await sema):
        r = await fetch_async(a)
        print('fetch({}) = {}'.format(a, r))


loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)

能夠到後臺看到併發受到了信號量的限制,同一時刻通常只處理三個請求。

References

DOCUMENTATION OF ASYNCIO1
DOCUMENTATION OF ASYNCIO2
COROUTINES AND ASYNC/AWAIT
GOLD-XITU1
GOLD-XITU2
STACKOVERFLOW
PyMOTW-3
500LINES

Contact

GitHub: https://github.com/ziwenxie
Blog: https://www.ziwenxie.site

本文爲做者原創,轉載請於開頭明顯處聲明博客出處:)

相關文章
相關標籤/搜索