在Python中使用asyncio進行異步編程

對於來自JavaScript編碼者來講,異步編程不是什麼新東西,但對於Python開發者來講,async函數和future(相似JS的promise)可不是那麼容易能理解的。python

Concurrency vs Parallelism

Concurrency和Parallelism聽起來同樣,但在實際編程裏它們有着較大的不一樣。編程

想象下你在作飯的時候寫書,看起來好像你在同一時間作兩件事情,實際你只是在兩項事情中相互切換,當你在等水開的時候你就能夠去寫書,但你切菜時你要暫停寫做。這就叫作concurrency。惟一一種使用parallel作這兩項工做的辦法就是得有兩我的,一我的寫做,一我的作飯,這就是多核CPU的工做方式了。promise

alt

爲何asyncio

異步編程容許你在單個線程中併發執行代碼。對比多線程處理方式,該方式由你來決定如何由一個任務切換到另外一個任務,tasks之間共享數據也更加容易和簡單。session

def queue_push_back(x):
        if len(list) < max_size:
            list.append(x)

若是咱們在多線程執行上面的額代碼,有可能第二行代碼在同一時間被執行,那麼同一時間就有兩個元素被加入到列表中,那實際列表長度就會操做max_size多線程

另外一個異步編程的好處是內存使用。每次一個新的線程建立,也須要開闢一些新內存用來進行上下文切換。若是使用了異步編程,這在單線程中就不存在該問題。併發

如何在Python編程async代碼

Asyncio包含三個主要組件:coroutine, event loop和futureapp

Coroutine

coroutine是異步函數,經過在函數定義def前使用async關鍵字。dom

async def my_task(args):
    pass

my_coroutine = my_task(args)

咱們使用了async關鍵字定義了一個函數,該函數並無執行,返回了一個coroutine對象。異步

有兩種從一個coroutine中獲取異步函數的結果async

第一種使用await關鍵字,僅只能在async函數中用來等待coroutine結束返回結果

result = await my_task(args)

第二種是將它加入到event loop中,接下來咱們作詳盡討論。

Event loop

event loop對象負責執行異步代碼以及決定異步函數如何進行切換。在建立了event loop後,咱們就能夠添加多個coroutines給它,coroutines將會調用了run_until_complete或者run_forever執行。

# create loop
loop = asyncio.new_event_loop()
# add coroutine to the loop
future = loop.create_task(my_coroutine)
# stop the program and execute all coroutine added
# to the loop concurrently
loop.run_until_complete(future)
loop.close()

Future

future相似一個佔位對象用來存放異步函數結果,提供函數狀態。當coroutine添加到event lop時建立future.有兩種方式建立:

future1 = loop.create_task(my_coroutine)
# or
future2 = asyncio.ensure_future(my_coroutine)

第一個方法是增長一個coroutine到loop中,返回一個task,它是future的子類。第二種方法很是相似,它接收一個coroutine,並加入到了默認loop中,惟一的區別是,它也能夠接收一個future參數,它將不會作任何事情,直接將futrue返回。

一個簡單的程序

import asyncio

async def my_task(args):
    pass

def main():
    loop = asyncio.new_event_loop()
    coroutine1 = my_task()
    coroutine2 = my_task()
    task1 = loop.create_task(coroutine1)
    task2 = loop.create_task(coroutine2)
    loop.run_until_complete(asycnio.wait([task1, task2]))
    print('task1 result:', task1.result())
    print('task2 result:', task2.result())
    loop.close()

就讓如你所看見的,咱們在執行異步函數前須要先創一個coroutine,而後咱們將建立future/task,把它添加到event loop。到如今病沒有如何的異步函數被執行,只有當咱們調用loop.run_until_completed,event loop開始執行全部的經過loop.createt_task或者asyncio.ensure_future添加的coroutines。loop.run_until_completed將會阻塞應用程序,僅當全部的future執行完畢後。在本例中,咱們使用asyncio.wait()建立future,當傳遞的全部future執行完後咱們就獲取到了future全部的結果。

異步函數

