協程實現了在單線程下的併發,每一個協程共享線程的幾乎全部的資源,除了協程本身私有的上下文棧;協程的切換屬於程序級別的切換,對於操做系統來講是無感知的,所以切換速度更快、開銷更小、效率更高,在有多IO操做的業務中能極大提升效率。html
python併發編程之asyncio協程(三)github
asyncio在python3.4後被內置在python中,使得python的協程建立變得更加方便。session
import asyncio import os # async 關鍵字定義一個協程 async def target_func1(): print('the func start') print(os.getpid()) print('the func end') def run(): # 建立一個協程對象 coroutine = target_func1() # 建立一個事件循環 loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) # 將協程對象添加到事件循環,運行直到結束 print(os.getpid()) loop.close() # 關閉事件循環 def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() # 建立一個協程對象 coroutine = target_func1(loop) loop.create_task(coroutine) # 建立一個任務並添加到事件循環中 loop.run_forever() # 開啓無限循環,須要在異步函數中調用stop()使中止 loop.close() if __name__ == '__main__': run() # 結果 the func start 4876 the func end 4876
以上可知,全部的代碼段都是在一個進程的單線程中執行。多線程
被async修飾的函數調用後會生成協程函數,能夠經過send喚醒執行。併發
async def target_func1(): print('the func start') print(os.getpid()) print('the func end') coroutine = target_func1() try: coroutine.send(None) # 喚醒協程 except StopIteration: print('xx') coroutine.close() # 關閉
async關鍵字能夠定義一個協程對象,被async修飾的函數變成了一個協程對象而不是一個普通的函數。
async def target_func1(): pass coroutine = target_func1() print(coroutine)
await用於控制事件的執行順序,它只能在異步函數中使用,即被async關鍵字定義的協程函數,不然報錯。當執行到await時,當前協程掛起,轉而去執行await後面的協程,完畢後再回到當前協程繼續往下。
# async 關鍵字定義一個協程 async def target_func1(): print('the func start') x = await target_func2() # 當前協程掛起 print(x) print('the func end') return 1 async def target_func2(): """ 目標函數2 :return: """ time.sleep(2) print('the func end2') return 0
asyncio.get_event_loop():建立一個事件循環,全部的異步函數都須要在事件循環中運行; asyncio.ensure_future():建立一個任務 asyncio.gather(*fs):添加並行任務 asyncio.wait(fs):添加並行任務,能夠是列表 loop.run_until_complete(func):添加協程函數同時啓動阻塞直到結束 loop.run_forever():運行事件無限循環,直到stop被調用 loop.create_task():建立一個任務並添加到循環 loop.close():關閉循環 loop.time():循環開始後到當下的時間 loop.stop():中止循環 loop.is_closed() # 判斷循環是否關閉 loop.create_future():建立一個future對象,推薦使用這個函數而不要直接建立future實例 loop.call_soon() # 設置回調函數,不能接受返回的參數,須要用到future對象,當即回調 loop.call_soon_threadsafe() # 線程安全的對象 loop.call_later() # 異步返回後開始算起,延遲迴調 loop.call_at() # 循環開始多少s回調 loop.call_exception_handler() # 錯誤處理
Future:主要用來保存任務的狀態; Task:Future的子類,擴展了Future的功能;
# Future from asyncio import Future # future = Future() # future.result() # 獲取任務的結果 # future.remove_done_callback(fn) # 刪除全部的回調函數並返回個數 # future.set_result('result') # 設置任務的結果,必須在result()以前執行,不然報錯 # future.exception() # 獲取任務的錯誤信息 # future.set_exception('bad') # 設置任務的錯誤信息 # future.add_done_callback('fn') # 添加回調函數 # Task current_task():返回循環當前的任務,類方法 all_tasks():返回事件循環全部的任務 get_stack():獲取其餘協程的堆棧列表 print_stack:輸出其餘協程的堆棧列表 cancel:取消任務
async def target_func3(name): """ :return: """ await asyncio.sleep(1) print(name) return 0 def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() x = loop.run_until_complete(asyncio.gather(target_func3('A'),target_func3('B'),target_func3('C'),)) print(x) # 等待返回結果,一個列表,按照事件添加的順序,可是計算的順序是不定的 loop.close() if __name__ == '__main__': run1()
run_forever()不能直接獲得異步函數的返回結果,須要使用Future類來做爲第三方保存結果,同時設置回調函數;
from asyncio import Future from functools import partial async def target_func0(name, future): """ 目標函數2 :return: """ time.sleep(1) print(name) future.set_result(name) # 設置返回結果 def got_result(loop, future): print(future.result()) # 處理結果 loop.stop() # 循環中止 def run(): loop = asyncio.get_event_loop() future = Future(loop=loop) res = asyncio.ensure_future(target_func0('A', future)) # 生成一個Task任務 print(res) future.add_done_callback(partial(got_result, loop)) # 回調函數默認只能有一個參數future,必須使用偏函數 # print(future.result()) # future上下文必須先調用future.set_result。 loop.run_forever() loop.close() if __name__ == '__main__': run()
協程裏調用等待另外的協程完成後才能返回。
import asyncio import time # async 關鍵字定義一個協程 async def target_func1(): print('the func start') x = await target_func2() # 等待協程完成,控制執行順序 print(x) print('the func end') return 1 async def target_func2(): """ 目標函數2 :return: """ time.sleep(2) print('the func end2') return 0 def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() x = loop.run_until_complete(target_func1()) print(x) loop.close() if __name__ == '__main__': run()
import asyncio import time from functools import partial # async 關鍵字定義一個協程 async def target_func1(): print('the func end') return 1 def get_res(loop): print('xxxx') loop.stop() def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() loop.create_task(target_func1()) # loop.call_soon(partial(get_res, loop)) # 設置回調函數,不能接受返回的參數,須要用到future對象 # loop.call_soon_threadsafe() # 線程安全的對象 # loop.call_later(delay=5, callback=partial(get_res, loop)) # 異步返回後開始算起,延遲5秒回調 # loop.call_at(when=8000,callback=partial(get_res, loop)) # 循環開始第8秒回調 # loop.call_exception_handler() # 錯誤處理 loop.run_forever() loop.close() if __name__ == '__main__': run1()
使用協程的目的是在系統發生io阻塞的時候,能夠交出CUP的控制權,讓其去執行其餘的任務。實際使用時通常的場景有本地IO和網絡IO。
# 使用asyncio+aiohttp,若是想異步化,網絡請求須要拋棄requests包 import asyncio import time from aiohttp import ClientSession async def target2(): print('start2') async with ClientSession() as session: async with session.get(url='http://www.baidu.com') as rsp: data = await rsp.read() print('end2') return data def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() tasks = [target2() for i in range(100)] ts = asyncio.gather(*tasks) t = time.time() loop.run_until_complete(ts) print(time.time()-t) loop.close() if __name__ == '__main__': run1()
核心思想:將文件讀寫的while循環換成事件循環。
可參考:https://github.com/lyyyuna/script_collection/blob/master/aysncfile/asyncfile.py
asyncio模塊也有本身的queue實現生產消費模式,只要有三種隊列:Queue(先進先出),PriorityQueue(優先級隊列),LifoQueue(棧),可是Queue不是線程安全的類,也就是說在多進程或多線程的狀況下不要使用這個隊列。
import asyncio import time from asyncio import Queue # async 關鍵字定義一個協程 async def target_func1(q:Queue): for i in range(100): await q.put(i) async def target_func2(q:Queue): for i in range(100): x = await q.get() print(x) def run1(): # 建立一個事件循環 loop = asyncio.get_event_loop() q = Queue(100) task = asyncio.gather(target_func1(q), target_func2(q)) loop.run_until_complete(task) loop.close() if __name__ == '__main__': run1()
Queue的get(),join(),put()方法返回的都是協程,須要使用await關鍵字。