Asyncio之EventLoop筆記

使用事件循環

Python3.4 採用了一個強大的框架來支持代碼的併發執行: asyncio。這個框架使用事件循環來編排回調和異步任務。
事件循環位於事件循環策略的上下文中-這是 asyncio 所特有的概念。
下圖是協程,事件循環和策略之間的相互做用
Coroutines, event loops, and policies
協程能夠被認爲是能夠在明確標記有某種語法元素的階段「暫停」的函數.
經過任務對象跟蹤協程的狀態,由相應的事件循環實例化。 事件循環跟蹤當前正在運行的任務,並將 CPU 時間從空閒協程委派給待處理協議。在本章中,咱們將更多地瞭解事件循環接口及其生命週期。將討論事件循環策略-以及全局 asyncio API 對它們的影響。或者和其餘異步工做單元(callbacks, promises/futures, and coroutines), 不一樣的事件循環,可是事件循環是區別於操做系統的。html

定位當前正在運行的循環

存在問題

因爲各類緣由,併發框架必須可以告訴您事件循環當前是否正在運行以及它是哪個。例如,您的代碼可能必須斷言只有一個特定的循環實現正在運行您的任務。所以,只有一個任務能夠改變某些共享資源或確保將調度您的回調python

解決方案

使用全局 asyncio.get_event_loop 和 asyncio.get_running_loop 的 api。
代碼示例 1windows

import asyncio
loop = asyncio.get_event_loop()

輸出api

<_UnixSelectorEventLoop running=False closed=False debug=False>
#windows 輸出
<_WindowsSelectorEventLoop running=False closed=False debug=False>

代碼示例 2promise

import asyncio
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    print("No loop running")

在 Python 3.7 中,有兩種有效的方法來獲取當前正在運行的循環實例。
咱們能夠調用 asyncio.get_event_loop 或 asyncio.get_running_loop
但 asyncio.get_event_loop 內部是作了什麼?大概下面幾點
1.檢查在調用函數時是否有循環運行
2.返回其 pid 與當前進程 pid 匹配的運行循環(若是有)
3.若是沒有,獲取存儲在 asynci omodule 中的全局變量中的線程全局 LoopPolicy 實例。
4.若是沒有設置它,則使用鎖用 DefaultLoopPolicy 實例化它。(_init_event_loop_policy 方法)
5.注意,DefaultLoopPolicy 是依賴於操做系統的子類 BaseDefaultEventLoopPolicy,它提供了一個默認的循環實現。獲取被調用的事件循環
6.這是有個問題:僅在主線程上實例化循環並將其分配給線程局部變量時纔會使用 loop_policy.get_event_loop 方法。
若是你不在主線程上而且沒有經過其餘方式實例化運行循環,則會引起 RuntimeError併發

這個過程有一些問題app

  • get_event_loop 檢查是否存在並返回當前運行的循環
  • 事件循環策略是全局存儲線程,而循環實例是本地存儲線程
  • 若是你在主線程上,get_event_loop 方法將實例化該循環並在策略中本地保存實例線程。
  • 若是你不在主線程上,它將引起 RuntimeError
    asyncio.get_running_loop 的工做方式不一樣。 若是有一個正在運行,它將始終返回當前正在運行的循環實例。 若是沒有,則會引起 RuntimeError。框架

    建立一個新的循環實例

    存在問題

    因爲 asyncio 中的循環與循環策略的概念緊密耦合,所以不建議經過循環構造函數建立循環實例。
    不然,咱們可能會遇到範圍問題,由於全局 asyncio.get_event_loop 函數只檢索本身建立的循環或經過 asyncio.set_event_loop 設置的循環。dom

    解決方案

    要建立一個新的事件循環實例,咱們將使用 asyncio.new_event_loop 的 API
    注意:此 api 不會更改當前安裝的事件循環,但會初始化(asyncio)全局事件循環策略 - 若是以前未初始化的話。
    另外一個問題是咱們將新建立的循環附加到事件循環策略的觀察程序,以確保咱們的事件循環監視 UNIX 系統上新生成的子進程的終止
import asyncio
import sys
loop = asyncio.new_event_loop()
print(loop)  # Print the loop
asyncio.set_event_loop(loop)
if sys.platform != "win32":
    watcher = asyncio.get_child_watcher()
    watcher.attach_loop(loop)

上面的代碼怎麼運行的呢
若是從主線程調用,那麼 asyncio.get_event_loop 應用程序接口僅實例化該循環
下面是一個循環綁定到線程的例子異步

import asyncio
import threading
from functools import partial


def _worker(worker, *args, **kwargs):
    # 循環存在於循環策略的上下文中。DefaultLoopPolicy 對每一個線程的循環進行限定,
    # 不容許經過 asyncio.get_event_loop 在主線程以外建立循環
    # 所以,咱們必須經過 asyncio.set_event_loop(asyncio.new_event_loop())建立一個線程本地事件循環。
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(worker(*args, **kwargs))
    finally:
        loop.close()


