Python多線程學習

 1、Python中的線程使用:html

    Python中使用線程有兩種方式:函數或者用類來包裝線程對象。python

一、  函數式:調用thread模塊中的start_new_thread()函數來產生新線程。以下例:安全

 

  1. import time  
  2. import thread  
  3. def timer(no, interval):  
  4.     cnt = 0  
  5.     while cnt<10:  
  6.         print 'Thread:(%d) Time:%s\n'%(no, time.ctime())  
  7.         time.sleep(interval)  
  8.         cnt+=1  
  9.     thread.exit_thread()  
  10.      
  11.    
  12. def test(): #Use thread.start_new_thread() to create 2 new threads  
  13.     thread.start_new_thread(timer, (1,1))  
  14.     thread.start_new_thread(timer, (2,2))  
  15.    
  16. if __name__=='__main__':  
  17.     test()  
 

 

    上面的例子定義了一個線程函數timer,它打印出10條時間記錄後退出,每次打印的間隔由interval參數決定。thread.start_new_thread(function, args[, kwargs])的第一個參數是線程函數(本例中的timer方法),第二個參數是傳遞給線程函數的參數,它必須是tuple類型,kwargs是可選參數。多線程

    線程的結束能夠等待線程天然結束,也能夠在線程函數中調用thread.exit()thread.exit_thread()方法。dom

二、  建立threading.Thread的子類來包裝一個線程對象,以下例:函數

 

  1. import threading  
  2. import time  
  3. class timer(threading.Thread): #The timer class is derived from the class threading.Thread  
  4.     def __init__(self, num, interval):  
  5.         threading.Thread.__init__(self)  
  6.         self.thread_num = num  
  7.         self.interval = interval  
  8.         self.thread_stop = False  
  9.    
  10.     def run(self): #Overwrite run() method, put what you want the thread do here  
  11.         while not self.thread_stop:  
  12.             print 'Thread Object(%d), Time:%s\n' %(self.thread_num, time.ctime())  
  13.             time.sleep(self.interval)  
  14.     def stop(self):  
  15.         self.thread_stop = True  
  16.          
  17.    
  18. def test():  
  19.     thread1 = timer(11)  
  20.     thread2 = timer(22)  
  21.     thread1.start()  
  22.     thread2.start()  
  23.     time.sleep(10)  
  24.     thread1.stop()  
  25.     thread2.stop()  
  26.     return  
  27.    
  28. if __name__ == '__main__':  
  29.     test()  
 

 

   

    就我我的而言,比較喜歡第二種方式,即建立本身的線程類,必要時重寫threading.Thread類的方法,線程的控制能夠由本身定製。工具

threading.Thread類的使用:ui

1,在本身的線程類的__init__裏調用threading.Thread.__init__(self, name = threadname)spa

Threadname爲線程的名字.net

2 run(),一般須要重寫,編寫代碼實現作須要的功能。

3getName(),得到線程對象名稱

4setName(),設置線程對象名稱

5start(),啓動線程

6jion([timeout]),等待另外一線程結束後再運行。

7setDaemon(bool),設置子線程是否隨主線程一塊兒結束,必須在start()以前調用。默認爲False

8isDaemon(),判斷線程是否隨主線程一塊兒結束。

9isAlive(),檢查線程是否在運行中。

    此外threading模塊自己也提供了不少方法和其餘的類,能夠幫助咱們更好的使用和管理線程。能夠參看http://www.python.org/doc/2.5.2/lib/module-threading.html


假設兩個線程對象t1t2都要對num=0進行增1運算,t1t2都各對num修改10次,num的最終的結果應該爲20。可是因爲是多線程訪問,有可能出現下面狀況:在num=0時,t1取得num=0。系統此時把t1調度爲」sleeping」狀態,把t2轉換爲」running」狀態,t2頁得到num=0。而後t2對獲得的值進行加1並賦給num,使得num=1。而後系統又把t2調度爲」sleeping」,把t1轉爲」running」。線程t1又把它以前獲得的01後賦值給num。這樣,明明t1t2都完成了1次加1工做,但結果仍然是num=1

    上面的case描述了多線程狀況下最多見的問題之一:數據共享。當多個線程都要去修改某一個共享數據的時候,咱們須要對數據訪問進行同步。

