本文經過運行一段Python小程序,模擬一個真實的任務。比較在多線程(Multi-thread)和在多協程(Coroutine)環境下的編程實現。發現和解釋一些有趣的現象。以期爲你們帶來一些對協程的直觀感覺,加深對這種新鮮事物的理解。python
協程(coroutine)是一個有很長曆史的概念,它是計算機程序的一類組件,推廣了協做式多任務的子程序。其詳細的概念和歷史請參照維基百科中的條目:https://en.wikipedia.org/wiki...
Python天生支持的生成器(generator)其實就是協程的一種實現,生成器容許執行被掛起與被恢復。可是因爲缺少更多語法上的支持,以及缺少利用生成器實現異步編程的成熟模式,限制了生成器做爲協程參與協做式多任務編程的用途。不過如今狀況發生的改變,Python自3.6版本開始添加了async/await的語法直接支持協程的異步編程,同時在asyncio庫中提供了協程編程的接口以及必要的基礎實現。社區也在不斷努力爲現有的IO庫提供異步的版本以便用於協程開發環境,例如http client目前至少在aiohttp以及tornado中都提供了可用於協程的異步版本。咱們知道IO操做天生是異步的,爲了適應普遍應用的同步編程模式,不少的IO庫都採用阻塞調用者的方式來實現同步。這樣雖然簡化了編程,可也帶來的並行度不高的問題。在一些有大量耗時IO操做的環境裏,應用不得不忍受串行操做形成的漫長等待,或是轉向多進程(Multi-Process)多線程編程以期提升並行程度。而多進程多線程編程又會引入爭用、通信,同步、保護等棘手的問題。並且咱們知道即便是做爲輕量級的線程也會對應一個獨立的運行棧。線程的調度和切換不可避免地包括運行棧的切換和加載。若是在一個進程中有成百上千的線程,那麼相應的調度開銷會急劇上升到難以忍受的程度。並且線程之間的同步和互鎖也將成爲一個噩夢。除去boss級別的死鎖問題,其餘任何的bug或是缺陷在多線程環境下都難於重現和追蹤,這是由於線程的調度有很大的隨機性。編程
下面是一個Python的小程序,能夠在Python3.8或者更新的版本上運行。小程序
import threading import time import asyncio def gen(): s = 0 while s < 1000: yield s s += 1 def unsafe_thread_worker(g): t = 0 try: while True: v = next(g) time.sleep(0.01) t += v except StopIteration: print(f" {t} ") async def wrong_coroutine_worker(g): t = 0 try: while True: v = next(g) time.sleep(0.01) t += v except StopIteration: print(f" {t} ") async def starter_with_wrong_workers(): tasks = [] for _ in range(10): task = asyncio.create_task(wrong_coroutine_worker(g)) tasks.append(task) await asyncio.gather(*tasks) async def right_coroutine_worker(g): t = 0 try: while True: v = next(g) await asyncio.sleep(0.01) t += v except StopIteration: print(f" {t} ") async def starter_with_right_workers(): tasks = [] for _ in range(10): task = asyncio.create_task(right_coroutine_worker(g)) tasks.append(task) await asyncio.gather(*tasks) if __name__ == '__main__': print('----------------- Sequence -----------------') g = gen() started_at = time.monotonic() t = 0 for v in g: time.sleep(0.01) t += v print(t) total_time = time.monotonic() - started_at print(f'total time consumed: {total_time:.2f} seconds') print('----------------- Unsafe threading -----------------') g = gen() started_at = time.monotonic() threads =[] for _ in range(10): w = threading.Thread(target=unsafe_thread_worker, args=[g]) w.start() threads.append(w) for w in threads: w.join() total_time = time.monotonic() - started_at print(f'total time consumed: {total_time:.2f} seconds') print('----------------- Async with wrong coroutine -----------------') g = gen() started_at = time.monotonic() loop = asyncio.get_event_loop() loop.run_until_complete(starter_with_wrong_workers()) total_time = time.monotonic() - started_at print(f'total time consumed: {total_time:.2f} seconds') print('----------------- Async with right coroutine -----------------') g = gen() started_at = time.monotonic() loop = asyncio.get_event_loop() loop.run_until_complete(starter_with_right_workers()) total_time = time.monotonic() - started_at print(f'total time consumed: {total_time:.2f} seconds')
一個典型的運行輸出看起來像是這個樣子的:服務器
----------------- Sequence ----------------- 499500 total time consumed: 10.53 seconds ----------------- Unsafe threading ----------------- 49804 49609 50033 49682 49574 50005 50143 50069 50219 50362 total time consumed: 1.09 seconds ----------------- Async with wrong coroutine ----------------- 499500 0 0 0 0 0 0 0 0 0 total time consumed: 10.55 seconds ----------------- Async with right coroutine ----------------- 49500 49967 49973 50100 49965 50000 49968 49963 49964 50100 total time consumed: 1.16 seconds
這個小程序實際上模擬了一個常見的真實任務。設想咱們經過一個http的數據API一頁一頁地獲取一個比較大地數據集。每頁數據經過一個帶有頁號或是起始位置的URL予以標識,而後經過向API服務器發送一個http request,並解析返回的http response中所包含的數據。其中的http訪問顯然是一個耗時的IO操做。返回數據的解析和處理是一個計算密集型的操做,相比IO等待,其消耗的時間不值一提。那個生成器gen能夠看做是一個數據頁面URL的生成器,也就是任務生成器。而後咱們使用sleep來模擬一個耗時的IO操做,使用加法來模擬數據的合併與分析。你也能夠把這個小程序想象成爲一個網絡爬蟲,咱們在一個全局的列表裏保存了全部目標網站的地址,而後或串行或並行地訪問全部地目標,取回咱們感興趣的數據存儲併合並分析。
總之,咱們有1000個比較獨立的小任務。因爲任務之間的沒有依賴性,因此多個任務是能夠並行執行的。每一個任務又分爲領取並明確任務,獲取數據(這是一個耗時0.01秒的IO操做),返回數據的存儲和處理幾個步驟。在一個任務內部,各個步驟間均有依賴,不能並行執行。
如今讓咱們來看看主函數,其代碼分爲4段,分別對應了4種不一樣得實現方法。方法一是最爲傳統的串行方式,經過一個簡單的循環,一個一個地獲取並完成任務,在一個任務完成後再領取下一個任務。不出所料,因爲IO操做是主要的耗時操做,串行執行的時間等於每一個任務耗時的總和,0.01*1000 = 10秒。方法二使用了多線程,模擬了一個有10個線程的線程池,池中的每一個線程均獨立地像方法一那樣工做。因爲全部的線程都是並行運行的,因此總的耗時幾乎是串行方法的1/10。方法三使用了協程,也是模擬了一個有10個協程的協程池,但是因爲使用了錯誤的IO操做,致使多個協程事實上不能並行執行,其總的耗時和方法一至關,咱們稍後會仔細分析比較。方法四修正了方法三的錯誤,使得協程可以並行運行,其總耗時與方法二至關。網絡