Python中的多線程編程,線程安全與鎖(二) Python中的多線程編程,線程安全與鎖(一)

在個人上篇博文Python中的多線程編程,線程安全與鎖(一)中,咱們熟悉了多線程編程與線程安全相關重要概念, Threading.Lock實現互斥鎖的簡單示例兩種死鎖(迭代死鎖和互相等待死鎖)狀況及處理。今天咱們將聚焦於Python的Threading模塊總結線程同步問題。html

1. Threading模塊總結python

1.1 Threading模塊概覽編程

threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。python當前版本的多線程庫沒有實現優先級、線程組,線程也不能被中止、暫停、恢復、中斷。安全

threading模塊提供的類:  
  Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。多線程

threading 模塊提供的經常使用方法: 
  threading.currentThread(): 返回當前的線程變量。 
  threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 
  threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。app

threading 模塊提供的常量:函數

  threading.TIMEOUT_MAX 設置threading全局超時時間。post

1.2 Thread類ui

Thread是線程類,有兩種使用方法,直接傳入要運行的方法或從Thread繼承並覆蓋run()。推薦使用方法一,將目標函數做爲target參數傳入,很是簡單實用url

# coding:utf-8
import threading
import time
#方法一:將要執行的方法做爲參數傳給Thread的構造方法
def action(arg):
    time.sleep(1)
    print 'the arg is:%s\r' %arg

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.start()

print 'main thread end!'

#方法二:從Thread繼承,並重寫run()
class MyThread(threading.Thread):
    def __init__(self,arg):
        super(MyThread, self).__init__()#注意:必定要顯式的調用父類的初始化函數。
        self.arg=arg
    def run(self):#定義每一個線程要運行的函數
        time.sleep(1)
        print 'the arg is:%s\r' % self.arg

for i in xrange(4):
    t =MyThread(i)
    t.start()

print 'main thread end!'

相關方法:

構造方法: 
Thread(group=None, target=None, name=None, args=(), kwargs={}) 

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 線程名; 
  args/kwargs: 要傳入方法的參數。

實例方法: 
  isAlive(): 返回線程是否在運行。正在運行指啓動後、終止前。 
  get/setName(name): 獲取/設置線程名。 

  start():  線程準備就緒,等待CPU調度
  is/setDaemon(bool): 將該子線程設置爲父線程的守護線程(默認爲非守護線程(False))。(須要在線程start以前設置)

    關於「守護」的含義,咱們能夠這樣理解,子線程爲父線程的守護線程,意思是說子線程要守着父線程,一旦父線程執行完畢,也就不須要「守護」了,因此此時子線程就要結束。

    True: 設置該子進程爲父進程的守護進程,即後臺線程。主線程執行過程當中,子線程也在進行,主線程執行完畢後,子線程不論成功與否,主線程和子線程均中止。
         False:設置該子進程爲父進程的非守護進程,即前臺進程。主線程執行過程當中,子線程也在進行,主線程代碼執行完畢後,仍須要等待子線程也執行完成後,主線程纔會中止。
  start(): 啓動線程。 
  join([timeout]): 阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。

1.2.1 關鍵參數setDaemon

該參數是設置線程屬性,規定當前線程是否屬於守護線程(默認爲非守護線程(False))。(須要在線程start以前設置)

  • True: 設置該子進程爲父進程的守護進程,即後臺線程。主線程執行過程當中,子線程也在進行,主線程執行完畢後,子線程不論成功與否,主線程和子線程均中止。
  • False:設置該子進程爲父進程的非守護進程,即前臺進程。主線程執行過程當中,子線程也在進行,主線程代碼執行完畢後,仍須要等待子線程也執行完成後,主線程纔會中止。

關於setDaemon,默認子線程屬於非守護線程,即主線程要等待全部子線程執行完以後,才中止程序。

# coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
    print 'the arg is:%s\r' %arg
    time.sleep(1)

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.start()

print 'main_thread end!'

main_thread end!
sub thread start!the thread name is:Thread-2
the arg is:1
the arg is:0
sub thread start!the thread name is:Thread-4
the arg is:2
the arg is:3
Process finished with exit code 0
能夠看出,建立的4個「前臺」線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止

該例子驗證了setDeamon(False)(默認)非守護線程,主線程執行過程當中,子線程也在進行,主線程執行完畢後,等待子線程也執行完成後,主線程中止。

設置setDeamon=True時:

# coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
    print 'the arg is:%s\r' %arg
    time.sleep(1)

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)#設置線程爲後臺線程
    t.start()

print 'main_thread end!'



main_thread end!

能夠看出,主線程執行完畢後,後臺線程不論是成功與否,主線程均中止

驗證了setDeamon(True)守護線程,主線程執行過程當中,守護線程也在進行,主線程執行完畢後,子線程不論成功與否,均與主線程一塊兒中止。

1.2.2 關鍵參數join

阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。即當子進程的join()函數被調用時,主線程就被阻塞住了,意思爲再也不繼續往下執行。

