python 的threading模塊

其中Threadpython

是你主要的線程類,能夠建立進程實例。該類提供的函數包括:算法

getName(self) 返回線程的名字編程

isAlive(self) 布爾標誌,表示這個線程是否還在運行中安全

isDaemon(self) 返回線程的daemon標誌,將線程放入後臺執行多線程

join(self, timeout=None) 程序掛起,直到線程結束,若是給出timeout,則最多阻塞timeout主線程需等待子線程完成app

run(self) 定義線程的功能函數函數

setDaemon(self, daemonic) 把線程的daemon標誌設爲daemonicui

setName(self, name) 設置線程的名字spa

start(self) 開始線程執行線程

 

其中Queue提供的類

Queue隊列

LifoQueue後入先出(LIFO)隊列

PriorityQueue 優先隊列

 

接下來,咱們將會用一個一個示例來展現threading的各個功能,包括但不限於:兩種方式起線程、threading.Thread類的重要函數、使用Lock互斥及RLock實現重入鎖、使用Condition實現生產者和消費者模型、使用EventSemaphore多線程通訊

 

兩種方式起線程

Python中咱們主要是經過threadthreading這兩個模塊來實現的,其中Pythonthreading模塊是對thread作了一些包裝的,能夠更加方便的被使用,因此咱們使用threading模塊實現多線程編程。通常來講,使用線程有兩種模式,一種是建立線程要執行的函數,把這個函數傳遞進Thread對象裏,讓它來執行;另外一種是直接從Thread繼承,建立一個新的class,把線程執行的代碼放到這個新的 class裏。

 

方法一:

例一:

#!/usr/bin/python

import threading

import time

 

def worker():

    print "worker"

    time.sleep(1)

    return

 

for i in xrange(5):

t = threading.Thread(target=worker)

#threading.setDaemon()的使用。設置後臺進程。

t.setDaemon(True)

t.start()

#threading.activeCount()的使用,此方法返回當前進程中線程的個數

print "current has %d threads" % (threading.activeCount() - 1)

#threading.enumerate()的使用。此方法返回當前運行中的Thread對象列表。

for item in threading.enumerate():

print item

 

例二:

import threading def thread_fun(num):    for n in range(0, int(num)):        print " I come from %s, num: %s" %( threading.currentThread().getName(), n) def main(thread_num):    thread_list = list();    # 先建立線程對象    for i in range(0, thread_num):        thread_name = "thread_%s" %i        thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (20,)))     # 啓動全部線程    for thread in thread_list:        thread.start()     主線程中等待全部子線程退出    for thread in thread_list:        thread.join() if __name__ == "__main__":    main(3)

程序啓動了3個線程,而且打印了每個線程的線程名字,這個比較簡單吧,處理重複任務就派出用場了,下面介紹使用繼承threading的方式;

 

方法二:

繼承自threading.Thread:

import threading class MyThread(threading.Thread):    def __init__(self):

#MyThread的初始化和threading.Thread同樣        threading.Thread.__init__(self);     def run(self):#重載run方法        print "I am %s" %self.name if __name__ == "__main__":    for thread in range(0, 5):        t = MyThread()#自動調用類的run函數        t.start()

threading.Thread類的重要函數

介紹threading模塊中的主類Thread的一些主要方法,實例代碼以下:

import threading class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        print "I am %s" % (self.name) if __name__ == "__main__":    for i in range(0, 5):        my_thread = MyThread()        my_thread.start()

1name相關
你能夠爲每個thread指定name,默認的是Thread-No形式的,如上述實例代碼打印出的同樣:

    I am Thread-1    I am Thread-2    I am Thread-3    I am Thread-4    I am Thread-5

固然你能夠指定每個threadname,這個經過setName方法,代碼:

def __init__(self):     threading.Thread.__init__(self)     self.setName("new" + self.name)

2join方法
join方法原型以下,這個方法是用來阻塞當前上下文,直至該線程運行結束:


def join(self, timeout=None):

timeout能夠設置超時時間

三、setDaemon方法
當咱們在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程就分兵兩路,當主線程完成想退出時,會檢驗子線程是否完成。若是子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是,只要主線程完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠用setDaemon方法,並設置其參數爲True

 

使用Lock互斥鎖

如今咱們考慮這樣一個問題:假設各個線程須要訪問同一公共資源,咱們的代碼該怎麼寫?

import threadingimport time counter = 0 class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter        time.sleep(1);        counter += 1        print "I am %s, set counter:%s" % (self.name, counter) if __name__ == "__main__":    for i in range(0, 200):        my_thread = MyThread()        my_thread.start()

解決上面的問題,咱們興許會寫出這樣的代碼,咱們假設跑200個線程,可是這200個線程都會去訪問counter這個公共資源,並對該資源進行處理(counter += 1),代碼看起來就是這個樣了,可是咱們看下運行結果:

I am Thread-69, set counter:64I am Thread-73, set counter:66I am Thread-74, set counter:67I am Thread-75, set counter:68I am Thread-76, set counter:69I am Thread-78, set counter:70I am Thread-77, set counter:71I am Thread-58, set counter:72I am Thread-60, set counter:73I am Thread-62, set counter:74I am Thread-66, set counter:75I am Thread-70, set counter:76I am Thread-72, set counter:77I am Thread-79, set counter:78I am Thread-71, set counter:78

打印結果我只貼了一部分,從中咱們已經看出了這個全局資源(counter)被搶佔的狀況,問題產生的緣由就是沒有控制多個線程對同一資源的訪問,對數據形成破壞,使得線程運行的結果不可預期。這種現象稱爲「線程不安全」。在開發過程當中咱們必需要避免這種狀況,那怎麼避免?這就用到了咱們在綜述中提到的互斥鎖了。

互斥鎖概念

Python編程中,引入了對象互斥鎖的概念,來保證共享數據操做的完整性。每一個對象都對應於一個可稱爲」 互斥鎖」 的標記,這個標記用來保證在任一時刻,只能有一個線程訪問該對象。在Python中咱們使用threading模塊提供的Lock類。
咱們對上面的程序進行整改,爲此咱們須要添加一個互斥鎖變量mutex = threading.Lock(),而後在爭奪資源的時候以前咱們會先搶佔這把鎖mutex.acquire(),對資源使用完成以後咱們在釋放這把鎖mutex.release()。代碼以下:

import threadingimport time counter = 0mutex = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter, mutex        time.sleep(1);        if mutex.acquire():            counter += 1            print "I am %s, set counter:%s" % (self.name, counter)            mutex.release() if __name__ == "__main__":    for i in range(0, 100):        my_thread = MyThread()        my_thread.start()

同步阻塞
當一個線程調用Lock對象的acquire()方法得到鎖時,這把鎖就進入「locked」狀態。由於每次只有一個線程1能夠得到鎖,因此若是此時另外一個線程2試圖得到這個鎖,該線程2就會變爲「block「同步阻塞狀態。直到擁有鎖的線程1調用鎖的release()方法釋放鎖以後,該鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來得到鎖,並使得該線程進入運行(running)狀態。

進一步考慮

經過對公共資源使用互斥鎖,這樣就簡單的到達了咱們的目的,可是若是咱們又遇到下面的狀況:

1、遇到鎖嵌套的狀況該怎麼辦,這個嵌套是指當我一個線程在獲取臨界資源時,又須要再次獲取;

2、若是有多個公共資源,在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源;

上述這兩種狀況會直接形成程序掛起,即死鎖,下面咱們會談死鎖及可重入鎖RLock

死鎖的造成

前一篇文章Python:使用threading模塊實現多線程編程四[使用Lock互斥鎖]咱們已經開始涉及到如何使用互斥鎖來保護咱們的公共資源了,如今考慮下面的狀況–
若是有多個公共資源,在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,這會引發什麼問題?

死鎖概念
所謂死鎖: 是指兩個或兩個以上的進程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。 因爲資源佔用是互斥的,當某個進程提出申請資源後,使得有關進程在無外力協助下,永遠分配不到必需的資源而沒法繼續運行,這就產生了一種特殊現象死鎖。

import threading counterA = 0counterB = 0 mutexA = threading.Lock()mutexB = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        self.fun1()        self.fun2()     def fun1(self):        global mutexA, mutexB        if mutexA.acquire():            print "I am %s , get res: %s" %(self.name, "ResA")             if mutexB.acquire():                print "I am %s , get res: %s" %(self.name, "ResB")                mutexB.release()         mutexA.release()     def fun2(self):        global mutexA, mutexB        if mutexB.acquire():            print "I am %s , get res: %s" %(self.name, "ResB")             if mutexA.acquire():                print "I am %s , get res: %s" %(self.name, "ResA")                mutexA.release()         mutexB.release() if __name__ == "__main__":    for i in range(0, 100):        my_thread = MyThread()        my_thread.start()

代碼中展現了一個線程的兩個功能函數分別在獲取了一個競爭資源以後再次獲取另外的競爭資源,咱們看運行結果:

I am Thread-1 , get res: ResAI am Thread-1 , get res: ResBI am Thread-2 , get res: ResAI am Thread-1 , get res: ResB

能夠看到,程序已經掛起在那兒了,這種現象咱們就稱之爲」死鎖「。

避免死鎖
避免死鎖主要方法就是:正確有序的分配資源,避免死鎖算法中最有表明性的算法是Dijkstra E.W 1968年提出的銀行家算法。

可重入鎖RLock

考慮這種狀況:若是一個線程遇到鎖嵌套的狀況該怎麼辦,這個嵌套是指當我一個線程在獲取臨界資源時,又須要再次獲取。
根據這種狀況,代碼以下:

import threadingimport time counter = 0mutex = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter, mutex        time.sleep(1);        if mutex.acquire():            counter += 1            print "I am %s, set counter:%s" % (self.name, counter)            if mutex.acquire():                counter += 1                print "I am %s, set counter:%s" % (self.name, counter)                mutex.release()            mutex.release() if __name__ == "__main__":    for i in range(0, 200):        my_thread = MyThread()        my_thread.start()

這種狀況的代碼運行狀況以下:

I am Thread-1, set counter:1

以後就直接掛起了,這種狀況造成了最簡單的死鎖。
那有沒有一種狀況能夠在某一個線程使用互斥鎖訪問某一個競爭資源時,能夠再次獲取呢?在Python中爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

代碼只需將上述的:

mutex = threading.Lock()

替換成:

mutex = threading.RLock()

 

使用Condition實現複雜同步

目前咱們已經會使用Lock去對公共資源進行互斥訪問了,也探討了同一線程可使用RLock去重入鎖,可是儘管如此咱們只不過才處理了一些程序中簡單的同步現象,咱們甚至還不能很合理的去解決使用Lock鎖帶來的死鎖問題。因此咱們得學會使用更深層的解決同步問題。

Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,除了提供與Lock相似的acquirerelease方法外,還提供了waitnotify方法。

使用Condition的主要方式爲:線程首先acquire一個條件變量,而後判斷一些條件。若是條件不知足則wait;若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

下面咱們經過很著名的「生產者-消費者」模型來來演示下,在Python中使用Condition實現複雜同步。

import threadingimport time condition = threading.Condition()products = 0 class Producer(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global condition, products        while True:            if condition.acquire():                if products < 10:                    products += 1;                    print "Producer(%s):deliver one, now products:%s" %(self.name, products)                    condition.notify()                else:                    print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)                    condition.wait();                condition.release()                time.sleep(2) class Consumer(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global condition, products        while True:            if condition.acquire():                if products > 1:                    products -= 1                    print "Consumer(%s):consume one, now products:%s" %(self.name, products)                    condition.notify()                else:                    print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)                    condition.wait();                condition.release()                time.sleep(2) if __name__ == "__main__":    for p in range(0, 2):        p = Producer()        p.start()     for c in range(0, 10):        c = Consumer()        c.start()

代碼中主要實現了生產者和消費者線程,雙方將會圍繞products來產生同步問題,首先是2個生成者生產products ,而接下來的10個消費者將會消耗products,代碼運行以下:

Producer(Thread-1):deliver one, now products:1Producer(Thread-2):deliver one, now products:2Consumer(Thread-3):consume one, now products:1Consumer(Thread-4):only 1, stop consume, products:1Consumer(Thread-5):only 1, stop consume, products:1Consumer(Thread-6):only 1, stop consume, products:1Consumer(Thread-7):only 1, stop consume, products:1Consumer(Thread-8):only 1, stop consume, products:1Consumer(Thread-10):only 1, stop consume, products:1Consumer(Thread-9):only 1, stop consume, products:1Consumer(Thread-12):only 1, stop consume, products:1Consumer(Thread-11):only 1, stop consume, products:1

另外:Condition對象的構造函數能夠接受一個Lock/RLock對象做爲參數,若是沒有指定,則Condition對象會在內部自行建立一個RLock;除了notify方法外,Condition對象還提供了notifyAll方法,能夠通知waiting池中的全部線程嘗試acquire內部鎖。因爲上述機制,處於waiting狀態的線程只能經過notify方法喚醒,因此notifyAll的做用在於防止有線程永遠處於沉默狀態。

使用Event實現線程間通訊

使用threading.Event能夠實現線程間相互通訊,以前的Python:使用threading模塊實現多線程編程七[使用Condition實現複雜同步]咱們已經初步實現了線程間通訊的基本功能,可是更爲通用的一種作法是使用threading.Event對象。
使用threading.Event可使一個線程等待其餘線程的通知,咱們把這個Event傳遞到線程對象中,Event默認內置了一個標誌,初始值爲False。一旦該線程經過wait()方法進入等待狀態,直到另外一個線程調用該Eventset()方法將內置標誌設置爲True時,該Event會通知全部等待狀態的線程恢復運行。

import threadingimport time class MyThread(threading.Thread):    def __init__(self, signal):        threading.Thread.__init__(self)        self.singal = signal     def run(self):        print "I am %s,I will sleep ..."%self.name        self.singal.wait()        print "I am %s, I awake..." %self.name if __name__ == "__main__":    singal = threading.Event()    for t in range(0, 3):        thread = MyThread(singal)        thread.start()     print "main thread sleep 3 seconds... "    time.sleep(3)     singal.set()

運行效果以下:

I am Thread-1,I will sleep ...I am Thread-2,I will sleep ...I am Thread-3,I will sleep ...main thread sleep 3 seconds...I am Thread-1, I awake...I am Thread-2, I awake... I am Thread-3, I awake...

相關文章
相關標籤/搜索