一、  簡單的同步

最簡單的同步機制就是「鎖」。鎖對象由threading.RLock類建立。線程可使用鎖的acquire()方法得到鎖,這樣鎖就進入「locked」狀態。每次只有一個線程能夠得到鎖。若是當另外一個線程試圖得到這個鎖的時候,就會被系統變爲「blocked」狀態,直到那個擁有鎖的線程調用鎖的release()方法來釋放鎖,這樣鎖就會進入「unlocked」狀態。「blocked」狀態的線程就會收到一個通知,並有權利得到鎖。若是多個線程處於「blocked」狀態,全部線程都會先解除「blocked」狀態,而後系統選擇一個線程來得到鎖,其餘的線程繼續沉默(「blocked」)。

Python中的thread模塊和Lock對象是Python提供的低級線程控制工具,使用起來很是簡單。以下例所示:

 

  1. import thread  
  2. import time  
  3. mylock = thread.allocate_lock()  #Allocate a lock  
  4. num=0  #Shared resource  
  5.   
  6. def add_num(name):  
  7.     global num  
  8.     while True:  
  9.         mylock.acquire() #Get the lock   
  10.         # Do something to the shared resource  
  11.         print 'Thread %s locked! num=%s'%(name,str(num))  
  12.         if num >= 5:  
  13.             print 'Thread %s released! num=%s'%(name,str(num))  
  14.             mylock.release()  
  15.             thread.exit_thread()  
  16.         num+=1  
  17.         print 'Thread %s released! num=%s'%(name,str(num))  
  18.         mylock.release()  #Release the lock.  
  19.   
  20. def test():  
  21.     thread.start_new_thread(add_num, ('A',))  
  22.     thread.start_new_thread(add_num, ('B',))  
  23.   
  24. if __name__== '__main__':  
  25.     test()  

 

Python thread的基礎上還提供了一個高級的線程控制庫,就是以前提到過的threadingPythonthreading module是在創建在thread module基礎之上的一個module,在threading module中,暴露了許多thread module中的屬性。在thread module中,python提供了用戶級的線程同步工具「Lock」對象。而在threading module中,python又提供了Lock對象的變種: RLock對象。RLock對象內部維護着一個Lock對象,它是一種可重入的對象。對於Lock對象而言,若是一個線程連續兩次進行acquire操做,那麼因爲第一次acquire以後沒有release,第二次acquire將掛起線程。這會致使Lock對象永遠不會release,使得線程死鎖。RLock對象容許一個線程屢次對其進行acquire操做,由於在其內部經過一個counter變量維護着線程acquire的次數。並且每一次的acquire操做必須有一個release操做與之對應,在全部的release操做完成以後,別的線程才能申請該RLock對象。

下面來看看如何使用threadingRLock對象實現同步。

 

  1. import threading  
  2. mylock = threading.RLock()  
  3. num=0  
  4.    
  5. class myThread(threading.Thread):  
  6.     def __init__(self, name):  
  7.         threading.Thread.__init__(self)  
  8.         self.t_name = name  
  9.           
  10.     def run(self):  
  11.         global num  
  12.         while True:  
  13.             mylock.acquire()  
  14.             print '\nThread(%s) locked, Number: %d'%(self.t_name, num)  
  15.             if num>=4:  
  16.                 mylock.release()  
  17.                 print '\nThread(%s) released, Number: %d'%(self.t_name, num)  
  18.                 break  
  19.             num+=1  
  20.             print '\nThread(%s) released, Number: %d'%(self.t_name, num)  
  21.             mylock.release()  
  22.               
  23. def test():  
  24.     thread1 = myThread('A')  
  25.     thread2 = myThread('B')  
  26.     thread1.start()  
  27.     thread2.start()  
  28.    
  29. if __name__== '__main__':  
  30.     test()  

 