值得注意的是,因爲join()會阻塞其餘函數,若是咱們要用for循環觸發多個線程的執行,start()要和join()分開(用兩個for循環,先用第一個for循環將所有子線程start(),再用第二個for循環將所有子線程join),否則會讓多線程並行執行,變成多線程依次執行:

  • 由於若是其餘線程尚未start(),那麼因爲start()操做屬於主線程的調用,那麼start()會被阻塞,咱們本來想要的多線程並行執行會變成多線程依次執行。
  • 若是此時其餘線程已經start()了,那麼join()函數因爲是對子線程的操做,不屬於主線程,則不會被阻塞。
#coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
    print 'the arg is:%s   ' %arg
    time.sleep(1)

thread_list = []    #線程存放列表
for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)
    thread_list.append(t)

for t in thread_list:
    t.start()

for t in thread_list:
    t.join()
print("main_thread end!")

#Output: sub thread start!the thread name
is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 設置join以後,主線程等待子線程所有執行完成後或者子線程超時後,主線程纔會從被阻塞的地方繼續執行。

驗證了 join()阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout,即便設置了setDeamon(True)主線程依然要等待子線程結束

使用例子(join不穩當的用法,使多線程編程順序執行)

#coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
    print 'the arg is:%s   ' %arg
    time.sleep(1)


for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)
    t.start()
    t.join()

print 'main_thread end!'

join不穩當用法

sub thread start!the thread name is:Thread-1    
the arg is:0   
sub thread start!the thread name is:Thread-2    
the arg is:1   
sub thread start!the thread name is:Thread-3    
the arg is:2   
sub thread start!the thread name is:Thread-4    
the arg is:3   
main_thread end!

Process finished with exit code 0
能夠看出此時,程序只能順序執行,每一個線程都被上一個線程的join阻塞,使得「多線程」失去了多線程意義。

運行結果

 

1.3 Timer類

Timer(定時器)是Thread的派生類,用於在指定時間後調用一個方法。

構造方法: 
Timer(interval, function, args=[], kwargs={}) 
  interval: 指定的時間 
  function: 要執行的方法 
  args/kwargs: 方法的參數

實例方法: 
Timer從Thread派生,沒有增長實例方法。

# encoding: UTF-8
import threading


def func(): print (hello timer!) if __name__ == "__main__" timer = threading.Timer(5, func) timer.start()

該例子效果爲延遲5秒執行

2 線程同步

線程同步是藉由threading模塊的如下類實現的:Condition,Event,Local.

2.1 Condition類

咱們先關注一下Condition類通常用於什麼場景:

線程A須要等某個條件成立才能繼續往下執行,如今這個條件不成立,線程A就阻塞等待,而線程B在執行過程當中使這個條件成立了,就喚醒線程A繼續執行。在pthread庫中經過條件變量(Condition Variable)來阻塞等待一個條件,或者喚醒等待這個條件的線程。

通俗的講,Condition類適合於生產者,消費者模型。 即Condition很適合那種主動休眠,被動喚醒的場景。 Condition使用難度要高於mutex,一不注意就會被死鎖,因此很考驗對condition的理解。

     首先咱們知道python下的線程是真實的線程,底層用的是pthread。pthread內部Condition條件變量有兩個關鍵函數, await和signal方法,對應python threading Condition是wait和notify方法。 

     一個 Condition實例的內部實際上維護了兩個隊列一個是等待鎖隊列(實際上mutex內部其實就是維護這個等待鎖隊列) , 另外一個隊列能夠叫等待條件隊列,在這隊列中的節點都是因爲(某些條件不知足而)線程自身調用wait方法阻塞的線程(記住是自身阻塞)。最重要的Condition方法是wait和 notify方法。另外condition還須要lock的支持, 若是你構造函數沒有指定lock,condition會默認給你配一個rlock。   

構造方法: 
Condition([lock/rlock])

實例方法: 
acquire([timeout])/release(): 調用關聯的鎖的相應方法。 
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。 
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。 
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。

下面是這兩個方法的執行流程。
          wait方法:
                            1. 入列到條件隊列(注意這裏不是等待鎖的隊列)
                            2. 釋放鎖
                            3. 阻塞自身線程
                             ————被喚醒後執行————-
                            4. 嘗試去獲取鎖(執行到這裏時線程已不在條件隊列中,而是位於等待(鎖的)隊列中,參見signal方法)
                                4.1 成功,從await方法中返回,執行線程後面的代碼
                                4.2 失敗,阻塞本身(等待前一個節點釋放鎖時將它喚醒)
         注意: 調用wait可讓當前線程休眠,等待其餘線程的喚醒,也就是等待signal,這個過程是阻塞的。 當隊列首線程被喚醒後,會繼續執行await方法中後面的代碼。

         notify(signal)方法:

                           1. 將條件隊列的隊首節點取出,放入等待鎖隊列的隊尾
                           2. 喚醒節點對應的線程.

