深刻Asyncio(十一)優雅地開始與結束

Startup and Shutdown Graceful

大部分基於asyncio的程序都是須要長期運行、基於網絡的應用,處理這種應用的正確開啓與關閉存在驚人的複雜性。html

開啓相對來講更簡單點,常規作法是建立一個task,而後調用loop.run_forever(),就如第三章QuickStart中的例子同樣。python

一個例外是當啓動監聽服務器時須要通過兩個階段:shell

  1. 爲服務器的啓動建立一個coroutine,而後調用run_until_complete()來初始化並啓動服務器自己;
  2. 經過調用loop.run_forever()來調用main函數。

一般啓動是很簡單的,碰到上述例外狀況,查看官方示例編程

關閉就要複雜得多,以前講過run_forever()調用會阻塞主線程,當執行關閉時,會解除阻塞並執行後續代碼,此時就須要:安全

  1. 收集全部還沒有完成的task對象;
  2. 將他們彙集到一個group任務中;
  3. 取消group任務(須要捕捉CancelledError);
  4. 經過run_until_complete()來等待執行完畢。

在這以後關閉纔算完成,初學者在寫異步代碼時老是極力擺脫的一些錯誤信息好比task還未等待就被關閉了,主要緣由就是遺失了上述步驟中的一個或多個,用個例子來講明。bash

import asyncio

async def f(delay):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))    # 任務1執行1秒
t2 = loop.create_task(f(2))    # 任務2執行2秒
loop.run_until_complete(t1)    # 只有任務1被執行完成
loop.close()
λ python3 taskwaring.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>

這個錯誤是說有些任務在loop關閉時還沒完成,這也就是爲何規範的關閉過程要將全部的task收集到一個task中,取消它們而後在loop關閉以前等待取消完成。服務器

再多看些比QuickStart代碼更細節的例子,此次用官方文檔中的echo服務器代碼做爲服務器,經過客戶端代碼來深刻學習。網絡

from asyncio import (
    get_event_loop,
    start_server,
    CancelledError,
    StreamReader,
    StreamWriter,
    Task,
    gather
    )

async def echo(reader: StreamReader, writer: StreamWriter):    # 1
    print('New connection.')
    try:
        while True:    # 2
            data: bytes = await reader.readlines()  # 3
            if data in [b'', b'quit']:
                break
            writer.write(data.upper())  # 4
            await writer.drain()
        print('Leaving Connection.')
    except CancelledError:  # 5
        writer.write_eof()
        print('Cancelled')
    finally:
        writer.close()

loop = get_event_loop()
coro = start_server(echo, '127.0.0.1', 8888, loop=loop)    # 6
server = loop.run_until_complete(coro)  # 7

try:
    loop.run_forever()  # 8
except KeyboardInterrupt:
    print('Shutting Down!')

server.close()  # 9
loop.run_until_complete(server.wait_closed())   # 10

tasks = Task.all_tasks()    # 11
group = gather(*tasks, return_exceptions=True)  # 12
group.cancel()
loop.run_until_complete(group)  # 13
loop.close()
  1. 這個協程用於爲每一個創建的鏈接建立一個協程,使用了Stream的API;app

  2. 爲了保持鏈接,用死循環獲取消息;異步

  3. 從服務器獲取信息;

  4. 將消息的字符所有大寫返回;

  5. 此到處理退出,進行環境退出的清理工做;

  6. 這裏是程序開始的地方,服務器須要單獨循行,start_server方法返回一個corountine,必須在run_until_complete中執行;

  7. 運行coroutine來啓動TCP服務器;

  8. 如今纔開始程序的監聽部分,爲鏈接到服務器的每一個TCP生成一個coroutine來執行echo例程函數,惟一能打斷loop的只能是KeyboardInterrupt異常;

  9. 程序運行到這裏的話,關閉操做已經開始,從如今開始要讓服務器中止接受新的鏈接,第一步是調用server.close();

  10. 第二步是調用server.wait_closed()來關閉那些仍在等待鏈接創建的socket,仍處於活躍狀態的鏈接不會受影響;

  11. 開始關閉task,先收集當前全部等待狀態的task;

  12. 將task彙集到一個group中,而後調用cancel方法,此處的return_exceptions參數後面講;

  13. 運行group這個協程。