有一件事須要注意的是在Python中使用async聲明的函數並不意味着函數會併發執行。若是使用一個普通函數,在前面加入async關鍵字,event loop並不會中斷你的函數去執行另外一個coroutine。容許event loop進行切換coroutine至關簡單,使用await關鍵字就會容許event loop能夠切換其餘註冊到loop中的coroutine。

import asyncio

async def print_numbers_async1(n, prefix):
    for i in range(n):
        print(prefix, i)

async def print_numbers_async2(n, prefix):
    for i in range(n):
        print(prefix, i)
        if i % 5 == 0:
            await asyncio.sleep(0)
            
loop1 = asyncio.new_event_loop()
count1_1 = loop1.create_task(print_numbers_async1(10, 'c1_1'))
count2_1 = loop1.create_task(print_numbers_async1(10, 'c2_1'))
loop1.run_until_complete(asyncio.wait([count1_1, count2_1]))
loop1.close()

loop2 = asyncio.new_event_loop()
count1_2 = loop2.create_task(print_numbers_async2(10, 'c1_2'))
count2_2 = loop2.create_task(print_numbers_async2(10, 'c2_2'))
loop2.run_until_complete(asyncio.wait([count1_2, count2_2]))
loop2.close()

若是咱們執行該代碼,咱們能夠看到loop1將會在c1_1徹底執行完後纔去執行c2_1。而在loop2每打印五個數值後就會進行切換。

真實案例

如今咱們Python中最進本的異步編程,如今讓咱們寫一個真實例子,咱們從互聯網下載一系列頁面,並打印出開頭三行。

import aiohttp
import asyncio

async def print_preview(url):
    # connect to the server
    async with aiohttp.ClientSession() as session:
        # create get request
        async with session.get(url) as response:
            # wait for response
            response = await response.text()

            # print first 3 not empty lines
            count = 0
            lines = list(filter(lambda x: len(x) > 0, response.split('\n')))
            print('-'*80)
            for line in lines[:3]:
                print(line)
            print()

def print_all_pages():
    pages = [
        'http://textfiles.com/adventure/amforever.txt',
        'http://textfiles.com/adventure/ballyhoo.txt',
        'http://textfiles.com/adventure/bardstale.txt',
    ]

    tasks =  []
    loop = asyncio.new_event_loop()
    for page in pages:
        tasks.append(loop.create_task(print_preview(page)))

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

if __name__ == "__main__":
    print_all_pages()

這裏的代碼也很容易理解,咱們使用異步函數下載URL,而且打印了前三行。而後咱們建立了一個函數用來構建一個頁面了列表,交給print_preview去執行,將coroutine加入到loop,把future放到了一個列表中 ,咱們執行event loop,在全部coroutine執行完後程序結束。

異步生成器

最後我想談談的是異步生成器。要實現一個異步生成器至關簡單:

import asyncio
import math
import random

async def is_prime(n):
    if n < 2:
        return True

    for i in range(2, n):
        await asyncio.sleep(0)
        if n % i == 0:
            return False

    return True


async def prime_generator(n_prime):
    counter = 0
    n = 0
    while counter < n_prime:
        n += 1
        prime = await is_prime(n)
        if prime:
            yield n
            counter += 1
    
async def check_email(limit):
    for i in range(limit):
        if random.random() > 0.8:
            print('1 new email')
        else:
            print('0 new email')
        await asyncio.sleep(2)

async def print_prime(n):
    async for prime in prime_generator(n):
        print('new prime number found:', prime)


def main():
    loop = asyncio.new_event_loop()
    prime = loop.create_task(print_prime(3000))
    email = loop.create_task(check_email(10))
    loop.run_until_complete(asyncio.wait([prime, email]))
    loop.close()

if __name__ == "__main__":
    main()

異常處理

在coroutine內部拋出異常時並不會中斷應用程序,若是你沒有處理異常的話你將看到相似以下錯誤:

Task exception was never retrieved

有兩個方法來修正,在獲取future結果時捕獲異常,或者在future調用exception方法.

深刻了解

如今你已經瞭解如何使用asyncio編寫併發代碼,若是你想深刻了解的話,查看官方文檔。

相關文章
相關標籤/搜索