論 Python 裝飾器控制函數 Timeout 的正確姿式

一、問題背景

以前在這篇《Python RPC 遠程調用腳本之 RPyC 實踐》中實現過一個小 Demo,經過 RPyC 能夠實現一個簡單的分佈式程序,可是,有過開發經驗的同窗應該一眼就能看出這個程序有個致命缺陷:假如用戶執行了一個很是耗時或者耗資源的程序,那客戶端將永遠沒法獲取結果甚至致使服務端直接宕掉,所以咱們須要對命令的執行時長作出限制,引入 Timeout 機制加強程序健壯性和用戶體驗。html

二、so easy:裝飾器!

若是你剛好看過我以前的這篇《深刻淺出 Python 裝飾器:16 步輕鬆搞定 Python 裝飾器》,那應該很天然的想到,Python 裝飾器最適合這種業務場景了:對函數進行額外功能性包裝,又不侵入主體業務邏輯。python

Timeout 裝飾器的代碼以下:編程

# coding=utf-8
# 測試utf-8編碼
import sys
 
reload(sys)
sys.setdefaultencoding('utf-8')
 
import signal, functools
 
 
class TimeoutError(Exception): pass
 
 
def timeout(seconds, error_message="Timeout Error: the cmd 30s have not finished."):
    def decorated(func):
        result = ""
 
        def _handle_timeout(signum, frame):
            global result
            result = error_message
            raise TimeoutError(error_message)
 
        def wrapper(*args, **kwargs):
            global result
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
 
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
                return result
            return result
 
        return functools.wraps(func)(wrapper)
 
    return decorated
 
 
@timeout(5)  # 限定下面的slowfunc函數若是在5s內不返回就強制拋TimeoutError Exception結束
def slowfunc(sleep_time):
    a = 1
    import time
    time.sleep(sleep_time)
    return a
 
 
# slowfunc(3) #sleep 3秒,正常返回 沒有異常
 
 
print slowfunc(11)  # 被終止

測試用例也正常,可是把這個裝飾器用在文初提到的 RPC 代碼中時,拋了異常:多線程

Traceback (most recent call last):
  File "exec_cmd.py", line 79, in <module>
    exec_cmd(cmd_str)
  File "exec_cmd.py", line 53, in exec_cmd
    results = pool.map(rpc_client, host_port_list)
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 305, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 535, in _handle_call
    return self._local_objects[oid](*args, **dict(kwargs))
  File "flumeFileMonitor_RPC_Server.py", line 39, in wrapper
    signal.signal(signal.SIGALRM, _handle_timeout)
ValueError: signal only works in main thread

爲了更簡單說明問題,咱們把測試代碼再簡化下:app

# coding=utf-8
#測試utf-8編碼

from time import sleep, time
import sys, threading

reload(sys)
sys.setdefaultencoding('utf-8')

from multiprocessing.dummy import Pool as ThreadPool

@timeout(1)
def processNum(num):
    num_add = num + 1
    # results.append(str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add))
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)

def main():
    ts = time()
    pool = ThreadPool(4)
    results = pool.map(processNum, range(4))
    pool.close()
    pool.join()
    for _ in results:
        print _
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()

能夠看到報錯是由於 signal 只能用在主線程中,不能用在多線程環境下的子線程中,並且 signal 只能用在 *nix 環境下,不能跨平臺,看到這裏,彷佛這個問題又不那麼容易解決了,看來我們得另闢蹊徑。python2.7

三、另闢蹊徑:線程控制超時

大致邏輯以下:我們啓動新子線程執行指定的方法,主線程等待子線程的運行結果,若在指定時間內子線程還未執行完畢,則判斷爲超時,拋出超時異常,並殺掉子線程;不然未超時,返回子線程所執行的方法的返回值。可是python默認模塊裏是沒有方法能夠殺掉線程的,怎麼辦呢?發現有人已經實現了該KThread類,它繼承了threading.Thread,並添加了kill方法,讓咱們能殺掉子線程。async

先上代碼,而後我會簡述下 KThread類的設計思路:分佈式

from time import sleep, time
import sys, threading


class KThread(threading.Thread):
    """A subclass of threading.Thread, with a kill()

    method.



    Come from:

    Kill a thread in Python:

    http://mail.python.org/pipermail/python-list/2004-May/260937.html

    """

    def __init__(self, *args, **kwargs):

        threading.Thread.__init__(self, *args, **kwargs)

        self.killed = False

    def start(self):

        """Start the thread."""

        self.__run_backup = self.run

        self.run = self.__run  # Force the Thread to install our trace.

        threading.Thread.start(self)

    def __run(self):

        """Hacked run function, which installs the

        trace."""

        sys.settrace(self.globaltrace)

        self.__run_backup()

        self.run = self.__run_backup

    def globaltrace(self, frame, why, arg):

        if why == 'call':

            return self.localtrace

        else:

            return None

    def localtrace(self, frame, why, arg):

        if self.killed:

            if why == 'line':
                raise SystemExit()

        return self.localtrace

    def kill(self):

        self.killed = True


class Timeout(Exception):
    """function run timeout"""


def timeout(seconds):
    """超時裝飾器,指定超時時間

    若被裝飾的方法在指定的時間內未返回,則拋出Timeout異常"""

    def timeout_decorator(func):

        """真正的裝飾器"""

        def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):

            result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

        def _(*args, **kwargs):

            result = []

            new_kwargs = {  # create new args for _new_func, because we want to get the func return val to result list

                'oldfunc': func,

                'result': result,

                'oldfunc_args': args,

                'oldfunc_kwargs': kwargs

            }

            thd = KThread(target=_new_func, args=(), kwargs=new_kwargs)

            thd.start()

            thd.join(seconds)

            alive = thd.isAlive()

            thd.kill()  # kill the child thread

            if alive:
                # raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                try:

                    raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                finally:
                    return u'function run too long, timeout %d seconds.' % seconds

            else:

                return result[0]

        _.__name__ = func.__name__

        _.__doc__ = func.__doc__

        return _

    return timeout_decorator