要注意的一點是,若是在一個coroutine內部捕捉了一個CancelledError,要注意在異常捕捉代碼中不要建立任何coroutine,all_tasks()沒法感知在run_until_complete()運行階段建立的任何新任務。

return_exceptions=True參數是幹什麼的?

gather()方法有個默認參數是return_exceptions=False,經過默認設置來關閉異常處理是有問題的,很難直接解釋清楚,能夠經過一系列事實來講明:
1. run_until_complete()方法執行Future對象,在關閉期間,執行由gather()方法返回的Future對象;
2. 若是這個Future對象拋出了一個異常,那麼這個異常會繼續向上拋出,致使loop中止;
3. 若是run_until_complete()被用來執行一個group Future對象,任何group內子任務未處理而拋出的異常都會被向上拋出,也包含CancelledError;
4. 若是一部分子任務處理了CancelledError異常,另外一部分未處理,則未處理的那部分的異常也會致使loop中止,這意味着loop在全部tasks完成前就中止了;
5. 在關閉loop時,不但願上述特性被觸發,只是想要全部在group中的task儘快執行結束,也不理會某些task是否拋出異常;
6. 使用gather(*, return_exceptions=True)可讓group將子任務中的異常看成返回值處理,所以不會影響run_until_complete()的執行。

關於捕獲異常不合人意的一點就是某些異常在group內被處理了而沒有被拋出,這對經過結果查找異常、寫logging形成了困難。

import asyncio

async def f(delay):
    await asyncio.sleep(1/delay)    # 傳入值是0就很噁心了
    return delay

loop = asyncio.get_event_loop()
for i in range(10):
    loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()

不設置參數的話就會致使異常被向上拋出,而後loop中止並致使其餘task沒法完成。安全退出是網絡編程最難的問題之一,這對asyncio也是同樣的。

Signals

在上一個例子中演示瞭如何經過KeyboardInterrupt來退出loop,這個異常有效地結束了run_forever()的阻塞,並容許後續代碼得以執行。

KeyboardInterrupt異常等同於SIGINT信號,在網絡服務中最經常使用的中止信號實際上是SIGTERM,而且也是在UNIX shell環境中使用kill指令發出的默認信號。

在UNIX系統中kill指令其實就是發送信號給進程,不加參數地調用就會發送TERM信號使進程安全退出或被忽視掉,一般這不是個好辦法,由於若是進程沒有退出,kill就會發送KILL信號來強制退出,這會致使你的程序沒法可控地結束。

asyncio原生支持處理進程信號,但處理通常信號的複雜度過高(不是針對asyncio),本文不會深刻講解,只會挑一些常見信號來舉例。先看下例:

# shell_signal01.py
import asyncio

async def main():   # 這裏是應用的主體部分,簡單的用一個死循環來表示程序運行
    while True:
        print('<Your app is running>')
        await asyncio.sleep(1)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())    # 這裏與前幾個例子同樣,將coroutine添加到loop中
    try:
        loop.run_forever()
    except KeyboardInterrupt:   # 在本例中,只有Ctrl-C會終止loop,而後像前例中進行善後工做
        print('<Got signal: SIGINT, shutting down.>')
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()

這些很簡單,下面思考一些複雜的功能:
1. 產品須要將SIGINT和SIGTERM都看成中止信號;
2. 須要在應用的main()中處理CancelledError,而且處理異常的代碼也須要一小段時間來運行(例若有一堆網絡鏈接須要關閉);
3. 應用屢次接收中止信號不會出現異常,在接收到一次中止信號後,後續的信號都不做處理。

asyncio提供了足夠粒度的API來處理這些場景。

# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM    # 從標準庫中導入信號值

async def main():
    try:
        while True:
            print('<Your app is running>')
            await asyncio.sleep(1)
    except asyncio.CancelledError:  # 1
        for i in range(3):
            print('<Your app is shtting down...>')
            await asyncio.sleep(1)

