Python中的併發處理之使用asyncio包

導語:本文章記錄了本人在學習Python基礎之控制流程篇的重點知識及我的心得,打算入門Python的朋友們能夠來一塊兒學習並交流。

本文重點:python

一、瞭解asyncio包的功能和使用方法;
二、瞭解如何避免阻塞型調用;
三、學會使用協程避免回調地獄。

1、使用asyncio包作併發編程

一、併發與並行

併發:一次處理多件事。
並行:一次作多件事。
併發用於制定方案,用來解決可能(但未必)並行的問題。併發更好。web

二、asyncio概述

瞭解asyncio的4個特色:編程

  • asyncio包使用事件循環驅動的協程實現併發。
  • 適合asyncio API的協程在定義體中必須使用yield from,而不能使用yield。
  • 使用asyncio處理的協程,需在定義體上使用@asyncio.coroutine裝飾。裝飾的功能在於凸顯協程,同時當協程不產出值,協程會被垃圾回收。
  • Python3.4起,asyncio包只直接支持TCP和UDP協議。若是想使用asyncio實現HTTP客戶端和服務器時,常使用aiohttp包。

在協程中使用yield from須要注意兩點:api

  • 使用yield froml連接的多個協程最終必須由不是協程的調用方驅動,調用方顯式或隱式在最外層委派生成器上調用next()函數或 .send()方法。
  • 鏈條中最內層的子生成器必須是簡單的生成器(只使用yield)或可迭代的對象。

但在asyncio包的API中使用yield from還需注意兩個細節:服務器

  • asyncio包中編寫的協程鏈條始終經過把最外層委派生成器傳給asyncio包API中的某個函數驅動,例如loop.run_until_complete()。即不經過調用next()函數或 .send()方法驅動協程。
  • 編寫的協程鏈條最終經過yield from把職責委託給asyncio包中的某個協程函數或協程方法。即最內層的子生成器是庫中真正執行I/O操做的函數,而不是咱們本身編寫的函數。

實例——經過asyncio包和協程以動畫形式顯示文本式旋轉指針:多線程

import asyncio
import itertools
import sys

@asyncio.coroutine  # 交給 asyncio 處理的協程要使用 @asyncio.coroutine 裝飾
def spin(msg):
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        print(status)
        try:
            yield from asyncio.sleep(.1)  # 使用 yield from asyncio.sleep(.1) 代替 time.sleep(.1),這樣的休眠不會阻塞事件循環。
        except asyncio.CancelledError:  # 若是 spin 函數甦醒後拋出 asyncio.CancelledError 異常,其緣由是發出了取消請求,所以退出循環。
            break

@asyncio.coroutine
def slow_function():  # slow_function 函數是協程,在用休眠僞裝進行 I/O 操做時,使用 yield from 繼續執行事件循環。
    # 僞裝等待I/O一段時間
    yield from asyncio.sleep(3)  # yield from asyncio.sleep(3) 表達式把控制權交給主循環,在休眠結束後恢復這個協程。
    return 42

@asyncio.coroutine
def supervisor():  # supervisor 函數也是協程
    spinner = asyncio.async(spin('thinking!'))  # asyncio.async(...) 函數排定 spin 協程的運行時間,使用一個 Task 對象包裝spin 協程,並當即返回。
    print('spinner object:', spinner)
    result = yield from slow_function()  # 驅動 slow_function() 函數。結束後,獲取返回值。
# 同時,事件循環繼續運行,由於slow_function 函數最後使用 yield from asyncio.sleep(3) 表達式把控制權交回給了主循環。
    spinner.cancel()  # Task 對象能夠取消;取消後會在協程當前暫停的 yield 處拋出 asyncio.CancelledError 異常。協程能夠捕獲這個異常,也能夠延遲取消,甚至拒絕取消。
    return result

if __name__ == '__main__':
    loop = asyncio.get_event_loop()  # 獲取事件循環的引用
    result = loop.run_until_complete(supervisor())  # 驅動 supervisor 協程,讓它運行完畢;這個協程的返回值是此次調用的返回值。
    loop.close()
    print('Answer:', result)

三、線程與協程對比

線程調度程序在任什麼時候候都能中斷線程。必須記住保留鎖。去保護程序中的重要部分,防止多步操做在執行的過程當中中斷,防止數據處於無效狀態。
協程默認會作好全方位保護,以防止中斷。對協程來講無需保留鎖,在多個線程之間同步操做,協程自身就會同步,由於在任意時刻只有一個協程運行。併發

四、從期物、任務和協程中產出

在asyncio包中,期物和協程關係緊密,由於可使用yield from從asyncio.Future對象中產出結果。這意味着,若是foo是協程函數,抑或是返回Future或Task實例的普通函數,那麼能夠這樣寫:res=yield from foo()。這是asyncio包中不少地方能夠互換協程與期物的緣由之一。異步

2、避免阻塞型調用

一、有兩種方法能避免阻塞型調用停止整個應用程序的進程:

  • 在單獨的線程中運行各個阻塞型操做。
  • 把每一個阻塞型操做轉換成非阻塞的異步調用。

使用多線程處理大量鏈接時將耗費過多的內存,故此一般使用回調來實現異步調用。async

二、使用Executor對象防止阻塞事件循環:

  • 使用loop.run_in_executor把阻塞的做業(例如保存文件)委託給線程池作。
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()  # 獲取事件循環對象的引用
        loop.run_in_executor(None,  # None 使用默認的 TrreadPoolExecutor 實例
                save_flag, image, cc.lower() + '.gif')  # 傳入可調用對象
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)

asyncio 的事件循環背後維護一個 ThreadPoolExecutor 對象,咱們能夠調用 run_in_executor 方法, 把可調用的對象發給它執行。異步編程

3、從回調到期物和協程

回調地獄:若是一個操做須要依賴以前操做的結果,那就得嵌套回調。
Python 中的回調地獄:

def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)

def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
    step3(response3)

api_call1(request1, step1)

使用 協程 和 yield from 結構作異步編程,無需用回調:

@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1()
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    step3(response3)

loop.create_task(three_stages(request1))
# 協程不能直接調用,必須用事件循環顯示指定協程的執行時間,或者在其餘排定了執行時間的協程中使用 yield from 表達式把它激活

4、使用asyncio包編寫服務器

  • 使用asyncio包能實現TCP和HTTP服務器
  • Web服務將成爲asyncio包的重要使用場景。
相關文章
相關標籤/搜索