def create_event_loop_thread(worker, *args, **kwargs):
    return threading.Thread(target=partial(_worker, worker), args=args, kwargs=kwargs)


async def print_coro(*args, **kwargs):
    print(f"Inside the print coro on {threading.get_ident()}:", (args, kwargs))


def start_threads(*threads):
    [t.start() for t in threads if isinstance(t, threading.Thread)]


def join_threads(*threads):
    [t.join() for t in threads if isinstance(t, threading.Thread)]


def main():
    workers = [create_event_loop_thread(print_coro) for i in range(10)]
    start_threads(*workers)
    join_threads(*workers)


if __name__ == '__main__':
    main()

將循環附加到進程

使用更高級的 multiprocessing 模塊,咱們能夠構建一個跨平臺的解決方案,在流程本地事件循環中運行多個協程。
這樣咱們就能夠規避 GIL 強加的 CPython 限制,並利用 asyncio 來提升 I/O 密集型任務的單核 CPU 使用率。

###協程附加到進程
import asyncio
import os
import random
import typing
from multiprocessing import Process

processes = []


def cleanup():
    global processes
    while processes:
        proc = processes.pop()
        try:
            proc.join()
        except KeyboardInterrupt:
            # Ctrl+C 終止進程
            proc.terminate()


async def worker():
    random_delay = random.randint(0, 3)
    result = await asyncio.sleep(random_delay, result=f"Working in process: {os.getpid()}")
    print(result)


def process_main(coro_worker: typing.Callable, num_of_coroutines: int, ):
    """
    在單獨的進程中運行多個協程的進程類。將在每一個進程中運行的函數
    建議使用 asyncio.run 而不是實例化本身的事件循環。
     此示例僅用於說明如何在不一樣進程中實例化事件循環!
    :param coro_worker:
    :param num_of_coroutines:
    :return:
    """
    loop = asyncio.new_event_loop()
    try:
        workers = [coro_worker() for _ in range(num_of_coroutines)]
        loop.run_until_complete(asyncio.gather(*workers, loop=loop))
    except KeyboardInterrupt:
        print(f"Stoping {os.getpid()}")
        loop.stop()
    finally:
        loop.close()


def main(processes, num_procs, num_coros, process_main):
    for _ in range(num_procs):
        proc = Process(target=process_main, args=(worker, num_coros))
        processes.append(proc)
        proc.start()


if __name__ == '__main__':
    try:
        main(processes, 10, 2, process_main, )
    except KeyboardInterrupt:
        print("Ctrl+C 中止運行")
    finally:
        cleanup()
        print("CleanUp finished")

此示例說明如何編寫使用多處理的應用程序。

運行異步代碼而不用擔憂循環

若是不想費心修改循環策略和清理異步生成器以後的代碼(您將在下一章中瞭解它們),請使用如下代碼。
若是你只有一個線程和進程,而且只有一個協程須要從頭至尾運行,這也很好。

import asyncio
async def main():    
    pass
asyncio.run(main())

在 Python3.6 你可使用如下方法

import asyncio


async def main():
    pass


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    try:
        # 清理任何沒有徹底消耗的異步生成器。
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()

若是代碼可能運行在線程中,須要使用下面的方式

import asyncio
import sys


async def main():
    pass


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if sys.platform != "win32":
    # 返回當前策略的當前子監視器。
    watcher = asyncio.get_child_watcher()
    # 給一個事件循環綁定監視器。
    # 若是監視器以前已綁定另外一個事件循環,那麼在綁定新循環前會先解綁原來的事件循環。
    watcher.attach_loop(loop)
    try:
        loop.run_forever()
    finally:
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()

判斷是否只有一個事件循環

import asyncio
async def main(loop):
    assert loop == asyncio.get_running_loop()
    print("ok")

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

或者下面這種

import asyncio

async def main():
    pass

loop = asyncio.get_event_loop()
# 經過使用 loop.create_task API,可確保協程將在特定循環上運行。
task = loop.create_task(main())
task.add_done_callback(lambda fut: loop.stop())
loop.run_forever()

中止和關閉循環

import asyncio
import functools


async def main(loop):
    print("Print in main")


def stop_loop(fut, *, loop):
    loop.call_soon_threadsafe(loop.stop)


loop = asyncio.get_event_loop()
tasks = [loop.create_task(main(loop)) for _ in range(10)]
# 爲了可以正確地中止循環,咱們須要確保已經消耗了全部任務,所以咱們經過調用 asyncio.gather 來包裝它們並向其 add_done_callback,這將關閉咱們的循環。
asyncio.gather(*tasks).add_done_callback(functools.partial(stop_loop, loop=loop))
try:
    loop.run_forever()
