大部分基於asyncio的程序都是須要長期運行、基於網絡的應用,處理這種應用的正確開啓與關閉存在驚人的複雜性。html
開啓相對來講更簡單點,常規作法是建立一個task,而後調用loop.run_forever(),就如第三章QuickStart中的例子同樣。python
一個例外是當啓動監聽服務器時須要通過兩個階段:shell
run_until_complete()
來初始化並啓動服務器自己;loop.run_forever()
來調用main函數。一般啓動是很簡單的,碰到上述例外狀況,查看官方示例。編程
關閉就要複雜得多,以前講過run_forever()
調用會阻塞主線程,當執行關閉時,會解除阻塞並執行後續代碼,此時就須要:安全
run_until_complete()
來等待執行完畢。在這以後關閉纔算完成,初學者在寫異步代碼時老是極力擺脫的一些錯誤信息好比task還未等待就被關閉了,主要緣由就是遺失了上述步驟中的一個或多個,用個例子來講明。bash
import asyncio async def f(delay): await asyncio.sleep(delay) loop = asyncio.get_event_loop() t1 = loop.create_task(f(1)) # 任務1執行1秒 t2 = loop.create_task(f(2)) # 任務2執行2秒 loop.run_until_complete(t1) # 只有任務1被執行完成 loop.close()
λ python3 taskwaring.py Task was destroyed but it is pending! task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>
這個錯誤是說有些任務在loop關閉時還沒完成,這也就是爲何規範的關閉過程要將全部的task收集到一個task中,取消它們而後在loop關閉以前等待取消完成。服務器
再多看些比QuickStart代碼更細節的例子,此次用官方文檔中的echo服務器代碼做爲服務器,經過客戶端代碼來深刻學習。網絡
from asyncio import ( get_event_loop, start_server, CancelledError, StreamReader, StreamWriter, Task, gather ) async def echo(reader: StreamReader, writer: StreamWriter): # 1 print('New connection.') try: while True: # 2 data: bytes = await reader.readlines() # 3 if data in [b'', b'quit']: break writer.write(data.upper()) # 4 await writer.drain() print('Leaving Connection.') except CancelledError: # 5 writer.write_eof() print('Cancelled') finally: writer.close() loop = get_event_loop() coro = start_server(echo, '127.0.0.1', 8888, loop=loop) # 6 server = loop.run_until_complete(coro) # 7 try: loop.run_forever() # 8 except KeyboardInterrupt: print('Shutting Down!') server.close() # 9 loop.run_until_complete(server.wait_closed()) # 10 tasks = Task.all_tasks() # 11 group = gather(*tasks, return_exceptions=True) # 12 group.cancel() loop.run_until_complete(group) # 13 loop.close()
這個協程用於爲每一個創建的鏈接建立一個協程,使用了Stream的API;app
爲了保持鏈接,用死循環獲取消息;異步
從服務器獲取信息;
將消息的字符所有大寫返回;
此到處理退出,進行環境退出的清理工做;
這裏是程序開始的地方,服務器須要單獨循行,start_server方法返回一個corountine,必須在run_until_complete中執行;
運行coroutine來啓動TCP服務器;
如今纔開始程序的監聽部分,爲鏈接到服務器的每一個TCP生成一個coroutine來執行echo例程函數,惟一能打斷loop的只能是KeyboardInterrupt異常;
程序運行到這裏的話,關閉操做已經開始,從如今開始要讓服務器中止接受新的鏈接,第一步是調用server.close();
第二步是調用server.wait_closed()來關閉那些仍在等待鏈接創建的socket,仍處於活躍狀態的鏈接不會受影響;
開始關閉task,先收集當前全部等待狀態的task;
將task彙集到一個group中,而後調用cancel方法,此處的return_exceptions參數後面講;
運行group這個協程。
要注意的一點是,若是在一個coroutine內部捕捉了一個CancelledError,要注意在異常捕捉代碼中不要建立任何coroutine,all_tasks()
沒法感知在run_until_complete()
運行階段建立的任何新任務。
return_exceptions=True
參數是幹什麼的?gather()
方法有個默認參數是return_exceptions=False,經過默認設置來關閉異常處理是有問題的,很難直接解釋清楚,能夠經過一系列事實來講明:
1. run_until_complete()
方法執行Future對象,在關閉期間,執行由gather()
方法返回的Future對象;
2. 若是這個Future對象拋出了一個異常,那麼這個異常會繼續向上拋出,致使loop中止;
3. 若是run_until_complete()
被用來執行一個group Future對象,任何group內子任務未處理而拋出的異常都會被向上拋出,也包含CancelledError;
4. 若是一部分子任務處理了CancelledError異常,另外一部分未處理,則未處理的那部分的異常也會致使loop中止,這意味着loop在全部tasks完成前就中止了;
5. 在關閉loop時,不但願上述特性被觸發,只是想要全部在group中的task儘快執行結束,也不理會某些task是否拋出異常;
6. 使用gather(*, return_exceptions=True)
可讓group將子任務中的異常看成返回值處理,所以不會影響run_until_complete()
的執行。
關於捕獲異常不合人意的一點就是某些異常在group內被處理了而沒有被拋出,這對經過結果查找異常、寫logging形成了困難。
import asyncio async def f(delay): await asyncio.sleep(1/delay) # 傳入值是0就很噁心了 return delay loop = asyncio.get_event_loop() for i in range(10): loop.create_task(f(i)) pending = asyncio.Task.all_tasks() group = asyncio.gather(*pending, return_exceptions=True) results = loop.run_until_complete(group) print(f'Results: {results}') loop.close()
不設置參數的話就會致使異常被向上拋出,而後loop中止並致使其餘task沒法完成。安全退出是網絡編程最難的問題之一,這對asyncio也是同樣的。
在上一個例子中演示瞭如何經過KeyboardInterrupt
來退出loop,這個異常有效地結束了run_forever()
的阻塞,並容許後續代碼得以執行。
KeyboardInterrupt
異常等同於SIGINT
信號,在網絡服務中最經常使用的中止信號實際上是SIGTERM
,而且也是在UNIX shell環境中使用kill
指令發出的默認信號。
在UNIX系統中kill
指令其實就是發送信號給進程,不加參數地調用就會發送TERM
信號使進程安全退出或被忽視掉,一般這不是個好辦法,由於若是進程沒有退出,kill
就會發送KILL信號來強制退出,這會致使你的程序沒法可控地結束。
asyncio原生支持處理進程信號,但處理通常信號的複雜度過高(不是針對asyncio),本文不會深刻講解,只會挑一些常見信號來舉例。先看下例:
# shell_signal01.py import asyncio async def main(): # 這裏是應用的主體部分,簡單的用一個死循環來表示程序運行 while True: print('<Your app is running>') await asyncio.sleep(1) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.create_task(main()) # 這裏與前幾個例子同樣,將coroutine添加到loop中 try: loop.run_forever() except KeyboardInterrupt: # 在本例中,只有Ctrl-C會終止loop,而後像前例中進行善後工做 print('<Got signal: SIGINT, shutting down.>') tasks = asyncio.Task.all_tasks() group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) loop.close()
這些很簡單,下面思考一些複雜的功能:
1. 產品須要將SIGINT和SIGTERM都看成中止信號;
2. 須要在應用的main()
中處理CancelledError
,而且處理異常的代碼也須要一小段時間來運行(例若有一堆網絡鏈接須要關閉);
3. 應用屢次接收中止信號不會出現異常,在接收到一次中止信號後,後續的信號都不做處理。
asyncio提供了足夠粒度的API來處理這些場景。
# shell_signal02.py import asyncio from signal import SIGINT, SIGTERM # 從標準庫中導入信號值 async def main(): try: while True: print('<Your app is running>') await asyncio.sleep(1) except asyncio.CancelledError: # 1 for i in range(3): print('<Your app is shtting down...>') await asyncio.sleep(1) def handler(sig): # 2 loop.stop() # 3 print(f'Got signal: {sig}, shtting down.') loop.remove_signal_handler(SIGTERM) # 4 loop.add_signal_handler(SIGINT, lambda: None) # 5 if __name__ == "__main__": loop = asyncio.get_event_loop() for sig in (SIGINT, SIGTERM): # 6 loop.add_signal_handler(sig, handler, sig) loop.create_task(main()) loop.run_forever() tasks = asyncio.Task.all_tasks() group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) loop.close()
如今在coroutine內部處理中止業務,在調用group.cancel()時收到取消信號,在處理關閉loop的run_until_complete階段,main將繼續運行一段時間;
這是收到信號後的回調函數,它經過add_signal_handler()修改了loop的配置;
在回調函數開始執行時,首先要中止loop,這使得關閉業務代碼開始執行;
此時已經開始中止代碼業務,所以移除SIGTERM來忽視後續的中止信號,不然會使中止代碼業務也被終止;
原理與上面相似,但SIGINT不能簡單地remove,由於KeyboardInterrupt默認是SIGINT信號的handler,須要將SIGINT的handler置空;
在這裏配置信號的回調函數,都指向handler,所以配置了SIGINT的handler,會覆蓋掉默認的KeyboardInterrupt。
在QuickStart中有一段代碼使用了阻塞的sleep()
調用,當時說明了一個狀況即若是該阻塞調用耗時比loop的執行耗時長時會發生什麼,如今來討論,先放結論,若是不進行人工干預將會獲得一系列errors。
import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(1.5) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() loop.create_task(main()) loop.run_in_executor(None, blocking) loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=True) loop.run_until_complete(group) loop.close()
λ python3 quickstart.py Sun Sep 30 14:11:57 2018 Hello! Sun Sep 30 14:11:58 2018 Goodbye! Sun Sep 30 14:11:59 2018 Hello from a thread! exception calling callback for <Future at 0x36cff70 state=finished returned NoneType> Traceback (most recent call last): ... raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed
來看下背後發生了什麼,run_in_executor()
返回的是Future而不是Task,這說明它不能被asyncio.Task.all_tasks()
感知,因此後續的run_until_complete()
也就不會等待這個Future執行完畢。
有三個解決思路,都通過了不一樣程度的權衡,下面逐個過一遍,從不一樣視角觀察事件loop的內涵,思考在程序中相互調用的全部coroutine、線程、子進程的生命週期管理。
第一個思路,將executor放到coroutine中並以此創建一個task。
# OPTION-A import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") async def run_blocking(): # 1 await loop.run_in_executor(None, blocking) loop = asyncio.get_event_loop() loop.create_task(main()) loop.create_task(run_blocking()) # 2 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=False) loop.run_until_complete(group) loop.close()
這個想法是run_in_executor返回的Future而不是task,雖然沒法用all_tasks()捕獲,但能夠用await等待一個Future,因此用一個新的coroutine來await在executor中的阻塞調用,這個新的coroutine將被做爲task添加到loop;
就像運行main同樣將這個coroutine添加到loop中。
上述代碼看起來不錯,除了不能執行任務取消。能夠發現代碼中少了group.cancel()
,假若加回來又會獲得Event loop is closed
錯誤,甚至不能在run_blocking()
中處理CancelledError以便從新await Future,不管作什麼該task都會被取消,但executor會將其內部的sleep執行完。
第二個思路,收集還沒有完成的task,僅取消它們,但在調用run_until_complete()
以前要將run_in_executor()
生成的Future添加進去。
# OPTION-B import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() loop.create_task(main()) future = loop.run_in_executor(None, blocking) # 1 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) # 2 group_tasks = asyncio.gather(*tasks, return_exceptions=True) group_tasks.cancel() # 取消tasks group = asyncio.gather(group_task, future) # 3 loop.run_until_complete(group) loop.close()
記錄返回的Future;
此處loop已中止,先得到全部task,注意這裏面沒有executor的Future;
建立了一個新的group來合併tasks和Future,在這種狀況下executor也能正常退出,而tasks仍然經過正常的cancel來取消。
這個解決辦法在關閉時比較友好,但仍然有缺陷。一般來講,在整個程序中經過某種方式收集全部的executor返回的Future對象,而後與tasks合併,而後等待執行完成,這十分不方便,雖然有效,但還有更好的解決辦法。
# OPTION-C import time import asyncio from concurrent.futures import ThreadPoolExecutor as Executor async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() executor = Executor() # 1 loop.set_default_executor(executor) # 2 loop.create_task(main()) future = loop.run_in_executor(None, blocking) # 3 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) executor.shutdown(wait=True) # 4 loop.close()
創建本身的executor實例;
將其設定爲loop的默認executor;
像之前同樣;
明確地在loop關閉前等待executor的全部Future執行完,這能夠避免"Event loop is closed"這樣的錯誤信息,能這樣作是由於得到了使用executor的權限,而asyncio默認的executor沒有開放相應的接口調用。
如今能夠在任何地方調用run_in_executor()
,而且程序能夠優雅地退出了。