注: notify ( signal ) 能夠把wait隊列的那些線程給喚醒起來。  

下面給一個生產者-消費者模型的例子:

#/usr/bin/python3
# encoding: UTF-8
import threading
import time

# 商品
product = None
# 條件變量
con = threading.Condition()


# 生產者方法
def produce():
    global product

    if con.acquire():
        while True:
            if product is None:
                print("produce the product...")
                product = 'anything'

                # 通知消費者,商品已經生產
                con.notify()
            else:
                print("Producer: product is not None")
            # 等待通知
            con.wait()
            print("Producer: Resume from wait...")
            time.sleep(2)


# 消費者方法
def consume():
    global product

    if con.acquire():
        while True:
            if product is not None:
                print("consume the product...")
                product = None

                # 通知生產者,商品已經沒了
                con.notify()
            else:
                print("Cosumer: product is None")
            # 等待通知
            con.wait()
            print("Cosumer: Resume from wait...")
            time.sleep(2)


if __name__ == "__main__":
    t1 = threading.Thread(target=produce)
    t2 = threading.Thread(target=consume)
    t2.start()
    t1.start()

結果爲

Cosumer: product is None
produce the product Cosumer: Resume from wait...
cosume the product
Producuer: Resume from wait...
produce the product
Cosumer: Resume from wait...
cosume the product

 注意:con.wait()必定要在if判斷的外面,由於一開始的時候,兩個子線程都得到了鎖。若是沒有con.wait()釋放鎖並等待通知,則另外一個暫時沒有得到鎖權限的線程,會一直被阻塞。整個生產者-消費者模型也就跑不起來了。下面是跑步起來的例子:

#/usr/bin/python3
# encoding: UTF-8
import threading
import time

# 商品
product = None
# 條件變量
con = threading.Condition()


# 生產者方法
def produce():
    global product

    if con.acquire():
        while True:
            if product is None:
                print("produce the product...")
                product = 'anything'

                # 通知消費者,商品已經生產
                con.notify()
                con.wait()
                print("Producer: Resume from wait...")
                time.sleep(2)
            else:
                print("Producer: product is not None")
            # 等待通知
            #con.wait()
            #print("Producer: Resume from wait...")
           # time.sleep(2)


# 消費者方法
def consume():
    global product

    if con.acquire():
        while True:
            if product is not None:
                print("consume the product...")
                product = None

                # 通知生產者,商品已經沒了
                con.notify()
                con.wait()
                print("Cosumer: Resume from wait...")
                time.sleep(2)
            else:
                print("Cosumer: product is None")
            # 等待通知
            #con.wait()
           # print("Cosumer: Resume from wait...")
           # time.sleep(2)


if __name__ == "__main__":
    t1 = threading.Thread(target=produce)
    t2 = threading.Thread(target=consume)
    t2.start()
    t1.start()

結果爲:

Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None
Cosumer: Product is None

 

2.2 Event類

Event(事件)是最簡單的線程通訊機制之一一個線程通知事件,其餘線程等待事件Event內置了一個初始爲False的標誌,當調用set()時設爲True,調用clear()時重置爲 False。wait()將阻塞線程至等待阻塞狀態。

  Event其實就是一個簡化版的 Condition。Event沒有鎖,沒法使線程進入同步阻塞狀態

構造方法: 
Event()

實例方法: 
  isSet(): 當內置標誌爲True時返回True。 
  set(): 將標誌設爲True,並通知全部處於等待阻塞狀態的線程恢復運行狀態。 
  clear(): 將標誌設爲False。 
  wait([timeout]): 若是標誌爲True將當即返回,不然阻塞線程至等待阻塞狀態,等待其餘線程調用set()。

下面是例子:

# encoding: UTF-8
import threading
import time

event = threading.Event()


def func():
    # 等待事件,進入等待阻塞狀態
    print '%s wait for event...' % threading.currentThread().getName()
    event.wait()

    # 收到事件後進入運行狀態
    print '%s recv event.' % threading.currentThread().getName()

if __name__ == "__main__":
t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2)
print 'MainThread set event.' 
event.set()

結果爲

Thread-1 wait for event...
Thread-2 wait for event...

#2秒後。。。
MainThread set event.
Thread-1 recv event.
Thread-2 recv event.

 

2.3 Local類

local是一個小寫字母開頭的類,用於管理 thread-local(線程局部的)數據。對於同一個local,線程沒法訪問其餘線程設置的屬性;線程設置的屬性不會被其餘線程設置的同名屬性替換

  能夠把local當作是一個「線程-屬性字典」的字典,local封裝了從自身使用線程做爲 key檢索對應的屬性字典、再使用屬性名做爲key檢索屬性值的細節。

# encoding: UTF-8
import threading
 
local = threading.local()
local.tname = 'main'
 
def func():
    local.tname = 'notmain'
    print local.tname

if __name__ == "__main__":
    t1 = threading.Thread(target=func)
    t1.start()
    t1.join()
    print(local.tname)
相關文章
相關標籤/搜索