而後根據上面的代碼測試結果以下:ide

@timeout(1)
def processNum(num):
    num_add = num + 1
    # results.append(str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add))
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)
    
//
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
cost time is: 1.17s

看了代碼我們再來聊聊上述 KThread 的設計思路:函數

關鍵點在那個threading.settrace(self.globaltrace),它是用來設置跟蹤調試threading。

看下threading.settrace文檔。須要在線程調用run前設置好,threading.settrace只起一箇中轉做用,它會在線程運行前將self.globaltrace傳給sys.settrace。

threading.settrace(func)

Set a trace function for all threads started from the threading module. The func will be passed to sys.settrace() for each thread, before its run() method is called.

New in version 2.3.

再看下sys.settrace的文檔,英文文檔說明有點長,參照上面代碼看起來應沒什麼問題。

分析下上面的代碼:

    def start(self):
        threading.settrace(self.globaltrace) #線程運行前設置跟蹤過程self.globaltrace
        threading.Thread.start(self)#運行線程

    def globaltrace(self,frame,why,arg):
        if why=='call': #將會調用一個子過程
            return self.localtrace #返回調用子過程的跟蹤過程self.localtrace,並使用子過程跟蹤過程self.localtrace跟蹤子過程運行
        else:
            return None

    def localtrace(self,frame,why,arg):
        if self._willKill and why=='line': #self._willKill本身設置的中斷標識,why爲跟蹤的事件,其中line爲執行一行或多行python代碼
            raise SystemExit() #當中斷標識爲True及將會執行下一行python代碼時,使用SystemExit()中斷線程
        return self.localtrace

這就是中斷線程的整個過程。只是在線程每執行一行代碼將都檢查一下中斷標識,若是須要中斷則返回,不然繼續執行。

四、缺陷

  • 總體的執行效率會慢一點。由於每次執行一句python語句,都會有一個判斷的過程。

  • 由於其本質是使用將函數使用重載的線程來控制,一旦被添加裝飾器的函數內部使用了線程或者子進程等複雜的結構,而這些線程和子進程實際上是沒法得到超時控制的,因此可能致使外層的超時控制無效。

五、函數超時在多線程場景下 2 個常見誤區

  • sleep、wait、join 不能直接用來實現或替代超時功能

尤爲是 join(timeout) 方法裏的 timeout 很容易讓初學者誤解,覺得調用了  join(n) 就是 n 秒後線程超時結束

我們先看下文檔:

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

能夠看到其實 timeout 只是將主線程阻塞,它只告訴join等待子線程運行多久,若是超時後,主線程和子線程仍是各自向下繼續運行,所以你必須調用 isAlive() 來決定是否超時發生——若是子線程還活着, 表示本次 join() 調用超時了。

  • 舉個例子吧:

假設有 10 個線程,每一個線程業務邏輯是 sleep 3s,如今須要整體控制在 2s 內執行完畢,不少初學者可能寫出這樣的代碼:

for i in range(10):
    t = ThreadTest(i)
    thread_arr.append(t)

for i in range(10):
    thread_arr[i].start()

for i in range(10):
    thread_arr[i].join(2)

其實最後你會發現,這段代碼會耗時 20s,由於每一個 join(2) 都是順序執行的,並且沒有真正的超時結束功能。

仍是上一份完整的代碼供你們測試學習使用吧:

# coding=utf-8
# 測試utf-8編碼

from time import sleep, time
import sys, threading
from Queue import Queue
from threading import Thread

reload(sys)
sys.setdefaultencoding('utf-8')


def processNum(num):
    num_add = num + 1
    sleep(3)
    print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)


class ProcessWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            num = self.queue.get()
            processNum(num)
            self.queue.task_done()


thread_arr = []


def main():
    ts = time()
    queue = Queue()
    for x in range(10):
        worker = ProcessWorker(queue)
        worker.daemon = True
        worker.start()
        thread_arr.append(worker)
    for num in range(10):
        queue.put(num)
    # queue.join()
    for _ in thread_arr:
        _.join(2)
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()

好了,今天就先聊到這兒吧,多線程是個永恆的話題,路漫漫其修遠兮~

Refer:

[1] 深刻淺出 Python 裝飾器:16 步輕鬆搞定 Python 裝飾器

http://my.oschina.net/leejun2005/blog/477614?fromerr=rNBm9BiN#OSC_h2_23

[2] Python RPC 遠程調用腳本之 RPyC 實踐

http://my.oschina.net/leejun2005/blog/471624

[3] Python tips: 超時裝飾器, @timeout decorator

http://www.cnblogs.com/fengmk2/archive/2008/08/30/python_tips_timeout_decorator.html

http://bacspot.dip.jp/virtual_link/root/home/kishima/Debian/BACFlex/IVCH/BACFlex_pypage/var_pypage/pub/agent/KThread.py

[4] 可停止的線程

https://sites.google.com/site/mypynotes/skill/cankillthread

[5] Python模塊學習:threading 多線程控制和處理

http://python.jobbole.com/81546/

[6] 一文學會Python多線程編程

http://bit.ly/2c8LF2R

[7] 一文學會Python多進程編程

http://bit.ly/2bLVznb

相關文章
相關標籤/搜索