python 限制函數執行時間 代碼實現

# coding=utf-8
import signal
import time


def set_timeout(num, callback):
def wrap(func):
def handle(signum, frame): # 收到信號 SIGALRM 後的回調函數,第一個參數是信號的數字,第二個參數是the interrupted stack frame.
raise RuntimeError

def to_do(*args, **kwargs):
try:
signal.signal(signal.SIGALRM, handle) # 設置信號和回調函數
signal.alarm(num) # 設置 num 秒的鬧鐘
print('start alarm signal.')
r = func(*args, **kwargs)
print('close alarm signal.')
signal.alarm(0) # 關閉鬧鐘
return r
except RuntimeError as e:
callback()

return to_do

return wrap


if __name__ == '__main__':
def after_timeout(): # 超時後的處理函數
print("do something after timeout.")


@set_timeout(2, after_timeout) # 限時 2 秒
def connect(): # 要執行的函數
time.sleep(1) # 函數執行時間,寫大於2的值,可測試超時
return 'connect success.'


print(connect())

注意事項:
  

  電腦系統是win10 64位,在使用python的signal模塊時報錯:「AttributeError: module 'signal' has no attribute 'SIGALRM'」,這是由於signal模塊能夠在linux下正常使用,但在windows下卻有一些限制,在python文檔https://docs.python.org/2/library/signal.html#signal.signal找到了以下解釋:html

"On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, or SIGTERM. A ValueError will be raised in any other case."python

也就是說在windows下只能使用這幾個信號:linux

  SIGABRT
  SIGFPE
  SIGILL
  SIGINT
  SIGSEGV
  SIGTERM

windows

 

也可經過線程的方式 設置超時 ,主線程設置超時時間 , 殺死子線程api

import timefrom threading import Threadimport inspectimport ctypesdef _async_raise(tid, exctype):    """raises the exception, performs cleanup if needed"""    tid = ctypes.c_long(tid)    if not inspect.isclass(exctype):        exctype = type(exctype)    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))    if res == 0:        raise ValueError("invalid thread id")    elif res != 1:        # """if it returns a number greater than one, you're in trouble,        # and you should call it again with exc=NULL to revert the effect"""        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)        raise SystemError("PyThreadState_SetAsyncExc failed")def stop_thread(thread):    _async_raise(thread.ident, SystemExit)def to_do():    print('子線程開始了')    time.sleep(50)    print('結束了')if __name__ == '__main__':    t = Thread(target=to_do)    t.start()    stop_thread(t)    print('主線程結束')下面這種方案 ,缺陷是,設置好超時後,子線程結束,主線程也不會中止,有可能會下降效率能夠將stop_thread(t)  換爲    t.join( num  秒 )  來控制  超時kill  
相關文章
相關標籤/搜索