本文是 http://www.javashuo.com/article/p-ymqfsmua-bh.html 的升級版。
能夠知道當前卡在哪個 task，甚至是卡在第幾行。
# Watchdog demo for a blocked asyncio event loop.
#
# A monitor thread samples which callback the loop is currently executing.
# When the same coroutine is seen on too many consecutive samples, the
# process sends itself SIGTERM; the signal handler raises inside the main
# thread, which aborts the blocking task.

import asyncio
import collections
import logging
import os
import queue  # retained from the original import list; no longer used below
import signal
import threading
import time
from asyncio import events

logging.basicConfig(level=logging.DEBUG)

# ``test`` is deliberately left out so that ``from <module> import *`` does not
# hand a blocking demo coroutine to tools that collect ``test*`` callables.
__all__ = [
    "Handle",
    "asyncio_monitor",
    "check",
    "factorial",
    "handler",
    "main",
    "q",
    "run_asyncio",
]

# Shared state between the event-loop thread and the monitor thread.
pid = os.getpid()                  # process that receives SIGTERM on a stall
stop = False                       # set to True to make the monitor exit
q = collections.deque(maxlen=100)  # most-recent-first sample history


# --- monkey patch -----------------------------------------------------------
# The stock event loop only records ``loop._current_handle`` when debug mode
# is on.  Replacing ``events.Handle`` makes every ``call_soon`` callback
# (which includes Task steps) record itself regardless of debug mode.
# NOTE: ``events.TimerHandle`` still derives from the original class, so
# ``call_later`` / ``call_at`` callbacks are not tracked.
class Handle(events.Handle):
    """``events.Handle`` that publishes itself on the loop while it runs."""

    __slots__ = ()

    def _run(self):
        self._loop._current_handle = self
        try:
            super()._run()
        finally:
            # Clear the marker so an idle loop is not mistaken for one that
            # is still executing the last callback.
            self._loop._current_handle = None


events.Handle = Handle
# --- end of monkey patch ----------------------------------------------------


async def factorial(name, number):
    """Well-behaved demo task: compute ``number!``, yielding between steps."""
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")


async def test():
    """Misbehaving demo task: blocks the loop with ``time.sleep`` on purpose."""
    for i in range(100):
        print("sleep--", i)
        time.sleep(1)  # intentionally NOT ``await asyncio.sleep``


def handler(signum, frame):
    """SIGTERM handler: raise so that whatever the main thread runs is aborted.

    Raises:
        Exception: always, with the message ``"Kill the task"``.
    """
    print('Signal handler called with signal', signum)
    raise Exception("Kill the task")


signal.signal(signal.SIGTERM, handler)


async def main():
    """Run the blocking task next to three cooperative ones."""
    await asyncio.gather(
        test(),
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        # Collect the exception injected by ``handler`` instead of
        # propagating it out of ``gather``.
        return_exceptions=True,
    )


def check(co_name, threshold: int = 60) -> bool:
    """Return True if the newest ``threshold`` samples in ``q`` all equal ``co_name``.

    Only the unbroken run at the front (most recent end) of ``q`` is counted.
    """
    streak = 0
    for item in q:
        if item != co_name:
            break
        streak += 1
    return streak >= threshold


def _running_coro_name(handle):
    """Return the coroutine name of the unfinished Task that *handle* drives.

    Returns None when *handle* is None, when its callback is not bound to an
    object exposing a coroutine via ``_coro``, or when that task has finished.
    """
    callback = getattr(handle, "_callback", None)
    owner = getattr(callback, "__self__", None)
    code = getattr(getattr(owner, "_coro", None), "cr_code", None)
    if code is None or owner.done():
        return None
    return code.co_name


def asyncio_monitor(loop, step: int = 1, threshold: int = 10):
    """Poll *loop* from a side thread and SIGTERM the process on a stall.

    Args:
        loop: event loop to watch; its ``_current_handle`` attribute is read.
        step: seconds to sleep between samples.
        threshold: number of consecutive earlier samples naming the same
            coroutine that triggers the signal.

    Runs until the module-level ``stop`` flag becomes true.
    """
    while not stop:
        # Snapshot once: the loop thread may swap or clear the attribute
        # at any moment.
        handle = getattr(loop, "_current_handle", None)
        co_name = _running_coro_name(handle)

        if co_name is not None and check(co_name, threshold) and pid is not None:
            # Blocked for too long: make ``handler`` raise in the main thread.
            os.kill(pid, signal.SIGTERM)

        # Idle samples and non-task callbacks are recorded as None, which
        # breaks the streak of identical names that ``check`` looks for.
        q.appendleft(co_name)

        callback = getattr(handle, "_callback", None)
        if callback is not None:
            # The Task repr shows the coroutine and where it is suspended.
            print(str(getattr(callback, "__self__", callback)))

        time.sleep(step)


def run_asyncio(loop):
    """Run ``main()`` on *loop*, then tell the monitor thread to stop."""
    global stop
    try:
        # Debug mode is not required; the patched ``Handle`` provides
        # ``_current_handle``.
        loop.run_until_complete(main())
    finally:
        # Also reached when the loop raises, so the monitor never outlives it.
        stop = True


if __name__ == '__main__':
    print(pid)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    t1 = threading.Thread(target=asyncio_monitor, args=(loop,))
    t1.start()
    try:
        run_asyncio(loop)
    finally:
        t1.join()
        loop.close()