在個人上篇博文Python中的多線程編程,線程安全與鎖(一)中,咱們熟悉了多線程編程與線程安全相關重要概念, Threading.Lock實現互斥鎖的簡單示例,兩種死鎖(迭代死鎖和互相等待死鎖)狀況及處理。今天咱們將聚焦於Python的Threading模塊總結和線程同步問題。html
1. Threading模塊總結python
1.1 Threading模塊概覽編程
threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。python當前版本的多線程庫沒有實現優先級、線程組,線程也不能被中止、暫停、恢復、中斷。安全
threading模塊提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。多線程
threading 模塊提供的經常使用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。app
threading 模塊提供的常量:函數
threading.TIMEOUT_MAX 設置threading全局超時時間。post
1.2 Thread類ui
Thread是線程類,有兩種使用方法,直接傳入要運行的方法或從Thread繼承並覆蓋run()。推薦使用方法一,將目標函數做爲target參數傳入,很是簡單實用。url
# coding:utf-8 import threading import time #方法一:將要執行的方法做爲參數傳給Thread的構造方法 def action(arg): time.sleep(1) print 'the arg is:%s\r' %arg for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main thread end!' #方法二:從Thread繼承,並重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#注意:必定要顯式的調用父類的初始化函數。 self.arg=arg def run(self):#定義每一個線程要運行的函數 time.sleep(1) print 'the arg is:%s\r' % self.arg for i in xrange(4): t =MyThread(i) t.start() print 'main thread end!'
相關方法:
構造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 線程名;
args/kwargs: 要傳入方法的參數。
實例方法:
isAlive(): 返回線程是否在運行。正在運行指啓動後、終止前。
get/setName(name): 獲取/設置線程名。
start(): 線程準備就緒,等待CPU調度
is/setDaemon(bool): 將該子線程設置爲父線程的守護線程(默認爲非守護線程(False))。(須要在線程start以前設置)
關於「守護」的含義,咱們能夠這樣理解,子線程爲父線程的守護線程,意思是說子線程要守着父線程,一旦父線程執行完畢,也就不須要「守護」了,因此此時子線程就要結束。
True: 設置該子進程爲父進程的守護進程,即後臺線程。主線程執行過程當中,子線程也在進行,主線程執行完畢後,子線程不論成功與否,主線程和子線程均中止。
False:設置該子進程爲父進程的非守護進程,即前臺進程。主線程執行過程當中,子線程也在進行,主線程代碼執行完畢後,仍須要等待子線程也執行完成後,主線程纔會中止。
start(): 啓動線程。
join([timeout]): 阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。
1.2.1 關鍵參數setDaemon
該參數是設置線程屬性,規定當前線程是否屬於守護線程(默認爲非守護線程(False))。(須要在線程start以前設置)
關於setDaemon,默認子線程屬於非守護線程,即主線程要等待全部子線程執行完以後,才中止程序。
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main_thread end!' main_thread end! sub thread start!the thread name is:Thread-2 the arg is:1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:2 the arg is:3 Process finished with exit code 0 能夠看出,建立的4個「前臺」線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
該例子驗證了setDeamon(False)(默認)非守護線程,主線程執行過程當中,子線程也在進行,主線程執行完畢後,等待子線程也執行完成後,主線程中止。
設置setDeamon=True時:
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True)#設置線程爲後臺線程 t.start() print 'main_thread end!' main_thread end! 能夠看出,主線程執行完畢後,後臺線程不論是成功與否,主線程均中止
驗證了setDeamon(True)守護線程,主線程執行過程當中,守護線程也在進行,主線程執行完畢後,子線程不論成功與否,均與主線程一塊兒中止。
1.2.2 關鍵參數join
阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。即當子進程的join()函數被調用時,主線程就被阻塞住了,意思爲再也不繼續往下執行。
值得注意的是,因爲join()會阻塞其餘函數,若是咱們要用for循環觸發多個線程的執行,start()要和join()分開(用兩個for循環,先用第一個for循環將所有子線程start(),再用第二個for循環將所有子線程join),否則會讓多線程並行執行,變成多線程依次執行:
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) thread_list = [] #線程存放列表 for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) thread_list.append(t) for t in thread_list: t.start() for t in thread_list: t.join()
print("main_thread end!")
#Output: sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 設置join以後,主線程等待子線程所有執行完成後或者子線程超時後,主線程纔會從被阻塞的地方繼續執行。
驗證了 join()阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout,即便設置了setDeamon(True)主線程依然要等待子線程結束。
使用例子(join不穩當的用法,使多線程編程順序執行)
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) t.start() t.join() print 'main_thread end!' join不穩當用法 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 能夠看出此時,程序只能順序執行,每一個線程都被上一個線程的join阻塞,使得「多線程」失去了多線程意義。 運行結果
1.3 Timer類
Timer(定時器)是Thread的派生類,用於在指定時間後調用一個方法。
構造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的時間
function: 要執行的方法
args/kwargs: 方法的參數
實例方法:
Timer從Thread派生,沒有增長實例方法。
# encoding: UTF-8
import threading
def func(): print (hello timer!) if __name__ == "__main__" timer = threading.Timer(5, func) timer.start()
該例子效果爲延遲5秒執行
2 線程同步
線程同步是藉由threading模塊的如下類實現的:Condition,Event,Local.
2.1 Condition類
咱們先關注一下Condition類通常用於什麼場景:
線程A須要等某個條件成立才能繼續往下執行,如今這個條件不成立,線程A就阻塞等待,而線程B在執行過程當中使這個條件成立了,就喚醒線程A繼續執行。在pthread庫中經過條件變量(Condition Variable)來阻塞等待一個條件,或者喚醒等待這個條件的線程。
通俗的講,Condition類適合於生產者,消費者模型。 即Condition很適合那種主動休眠,被動喚醒的場景。 Condition使用難度要高於mutex,一不注意就會被死鎖,因此很考驗對condition的理解。
首先咱們知道python下的線程是真實的線程,底層用的是pthread。pthread內部Condition條件變量有兩個關鍵函數, await和signal方法,對應python threading Condition是wait和notify方法。
構造方法:
Condition([lock/rlock])
實例方法:
acquire([timeout])/release(): 調用關聯的鎖的相應方法。
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
notify(signal)方法:
注: notify ( signal ) 能夠把wait隊列的那些線程給喚醒起來。
下面給一個生產者-消費者模型的例子:
#/usr/bin/python3 # encoding: UTF-8 import threading import time # 商品 product = None # 條件變量 con = threading.Condition() # 生產者方法 def produce(): global product if con.acquire(): while True: if product is None: print("produce the product...") product = 'anything' # 通知消費者,商品已經生產 con.notify() else: print("Producer: product is not None") # 等待通知 con.wait() print("Producer: Resume from wait...") time.sleep(2) # 消費者方法 def consume(): global product if con.acquire(): while True: if product is not None: print("consume the product...") product = None # 通知生產者,商品已經沒了 con.notify() else: print("Cosumer: product is None") # 等待通知 con.wait() print("Cosumer: Resume from wait...") time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
結果爲
Cosumer: product is None
produce the product Cosumer: Resume from wait...
cosume the product
Producuer: Resume from wait...
produce the product
Cosumer: Resume from wait...
cosume the product
注意:con.wait()必定要在if判斷的外面,由於一開始的時候,兩個子線程都得到了鎖。若是沒有con.wait()釋放鎖並等待通知,則另外一個暫時沒有得到鎖權限的線程,會一直被阻塞。整個生產者-消費者模型也就跑不起來了。下面是跑步起來的例子:
#/usr/bin/python3 # encoding: UTF-8 import threading import time # 商品 product = None # 條件變量 con = threading.Condition() # 生產者方法 def produce(): global product if con.acquire(): while True: if product is None: print("produce the product...") product = 'anything' # 通知消費者,商品已經生產 con.notify() con.wait() print("Producer: Resume from wait...") time.sleep(2) else: print("Producer: product is not None") # 等待通知 #con.wait() #print("Producer: Resume from wait...") # time.sleep(2) # 消費者方法 def consume(): global product if con.acquire(): while True: if product is not None: print("consume the product...") product = None # 通知生產者,商品已經沒了 con.notify() con.wait() print("Cosumer: Resume from wait...") time.sleep(2) else: print("Cosumer: product is None") # 等待通知 #con.wait() # print("Cosumer: Resume from wait...") # time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
結果爲:
Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None
2.2 Event類
Event(事件)是最簡單的線程通訊機制之一:一個線程通知事件,其餘線程等待事件。Event內置了一個初始爲False的標誌,當調用set()時設爲True,調用clear()時重置爲 False。wait()將阻塞線程至等待阻塞狀態。
Event其實就是一個簡化版的 Condition。Event沒有鎖,沒法使線程進入同步阻塞狀態。
構造方法:
Event()
實例方法:
isSet(): 當內置標誌爲True時返回True。
set(): 將標誌設爲True,並通知全部處於等待阻塞狀態的線程恢復運行狀態。
clear(): 將標誌設爲False。
wait([timeout]): 若是標誌爲True將當即返回,不然阻塞線程至等待阻塞狀態,等待其餘線程調用set()。
下面是例子:
# encoding: UTF-8 import threading import time event = threading.Event() def func(): # 等待事件,進入等待阻塞狀態 print '%s wait for event...' % threading.currentThread().getName() event.wait() # 收到事件後進入運行狀態 print '%s recv event.' % threading.currentThread().getName()
if __name__ == "__main__":
t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2)
print 'MainThread set event.'
event.set()
結果爲
Thread-1 wait for event... Thread-2 wait for event... #2秒後。。。 MainThread set event. Thread-1 recv event. Thread-2 recv event.
2.3 Local類
local是一個小寫字母開頭的類,用於管理 thread-local(線程局部的)數據。對於同一個local,線程沒法訪問其餘線程設置的屬性;線程設置的屬性不會被其餘線程設置的同名屬性替換。
能夠把local當作是一個「線程-屬性字典」的字典,local封裝了從自身使用線程做爲 key檢索對應的屬性字典、再使用屬性名做爲key檢索屬性值的細節。
# encoding: UTF-8 import threading local = threading.local() local.tname = 'main' def func(): local.tname = 'notmain' print local.tname if __name__ == "__main__": t1 = threading.Thread(target=func) t1.start() t1.join() print(local.tname)