Python 協程(Coroutine)體驗

概述

本文經過運行一段Python小程序,模擬一個真實的任務。比較在多線程(Multi-thread)和在多協程(Coroutine)環境下的編程實現。發現和解釋一些有趣的現象。以期爲你們帶來一些對協程的直觀感覺,加深對這種新鮮事物的理解。python

Python中的協程

協程(coroutine)是一個有很長曆史的概念,它是計算機程序的一類組件,推廣了協做式多任務的子程序。其詳細的概念和歷史請參照維基百科中的條目:https://en.wikipedia.org/wiki...
Python天生支持的生成器(generator)其實就是協程的一種實現,生成器容許執行被掛起與被恢復。可是因爲缺少更多語法上的支持,以及缺少利用生成器實現異步編程的成熟模式,限制了生成器做爲協程參與協做式多任務編程的用途。不過如今狀況發生的改變,Python自3.6版本開始添加了async/await的語法直接支持協程的異步編程,同時在asyncio庫中提供了協程編程的接口以及必要的基礎實現。社區也在不斷努力爲現有的IO庫提供異步的版本以便用於協程開發環境,例如http client目前至少在aiohttp以及tornado中都提供了可用於協程的異步版本。咱們知道IO操做天生是異步的,爲了適應普遍應用的同步編程模式,不少的IO庫都採用阻塞調用者的方式來實現同步。這樣雖然簡化了編程,可也帶來的並行度不高的問題。在一些有大量耗時IO操做的環境裏,應用不得不忍受串行操做形成的漫長等待,或是轉向多進程(Multi-Process)多線程編程以期提升並行程度。而多進程多線程編程又會引入爭用、通信,同步、保護等棘手的問題。並且咱們知道即便是做爲輕量級的線程也會對應一個獨立的運行棧。線程的調度和切換不可避免地包括運行棧的切換和加載。若是在一個進程中有成百上千的線程,那麼相應的調度開銷會急劇上升到難以忍受的程度。並且線程之間的同步和互鎖也將成爲一個噩夢。除去boss級別的死鎖問題,其餘任何的bug或是缺陷在多線程環境下都難於重現和追蹤,這是由於線程的調度有很大的隨機性。編程

一個Python小程序

下面是一個Python的小程序,能夠在Python3.8或者更新的版本上運行。小程序

import threading
import time
import asyncio


def gen():
    s = 0
    while s < 1000:
        yield s
        s += 1


def unsafe_thread_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")


async def wrong_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_wrong_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(wrong_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)
    

async def right_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            await asyncio.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_right_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(right_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    
    print('----------------- Sequence  -----------------')
    g = gen()
    started_at = time.monotonic()
    t = 0
    for v in g:
        time.sleep(0.01)
        t += v
    print(t)
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
    
    print('----------------- Unsafe threading  -----------------')
    g = gen()
    started_at = time.monotonic()
    threads =[]
    for _ in range(10):
        w = threading.Thread(target=unsafe_thread_worker, args=[g])
        w.start()
        threads.append(w)
    for w in threads:
        w.join()
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

    print('----------------- Async with wrong coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_wrong_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
            
    print('----------------- Async with right coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_right_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

一個典型的運行輸出看起來像是這個樣子的:服務器

----------------- Sequence  -----------------
499500
total time consumed: 10.53 seconds
----------------- Unsafe threading  -----------------
 49804  49609 

 50033 
 49682 
 49574 
 50005 
 50143 
 50069  50219 

 50362 
total time consumed: 1.09 seconds
----------------- Async with wrong coroutine  -----------------
 499500 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
total time consumed: 10.55 seconds
----------------- Async with right coroutine  -----------------
 49500 
 49967 
 49973 
 50100 
 49965 
 50000 
 49968 
 49963 
 49964 
 50100 
total time consumed: 1.16 seconds

這個小程序實際上模擬了一個常見的真實任務。設想咱們經過一個http的數據API一頁一頁地獲取一個比較大地數據集。每頁數據經過一個帶有頁號或是起始位置的URL予以標識,而後經過向API服務器發送一個http request,並解析返回的http response中所包含的數據。其中的http訪問顯然是一個耗時的IO操做。返回數據的解析和處理是一個計算密集型的操做,相比IO等待,其消耗的時間不值一提。那個生成器gen能夠看做是一個數據頁面URL的生成器,也就是任務生成器。而後咱們使用sleep來模擬一個耗時的IO操做,使用加法來模擬數據的合併與分析。你也能夠把這個小程序想象成爲一個網絡爬蟲,咱們在一個全局的列表裏保存了全部目標網站的地址,而後或串行或並行地訪問全部地目標,取回咱們感興趣的數據存儲併合並分析。
總之,咱們有1000個比較獨立的小任務。因爲任務之間的沒有依賴性,因此多個任務是能夠並行執行的。每一個任務又分爲領取並明確任務,獲取數據(這是一個耗時0.01秒的IO操做),返回數據的存儲和處理幾個步驟。在一個任務內部,各個步驟間均有依賴,不能並行執行。
如今讓咱們來看看主函數,其代碼分爲4段,分別對應了4種不一樣得實現方法。方法一是最爲傳統的串行方式,經過一個簡單的循環,一個一個地獲取並完成任務,在一個任務完成後再領取下一個任務。不出所料,因爲IO操做是主要的耗時操做,串行執行的時間等於每一個任務耗時的總和,0.01*1000 = 10秒。方法二使用了多線程,模擬了一個有10個線程的線程池,池中的每一個線程均獨立地像方法一那樣工做。因爲全部的線程都是並行運行的,因此總的耗時幾乎是串行方法的1/10。方法三使用了協程,也是模擬了一個有10個協程的協程池,但是因爲使用了錯誤的IO操做,致使多個協程事實上不能並行執行,其總的耗時和方法一至關,咱們稍後會仔細分析比較。方法四修正了方法三的錯誤,使得協程可以並行運行,其總耗時與方法二至關。網絡

相關文章
相關標籤/搜索