finally:
    try:
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()

添加循環信號處理程序

咱們經過 loop.add_signal_handler 添加一個新的信號處理程序。添加信號處理器。它相似於信號 API, 在這種狀況下,咱們決定在每一個處理程序結束時中止循環。
若是要爲示例添加另外一個處理程序,只需將信號名稱添加到 SIGNAL_NAMES 以及以此方式命名的相應處理程序.

import asyncio
import functools
import os
import signal

SIGNAL_NAMES = ('SIGINT', 'SIGTERM')
SIGNAL_NAME_MESSAGE = " or ".join(SIGNAL_NAMES)


def sigint_handler(signame, *, loop, ):
    print(f"Stopped loop because of {signame}")
    loop.stop()


def sigterm_handler(signame, *, loop, ):
    print(f"Stopped loop because of {signame}")
    loop.stop()


loop = asyncio.get_event_loop()

for signame in SIGNAL_NAMES:
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(locals()[f"{signame.lower()}_handler"], signame, loop=loop))

print("Event loop running forever, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send {SIGNAL_NAME_MESSAGE} to exit.")
try:
    loop.run_forever()
finally:
    loop.close()  # optional

爲何不直接使用 signal API 在循環迭代過程當中檢查添加到循環中的信號處理程序呢?由於,當它關閉時,不可能向循環添加信號處理程序.另外一個好處是,當循環關閉時,信號處理程序會爲您清理。

從循環生成子進程

異步生成子流程並在單獨的部分中有效地分割建立和狀態管理是使用循環生成子流程的緣由之一。
下面的解決方案對於異步子流程 api 的大多數非交互式使用已經足夠了。
經過在 Windows 系統上設置適當的事件循環策略,它具備跨平臺的優勢。

import asyncio
import shutil
import sys
from typing import Tuple, Union


async def invoke_command_async(*command, loop, encoding="UTF-8", decode=True) -> Tuple[
    Union[str, bytes], Union[str, bytes], int]:
    """
    Invoke a command asynchronously and return the stdout, stderr and the process return code.
    :param command:
    :param loop:
    :param encoding:
    :param decode:
    :return:
    """
    if sys.platform != 'win32':
        # 若是不是 windows 系統,防止有線程的使用
        asyncio.get_child_watcher().attach_loop(loop)
    process = await asyncio.create_subprocess_exec(*command,
                                                   stdout=asyncio.subprocess.PIPE,
                                                   stderr=asyncio.subprocess.PIPE,
                                                   loop=loop)
    out, err = await process.communicate()

    ret_code = process.returncode

    if not decode:
        return out, err, ret_code

    output_decoded, err_decoded = out.decode(encoding) if out else None, \
                                  err.decode(encoding) if err else None

    return output_decoded, err_decoded, ret_code


async def main(loop):
    # shutil 返回路徑 cmd 裏可執行文件的路徑。

    out, err, ret_code = await invoke_command_async(shutil.which("ping"), "-c", "1", "8.8.8.8", loop=loop)
    print(out, err, ret_code)


if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

等待子進程終止

爲了確保咱們能夠在 Windows 下等待子進程的終止,咱們將輪詢子進程以得到進程返回代碼,該代碼指示已終止的子進程。

import asyncio

# Quote from https://docs.python.org/3/library/asyncio-subprocess.html:
# 在從其餘線程執行子進程以前,必須在主線程中實例化子監視器
# 調用主線程中的 get_child_watcher()函數來實例化子監視器
import functools
import shutil
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


def stop_loop(*args, loop, **kwargs):
    loop.stop()


async def is_windows_process_alive(process, delay=0.5):
    """
    On windows the signal API is very sparse, meaning we don't have SIGCHILD.
    So we just check if we have a return code on our process object.
    :param process:
    :param delay:
    :return:
    """
    while process.returncode is None:
        await asyncio.sleep(delay)


async def main(process_coro, *, loop):
    process = await process_coro
    print(process)
    if sys.platform != "win32":
        child_watcher: asyncio.AbstractChildWatcher = asyncio.get_child_watcher()
        # 觀察者鏈接到循環並方便地爲咱們調用 watcher.add_child_handler
        # 註冊一個新的子處理回調函數。

        child_watcher.add_child_handler(process.pid, functools.partial(stop_loop, loop=loop))
    else:
        await is_windows_process_alive(process)
        loop.stop()


loop = asyncio.get_event_loop()

process_coro = asyncio.create_subprocess_exec(shutil.which("ping"), "-c", "1", "127.0.0.1",
                                              stdout=asyncio.subprocess.DEVNULL,
                                              stderr=asyncio.subprocess.DEVNULL)

loop.create_task(main(process_coro, loop=loop))
loop.run_forever()
相關文章
相關標籤/搜索