咱們把修改共享數據的代碼成爲「臨界區」。必須將全部「臨界區」都封閉在同一個鎖對象的acquirerelease之間。

二、  條件同步

鎖只能提供最基本的同步。假如只在發生某些事件時才訪問一個「臨界區」,這時須要使用條件變量Condition

Condition對象是對Lock對象的包裝,在建立Condition對象時,其構造函數須要一個Lock對象做爲參數,若是沒有這個Lock對象參數,Condition將在內部自行建立一個Rlock對象。在Condition對象上,固然也能夠調用acquirerelease操做,由於內部的Lock對象自己就支持這些操做。可是Condition的價值在於其提供的waitnotify的語義。

條件變量是如何工做的呢?首先一個線程成功得到一個條件變量後,調用此條件變量的wait()方法會致使這個線程釋放這個鎖,並進入「blocked」狀態,直到另外一個線程調用同一個條件變量的notify()方法來喚醒那個進入「blocked」狀態的線程。若是調用這個條件變量的notifyAll()方法的話就會喚醒全部的在等待的線程。

若是程序或者線程永遠處於「blocked」狀態的話,就會發生死鎖。因此若是使用了鎖、條件變量等同步機制的話,必定要注意仔細檢查,防止死鎖狀況的發生。對於可能產生異常的臨界區要使用異常處理機制中的finally子句來保證釋放鎖。等待一個條件變量的線程必須用notify()方法顯式的喚醒,不然就永遠沉默。保證每個wait()方法調用都有一個相對應的notify()調用,固然也能夠調用notifyAll()方法以防萬一。


生產者與消費者問題是典型的同步問題。這裏簡單介紹兩種不一樣的實現方法。

1,  條件變量

 

  1. import threading  
  2.   
  3. import time  
  4.   
  5. class Producer(threading.Thread):  
  6.   
  7.     def __init__(self, t_name):  
  8.   
  9.         threading.Thread.__init__(self, name=t_name)  
  10.   
  11.    
  12.   
  13.     def run(self):  
  14.   
  15.         global x  
  16.   
  17.         con.acquire()  
  18.   
  19.         if x > 0:  
  20.   
  21.             con.wait()  
  22.   
  23.         else:  
  24.   
  25.             for i in range(5):  
  26.   
  27.                 x=x+1  
  28.   
  29.                 print "producing..." + str(x)  
  30.   
  31.             con.notify()  
  32.   
  33.         print x  
  34.   
  35.         con.release()  
  36.   
  37.    
  38.   
  39. class Consumer(threading.Thread):  
  40.   
  41.     def __init__(self, t_name):  
  42.   
  43.         threading.Thread.__init__(self, name=t_name)  
  44.   
  45.     def run(self):  
  46.   
  47.         global x  
  48.   
  49.         con.acquire()  
  50.   
  51.         if x == 0:  
  52.   
  53.             print 'consumer wait1'  
  54.   
  55.             con.wait()  
  56.   
  57.         else:  
  58.   
  59.             for i in range(5):  
  60.   
  61.                 x=x-1  
  62.   
  63.                 print "consuming..." + str(x)  
  64.   
  65.             con.notify()  
  66.   
  67.         print x  
  68.   
  69.         con.release()  
  70.   
  71.    
  72.   
  73. con = threading.Condition()  
  74.   
  75. x=0  
  76.   
  77. print 'start consumer'  
  78.   
  79. c=Consumer('consumer')  
  80.   
  81. print 'start producer'  
  82.   
  83. p=Producer('producer')  
  84.   
  85.    
  86.   
  87. p.start()  
  88.   
  89. c.start()  
  90.   
  91. p.join()  
  92.   
  93. c.join()  
  94.   
  95. print x  
  

 

 

    上面的例子中,在初始狀態下,Consumer處於wait狀態,Producer連續生產(對x執行增1操做)5次後,notify正在等待的ConsumerConsumer被喚醒開始消費(對x執行減1操做) 