def handler(sig):   # 2
    loop.stop()    # 3
    print(f'Got signal: {sig}, shtting down.')
    loop.remove_signal_handler(SIGTERM)    # 4
    loop.add_signal_handler(SIGINT, lambda: None)   # 5


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for sig in (SIGINT, SIGTERM):   # 6
        loop.add_signal_handler(sig, handler, sig)
    loop.create_task(main())
    loop.run_forever()
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()
  1. 如今在coroutine內部處理中止業務,在調用group.cancel()時收到取消信號,在處理關閉loop的run_until_complete階段,main將繼續運行一段時間;

  2. 這是收到信號後的回調函數,它經過add_signal_handler()修改了loop的配置;

  3. 在回調函數開始執行時,首先要中止loop,這使得關閉業務代碼開始執行;

  4. 此時已經開始中止代碼業務,所以移除SIGTERM來忽視後續的中止信號,不然會使中止代碼業務也被終止;

  5. 原理與上面相似,但SIGINT不能簡單地remove,由於KeyboardInterrupt默認是SIGINT信號的handler,須要將SIGINT的handler置空;

  6. 在這裏配置信號的回調函數,都指向handler,所以配置了SIGINT的handler,會覆蓋掉默認的KeyboardInterrupt。


在關閉過程當中等待Executor執行

在QuickStart中有一段代碼使用了阻塞的sleep()調用,當時說明了一個狀況即若是該阻塞調用耗時比loop的執行耗時長時會發生什麼,如今來討論,先放結論,若是不進行人工干預將會獲得一系列errors。

import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()


def blocking():
    time.sleep(1.5)
    print(f"{time.ctime()} Hello from a thread!")


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
λ python3 quickstart.py
Sun Sep 30 14:11:57 2018 Hello!
Sun Sep 30 14:11:58 2018 Goodbye!
Sun Sep 30 14:11:59 2018 Hello from a thread!
exception calling callback for <Future at 0x36cff70 state=finished returned NoneType>
Traceback (most recent call last):
    ...
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

來看下背後發生了什麼,run_in_executor()返回的是Future而不是Task,這說明它不能被asyncio.Task.all_tasks()感知,因此後續的run_until_complete()也就不會等待這個Future執行完畢。

有三個解決思路,都通過了不一樣程度的權衡,下面逐個過一遍,從不一樣視角觀察事件loop的內涵,思考在程序中相互調用的全部coroutine、線程、子進程的生命週期管理。

第一個思路,將executor放到coroutine中並以此創建一個task。

# OPTION-A
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

async def run_blocking():  # 1
    await loop.run_in_executor(None, blocking)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking())  # 2
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
  1. 這個想法是run_in_executor返回的Future而不是task,雖然沒法用all_tasks()捕獲,但能夠用await等待一個Future,因此用一個新的coroutine來await在executor中的阻塞調用,這個新的coroutine將被做爲task添加到loop;

  2. 就像運行main同樣將這個coroutine添加到loop中。


上述代碼看起來不錯,除了不能執行任務取消。能夠發現代碼中少了group.cancel(),假若加回來又會獲得Event loop is closed錯誤,甚至不能在run_blocking()中處理CancelledError以便從新await Future,不管作什麼該task都會被取消,但executor會將其內部的sleep執行完。

第二個思路,收集還沒有完成的task,僅取消它們,但在調用run_until_complete()以前要將run_in_executor()生成的Future添加進去。

# OPTION-B
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 1
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)   # 2
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group_tasks.cancel()    # 取消tasks
group = asyncio.gather(group_task, future)  # 3
loop.run_until_complete(group)
loop.close()
  1. 記錄返回的Future;

  2. 此處loop已中止,先得到全部task,注意這裏面沒有executor的Future;

  3. 建立了一個新的group來合併tasks和Future,在這種狀況下executor也能正常退出,而tasks仍然經過正常的cancel來取消。


這個解決辦法在關閉時比較友好,但仍然有缺陷。一般來講,在整個程序中經過某種方式收集全部的executor返回的Future對象,而後與tasks合併,而後等待執行完成,這十分不方便,雖然有效,但還有更好的解決辦法。

# OPTION-C
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
executor = Executor()   # 1
loop.set_default_executor(executor)    # 2
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 3
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
executor.shutdown(wait=True)    # 4
loop.close()
  1. 創建本身的executor實例;

  2. 將其設定爲loop的默認executor;

  3. 像之前同樣;

  4. 明確地在loop關閉前等待executor的全部Future執行完,這能夠避免"Event loop is closed"這樣的錯誤信息,能這樣作是由於得到了使用executor的權限,而asyncio默認的executor沒有開放相應的接口調用。

如今能夠在任何地方調用run_in_executor(),而且程序能夠優雅地退出了。

相關文章
相關標籤/搜索