In an earlier post, "Python RPC Remote Invocation with RPyC in Practice", I built a small demo showing that RPyC makes it easy to put together a simple distributed program. Anyone with some development experience, however, will spot a fatal flaw at a glance: if a user runs a command that is extremely slow or resource-hungry, the client may never get a result back, and the server may even be brought down. We therefore need to cap how long a command may run, i.e. introduce a timeout mechanism to make the program more robust and improve the user experience.
If you happen to have read my earlier article "Python Decorators Made Simple: Master Python Decorators in 16 Steps", the natural thought is that a Python decorator is a perfect fit for this scenario: it wraps a function with extra functionality without intruding on the core business logic.
The code for the timeout decorator is as follows:
```python
# coding=utf-8
# test utf-8 encoding
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import signal, functools


class TimeoutError(Exception):
    pass


def timeout(seconds, error_message="Timeout Error: the cmd 30s have not finished."):
    def decorated(func):
        result = ""

        def _handle_timeout(signum, frame):
            global result
            result = error_message
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            global result
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
                # returning from finally swallows the in-flight TimeoutError
                # and hands back the error message set by the handler
                return result

        return functools.wraps(func)(wrapper)
    return decorated


@timeout(5)  # if slowfunc below does not return within 5s, force a TimeoutError and end it
def slowfunc(sleep_time):
    a = 1
    import time
    time.sleep(sleep_time)
    return a


# slowfunc(3)   # sleeps 3s, returns normally, no exception
print slowfunc(11)  # gets cut off
```
The test case works fine, but as soon as I plugged this decorator into the RPC code mentioned at the beginning, it threw an exception:
```
Traceback (most recent call last):
  File "exec_cmd.py", line 79, in <module>
    exec_cmd(cmd_str)
  File "exec_cmd.py", line 53, in exec_cmd
    results = pool.map(rpc_client, host_port_list)
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 305, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 535, in _handle_call
    return self._local_objects[oid](*args, **dict(kwargs))
  File "flumeFileMonitor_RPC_Server.py", line 39, in wrapper
    signal.signal(signal.SIGALRM, _handle_timeout)
ValueError: signal only works in main thread
```
To make the problem easier to see, let's simplify the test code further:
```python
# coding=utf-8
# test utf-8 encoding
from time import sleep, time
import sys, threading
reload(sys)
sys.setdefaultencoding('utf-8')
from multiprocessing.dummy import Pool as ThreadPool


@timeout(1)
def processNum(num):
    num_add = num + 1
    # results.append(str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add))
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)


def main():
    ts = time()
    pool = ThreadPool(4)
    results = pool.map(processNum, range(4))
    pool.close()
    pool.join()
    for _ in results:
        print _
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()
```
As the error shows, signal can only be used in the main thread, not in the child threads of a multithreaded program. On top of that, signal only works on *nix, so it is not portable. At this point the problem no longer looks so easy to solve, and we will have to find another way.
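Before moving on, here is a minimal standalone sketch (my own, not part of the RPC code) confirming the root cause: merely calling signal.signal() from a worker thread is enough to trigger the error.

```python
# coding=utf-8
import signal, threading

def install_handler():
    # signal.signal() may only be called from the main thread;
    # calling it from any other thread raises ValueError in CPython.
    try:
        signal.signal(signal.SIGALRM, lambda signum, frame: None)
    except ValueError as e:
        print "worker thread:", e  # "signal only works in main thread"

t = threading.Thread(target=install_handler)
t.start()
t.join()
```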
The rough idea is this: we start a new child thread to run the target function, and the main thread waits for the child thread's result. If the child thread has not finished within the given time, we treat it as a timeout, raise a timeout exception, and kill the child thread; otherwise we return whatever the function run by the child thread returned. The catch is that Python's standard library offers no way to kill a thread. What now? It turns out someone has already written a KThread class that subclasses threading.Thread and adds a kill() method, which lets us kill a child thread.
Code first; I will then walk through the design of the KThread class:
```python
from time import sleep, time
import sys, threading


class KThread(threading.Thread):
    """A subclass of threading.Thread, with a kill() method.

    Come from:
    Kill a thread in Python:
    http://mail.python.org/pipermail/python-list/2004-May/260937.html
    """

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.killed = False

    def start(self):
        """Start the thread."""
        self.__run_backup = self.run
        self.run = self.__run  # Force the Thread to install our trace.
        threading.Thread.start(self)

    def __run(self):
        """Hacked run function, which installs the trace."""
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup

    def globaltrace(self, frame, why, arg):
        if why == 'call':
            return self.localtrace
        else:
            return None

    def localtrace(self, frame, why, arg):
        if self.killed:
            if why == 'line':
                raise SystemExit()
        return self.localtrace

    def kill(self):
        self.killed = True


class Timeout(Exception):
    """function run timeout"""


def timeout(seconds):
    """Timeout decorator: specify a timeout in seconds.
    If the decorated function does not return within that time, raise Timeout."""
    def timeout_decorator(func):
        """The real decorator."""
        def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
            result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

        def _(*args, **kwargs):
            result = []
            # build new kwargs for _new_func, so that the wrapped function's
            # return value can be collected into the result list
            new_kwargs = {
                'oldfunc': func,
                'result': result,
                'oldfunc_args': args,
                'oldfunc_kwargs': kwargs
            }
            thd = KThread(target=_new_func, args=(), kwargs=new_kwargs)
            thd.start()
            thd.join(seconds)
            alive = thd.isAlive()
            thd.kill()  # kill the child thread
            if alive:
                # raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                try:
                    raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                finally:
                    return u'function run too long, timeout %d seconds.' % seconds
            else:
                return result[0]

        _.__name__ = func.__name__
        _.__doc__ = func.__doc__
        return _
    return timeout_decorator
```
Running the earlier test against this code gives:
```python
@timeout(1)
def processNum(num):
    num_add = num + 1
    # results.append(str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add))
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)
```

Output:

```
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
cost time is: 1.17s
```
Now that we have seen the code, let's talk about the design of KThread:
The key point is the threading.settrace(self.globaltrace) call (the KThread above gets the same effect by calling sys.settrace(self.globaltrace) at the top of its hacked __run), which installs a trace function to monitor the thread's execution.
Take a look at the threading.settrace documentation: the trace function has to be installed before the thread's run() is called, and threading.settrace itself only acts as a relay, handing self.globaltrace over to sys.settrace before the thread starts running.
threading.settrace(func)
Set a trace function for all threads started from the threading module. The func will be passed to sys.settrace() for each thread, before its run() method is called.
New in version 2.3.
Then look at the sys.settrace documentation. The English there is a bit long, but read alongside the code above it should pose no problem.
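To get a concrete feel for the events a trace function receives, here is a tiny standalone sketch (my own illustration, assuming CPython 2.7) that prints the events as they arrive:

```python
import sys

def tracer(frame, why, arg):
    # 'call'  -> a new scope (function) is being entered
    # 'line'  -> a line of Python code is about to execute
    # 'return' and 'exception' events show up here as well
    print why, frame.f_code.co_name, frame.f_lineno
    return tracer  # keep tracing this frame and any new frames

def add(a, b):
    c = a + b
    return c

sys.settrace(tracer)
add(1, 2)
sys.settrace(None)  # uninstall the trace
```

KThread's localtrace does exactly this, except that instead of printing it raises SystemExit as soon as it sees a 'line' event after kill() has set the flag.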
Now back to KThread; the core of it, annotated:
```python
def start(self):
    threading.settrace(self.globaltrace)  # install the trace function self.globaltrace before the thread runs
    threading.Thread.start(self)          # start the thread

def globaltrace(self, frame, why, arg):
    if why == 'call':
        # a new scope (function call) is about to run: return self.localtrace
        # so that it is used to trace execution inside that scope
        return self.localtrace
    else:
        return None

def localtrace(self, frame, why, arg):
    # self._willKill is our own kill flag (named self.killed in the KThread class above);
    # why is the traced event, where 'line' means a new line of Python code is about to execute
    if self._willKill and why == 'line':
        raise SystemExit()  # kill flag set and a new line is about to run: stop the thread with SystemExit()
    return self.localtrace
```
That is the whole thread-interruption mechanism: before executing each line of code, the thread checks the interrupt flag; if it has been asked to stop, it raises and exits, otherwise it keeps running.
Overall execution gets a bit slower, because every Python statement executed now carries an extra check.
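If you want to see that cost for yourself, a rough sketch like the following (my own, the numbers will vary by machine) compares a small CPU-bound function with and without a do-nothing trace installed:

```python
import sys, timeit

def busy():
    total = 0
    for i in xrange(100000):
        total += i
    return total

def noop_trace(frame, why, arg):
    return noop_trace  # do nothing, just keep receiving events

print "no trace  :", timeit.timeit(busy, number=10)
sys.settrace(noop_trace)
print "with trace:", timeit.timeit(busy, number=10)
sys.settrace(None)
```

Even an empty trace function inflates the runtime considerably, since the interpreter calls back into Python for every line executed.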
Because the whole thing works by running the function inside a hijacked thread, any threads or subprocesses that the decorated function spawns internally are beyond the reach of this trace-based kill, so the outer timeout control may end up having no effect on them.
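For instance, assuming the timeout decorator above is in scope and we are on a *nix box (a sketch purely for illustration), the wrapper below reports a timeout after about 2 s, yet the external sleep process happily keeps running:

```python
import subprocess

@timeout(2)
def run_external():
    # the trace-based kill can only interrupt Python bytecode running in
    # this thread; the child process started here is a separate OS process
    # and is not touched when the wrapper gives up
    subprocess.call(["sleep", "10"])
    return "done"

print run_external()  # prints the timeout message after ~2 s
# ...meanwhile `sleep 10` is still alive (check with `ps aux | grep sleep`)
```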
In particular, the timeout argument of join(timeout) is easy for beginners to misread: calling join(n) does not mean the thread will be forced to stop after n seconds.
Let's look at the documentation first:
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
As you can see, the timeout only blocks the main thread: it tells join() how long to wait for the child thread. Once the wait expires, the main thread and the child thread each simply continue on their own. That is why you must call isAlive() after join() to determine whether a timeout happened: if the child thread is still alive, the join() call timed out.
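Put differently, the bare-bones pattern looks like this (a minimal sketch):

```python
import threading
from time import sleep

def work():
    sleep(5)

t = threading.Thread(target=work)
t.start()
t.join(2)                # blocks the caller for at most 2 seconds
if t.isAlive():          # join() returns None either way, so check explicitly
    print "join() timed out, the worker is still running"
else:
    print "the worker finished within 2 seconds"
```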
Suppose there are 10 threads, each of which sleeps for 3 s as its "business logic", and we want the whole batch done within 2 s. Many beginners might write something like this:
```python
for i in range(10):
    t = ThreadTest(i)
    thread_arr.append(t)

for i in range(10):
    thread_arr[i].start()

for i in range(10):
    thread_arr[i].join(2)
```
You will eventually discover that this pattern costs 20 s in total (the complete example below does exactly that): the join(2) calls run one after another, each waiting up to 2 s on its own, and none of them actually terminates a thread when its wait expires. One way to at least bound the total wait is shown in the sketch right after this paragraph.
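If the real requirement is "finish waiting within 2 s overall", one way to get there (a sketch of mine that only bounds the wait, it does not stop the threads) is to share a single deadline across all the join() calls instead of handing each one its own 2 s budget:

```python
from time import time

deadline = time() + 2            # one overall budget of 2 seconds
for t in thread_arr:
    remaining = deadline - time()
    if remaining <= 0:
        break                    # budget used up, stop waiting
    t.join(remaining)            # each join() only gets what is left

# threads still alive at this point simply did not make the deadline;
# as discussed above, join() cannot kill them, it only stops waiting
```

Applied to the complete example that follows, this caps the main thread's wait at roughly 2 s; the daemon worker threads are simply abandoned when the process exits.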
Here is a complete version of the code for everyone to test and study:
```python
# coding=utf-8
# test utf-8 encoding
from time import sleep, time
import sys, threading
from Queue import Queue
from threading import Thread
reload(sys)
sys.setdefaultencoding('utf-8')


def processNum(num):
    num_add = num + 1
    sleep(3)
    print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)


class ProcessWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            num = self.queue.get()
            processNum(num)
            self.queue.task_done()


thread_arr = []


def main():
    ts = time()
    queue = Queue()
    for x in range(10):
        worker = ProcessWorker(queue)
        worker.daemon = True
        worker.start()
        thread_arr.append(worker)
    for num in range(10):
        queue.put(num)
    # queue.join()
    for _ in thread_arr:
        _.join(2)
    print("cost time is: {:.2f}s".format(time() - ts))


if __name__ == "__main__":
    main()
```
That's it for today. Multithreading is an evergreen topic, and the road ahead is long.
[1] 深刻淺出 Python 裝飾器:16 步輕鬆搞定 Python 裝飾器
http://my.oschina.net/leejun2005/blog/477614?fromerr=rNBm9BiN#OSC_h2_23
[2] Python RPC 遠程調用腳本之 RPyC 實踐
http://my.oschina.net/leejun2005/blog/471624
[3] Python tips: 超時裝飾器, @timeout decorator
http://www.cnblogs.com/fengmk2/archive/2008/08/30/python_tips_timeout_decorator.html
[4] 可停止的線程
https://sites.google.com/site/mypynotes/skill/cankillthread
[5] Python模塊學習:threading 多線程控制和處理
http://python.jobbole.com/81546/
[6] 一文學會Python多線程編程
[7] 一文學會Python多進程編程