2,  同步隊列

Python中的Queue對象也提供了對線程同步的支持。使用Queue對象能夠實現多個生產者和多個消費者造成的FIFO的隊列。

生產者將數據依次存入隊列,消費者依次從隊列中取出數據。

 

 

  1. # producer_consumer_queue  
  2.   
  3. from Queue import Queue  
  4.   
  5. import random  
  6.   
  7. import threading  
  8.   
  9. import time  
  10.   
  11.    
  12.   
  13. #Producer thread  
  14.   
  15. class Producer(threading.Thread):  
  16.   
  17.     def __init__(self, t_name, queue):  
  18.   
  19.         threading.Thread.__init__(self, name=t_name)  
  20.   
  21.         self.data=queue  
  22.   
  23.     def run(self):  
  24.   
  25.         for i in range(5):  
  26.   
  27.             print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i)  
  28.   
  29.             self.data.put(i)  
  30.   
  31.             time.sleep(random.randrange(10)/5)  
  32.   
  33.         print "%s: %s finished!" %(time.ctime(), self.getName())  
  34.   
  35.    
  36.   
  37. #Consumer thread  
  38.   
  39. class Consumer(threading.Thread):  
  40.   
  41.     def __init__(self, t_name, queue):  
  42.   
  43.         threading.Thread.__init__(self, name=t_name)  
  44.   
  45.         self.data=queue  
  46.   
  47.     def run(self):  
  48.   
  49.         for i in range(5):  
  50.   
  51.             val = self.data.get()  
  52.   
  53.             print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val)  
  54.   
  55.             time.sleep(random.randrange(10))  
  56.   
  57.         print "%s: %s finished!" %(time.ctime(), self.getName())  
  58.   
  59.    
  60.   
  61. #Main thread  
  62.   
  63. def main():  
  64.   
  65.     queue = Queue()  
  66.   
  67.     producer = Producer('Pro.', queue)  
  68.   
  69.     consumer = Consumer('Con.', queue)  
  70.   
  71.     producer.start()  
  72.   
  73.     consumer.start()  
  74.   
  75.     producer.join()  
  76.   
  77.     consumer.join()  
  78.   
  79.     print 'All threads terminate!'  
  80.   
  81.    
  82.   
  83. if __name__ == '__main__':  
  84.   
  85.     main()  

 

 

在上面的例子中,Producer在隨機的時間內生產一個「產品」,放入隊列中。Consumer發現隊列中有了「產品」,就去消費它。本例中,因爲Producer生產的速度快於Consumer消費的速度,因此每每Producer生產好幾個「產品」後,Consumer才消費一個產品。

Queue模塊實現了一個支持多producer和多consumerFIFO隊列。當共享信息須要安全的在多線程之間交換時,Queue很是有用。Queue的默認長度是無限的,可是能夠設置其構造函數的maxsize參數來設定其長度。Queueput方法在隊尾插入,該方法的原型是:

put( item[, block[, timeout]])

若是可選參數blocktrue而且timeoutNone(缺省值),線程被block,直到隊列空出一個數據單元。若是timeout大於0,在timeout的時間內,仍然沒有可用的數據單元,Full exception被拋出。反之,若是block參數爲false(忽略timeout參數),item被當即加入到空閒數據單元中,若是沒有空閒數據單元,Full exception被拋出。

Queueget方法是從隊首取數據,其參數和put方法同樣。若是block參數爲truetimeoutNone(缺省值),線程被block,直到隊列中有數據。若是timeout大於0,在timeout時間內,仍然沒有可取數據,Empty exception被拋出。反之,若是block參數爲false(忽略timeout參數),隊列中的數據被當即取出。若是此時沒有可取數據,Empty exception也會被拋出。

相關文章
相關標籤/搜索