threadinghtml
【這篇文章的閱讀量愈來愈多了… 所以我以爲有必要聲明下,文章的性質是我我的的學習記錄和總結,並不是教程,文中不免有表達不嚴謹,甚至錯誤的地方。若是您只是相對threading相關內容作個大概的瞭解,但願能對您有所參考。若是想要精密地學習,請移步正規教材、文檔以及大牛的博客】python
python程序默認是單線程的,也就是說在前一句語句執行完以前後面的語句不能繼續執行(不知道我理解得對不對)多線程
先感覺一下線程,通常狀況下:併發
def testa(): sleep(1) print "a" def testb(): sleep(1) print "b" testa() testb() #先隔出一秒打印出a,再過一秒打出b
可是若是用了threading的話:app
ta = threading.Thread(target=testa) tb = threading.Thread(target=testb) for t in [ta,tb]: t.start() for t in [ta,tb]: t.join() print "DONE" #輸出是ab或者ba(緊貼着的)而後空一行再來DONE的結果。
獲得這樣的結果是由於這樣的,在start以後,ta首先開始跑,可是主線程(腳本自己)沒有等其完成就繼續開始下一輪循環,而後tb也開始了,在以後的一段時間裏,ta和tb兩條線程(分別表明了testa和testb這兩個過程)共同執行。相對於一個個迭代而言,這樣作無疑是大大提升了運行的速度。dom
Thread類爲線程的抽象類,其構造方法的參數target指向一個函數對象,即該線程的具體操做。此外還能夠有args=<tuple>來給target函數傳參數。須要注意的是當傳任何一個序列進去的話Thread會自動把它分解成單個單個的元素而後分解傳給target函數。我估計在定義的時候確定是*args了。ide
join方法是個很tricky的東西,至今還不是很清楚地懂這是個什麼玩意兒。join([timeout])方法阻塞了主線程,直到調用此方法的子線程完成以後主線程才繼續往下運行。(以前我糊里糊塗地把join就牢牢接在start後面寫了,若是這麼寫了的話那麼多線程在速度上就毫無優點,和單線程同樣了= =)。而像上面這個示例同樣,先一個遍歷把全部線程 都啓動起來,再用一個遍歷把全部線程都join一遍彷佛是比較通行的作法。函數
● 關於線程鎖工具
多線程程序涉及到一個問題,那就是當不一樣線程要對同一個資源進行修改或利用時會出現混亂,因此有必要引入線程鎖。性能
(經網友提醒,補充一下相關例子)好比下面這一段程序:
import threading class MyThread(threading.Thread): def __init__(self,counter,name): self.counter = counter self.name = name def run(self): self.counter[0] += 1 print self.counter[0] if __name__ == '__main__': counter = [0] for i in range(1,11): t = MyThread(counter,i) t.start()
這裏併發了10個線程,在沒有混亂的狀況下,很明顯一個線程的name和通過它處理事後的counter中的數字應該相同。由於沒有鎖可能引起混亂,想象中,咱們可能認爲,當某個線程要打印counter中的數字時,別的線程對其做出了改變,從而致使打印出的counter中的數字不符合預期。實際上,這段代碼的運行結果很大機率是很整齊的1\n2\n3....10。若是要解釋一下,1. 雖然稱併發10個線程。可是實際上線程是不可能真的在同一個時間點開始,好比在這個例子中t1啓動後,要將循環進入下一輪,建立新的線程對象t2,而後再讓t2啓動。這段時間雖然很短很短,可是確實是存在的。而這段時間的長度,足夠讓t1的run中,進行自增而且打印的操做。最終,整個結果看上去彷佛沒什麼毛病。
若是咱們想要看到「混亂」的狀況,顯然兩個方法。要麼縮短for i in range以及建立線程對象的時間,使得線程在自增以後來不及打印時counter被第二個線程自增,這個比較困難;另外一個方法就是延長自增後到打印前的這段時間。天然想到,最簡單的,用time.sleep(1)睡一秒便可。此時結果多是10\n10\n...。主要看第一行的結果。再也不是1而是10了。說明在自增操做結束,打印數字以前睡的這一秒裏,到第10個線程都成功自增了counter,所以即便是第一個線程,打印到的也是通過第10個線程修改的counter了。
上述結果雖然數的值上改變了,可是極可能輸出仍然是整齊的一行行的。有時候好幾個數字會擠在一塊兒輸出,尤爲是把併發量調大,好比調到100或1000,尤其明顯。擠在一塊兒主要是由於,time.sleep(1)並非精精確確地睡1秒。有多是0.999或者1.001(具體差別可能更小,打個比方)。此時可能tk線程睡了1.001秒而tk+1線程睡了0.999秒,致使二者打印時內容被雜亂地一塊兒寫入緩衝區,因此打印出來的就凌亂了。根據時間偏差的不一樣,甚至有可能出現大數字先打印出來的狀況。
能夠經過Thread.Lock類來建立簡單的線程鎖。lock = threading.Lock()便可。在某線程start中,處理會被爭搶的資源以前,讓lock.acquire(),且lock在acquire()以後不能再acquire,不然會報錯。當線程結束處理後調用lock.release()來釋放鎖就行了。通常而言,有鎖的多線程場景能夠提高一部分效率,但在寫文件等時機下會有阻塞等待的狀況。
爲了說明簡單的lock,咱們改一下上面那段程序:
import threading import time class MyThread(threading.Thread): def __init__(self,lock,name): threading.Thread.__init__(self) self.lock = lock self.name = name def run(self): time.sleep(1) # self.lock.acquire() print self.name # self.lock.release() if __name__ == '__main__': lock = threading.Lock() for i in range(1,10): t = MyThread(lock,i) t.start()
根據啓動的順序,每一個線程有了name屬性。而後啓動,在沒有鎖的狀況下可能會出現擠在一塊兒,而且數字亂序輸出的狀況。把兩句註釋去掉,加上鎖以後,獲得的輸出確定是一行一行的,可是數字仍然有多是亂序的。分析一下,加上鎖以後,每次進行print,實際上是線程對於sys.stdout寫入內容,有多個線程都要print就造成了競爭,所以就會致使擠在一塊兒。加上鎖,acquire以後,本線程擁有了對sys.stdout的獨享,所以能夠正確輸出內容+換行,再解開鎖供下一個須要打印的線程使用。那爲何亂序問題仍是沒有解決呢?這個就是(推測)由於前面提到的time.sleep的不精確性。有可能6號線程sleep了稍微久而7號稍微短了些,致使7號先於6號得到鎖。天然7就比6先打印出來了。若是稍微有意思地改動一下,好比sleep的秒數時間錯開來,1號線程睡1秒,2號線程睡2秒這樣子的話,時間上的錯開使得沒有了對資源的競爭的狀況,所以即便沒有鎖也不會亂。
總結一下,1. 對於run過程當中對於可能有競爭的資源以前所作的操做,花費時間越是接近,越有可能發生資源競爭從而致使混亂。(廢話…)2. 當run中有print或者相似操做時須要注意,其實隱含着要對stdout作出競爭的意義
相比之下,無所多線程場景能夠進一步提高效率,可是可能會引發讀寫衝突等問題,因此要慎用。必定要確認各個線程間沒有共同的資源之類的問題後再實行無鎖多線程。
和Lock類相似的還有一個RLock類,與Lock類的區別在於RLock類鎖能夠嵌套地acquire和release。也就是說在同一個線程中acquire以後再acquire也不會報錯,而是將鎖的層級加深一層。只有當每一層鎖從下到上依次都release開這個鎖纔算是被解開。
● 更增強大的鎖——Condition
上面提到的threading.Lock類提供了最爲簡單的線程鎖的功能。除了Lock和RLock之外,其實threading還補充了其餘一些不少的帶有鎖功能的類。Condition就是其中最爲強大的類之一。
在說Condition以前還須要明確一下線程的幾個概念。線程的阻塞和掛起,線程的這兩個狀態乍一看都是線程暫停再也不繼續往前運行,可是引發的緣由不太同樣。阻塞是指線程間互相的制約,當一個線程得到了鎖,其餘的線程就被阻塞了,而掛起是出於統一調度的考慮。換句話說,掛起是一種主動的行爲,在程序中咱們主動掛起某個線程而後能夠主動放下讓線程繼續運行;而阻塞更多時候是被動發生的,當有線程操做衝突了那麼必然是有一方要被阻塞的。從層級上看,掛起操做是高於阻塞的,也就說一個線程能夠在阻塞的時候被掛起,而後被喚醒後依然是阻塞狀態。若是在掛起過程當中具有了運行條件(即再也不阻塞),線程也不會往前運行。
再來看看Condition類的一些方法。首先是acquire和release,Condition內部也維護了一把鎖,默認是RLock類,全部關聯了同一個Condition對象的線程也都會遵照這把鎖規定的來進行運行。
Condition.wait([timeout]) 這個方法必定要在獲取鎖定以後調用,調用這個方法的Condition對象所在的線程會被掛起而且釋放這個線程得到着的全部鎖,直到接到通知被喚醒或者超時(若是設置了Timeout的話),當被喚醒以後線程將從新獲取鎖定。
Condition.notify() notify就是上面所說的通知,調用這個方法以後會喚醒一個被掛起的線程。線程的選擇尚不明確,彷佛是隨機的。須要注意的是notify方法只進行掛起的喚醒而不涉及鎖的釋放
Condition.notify_all() 喚醒全部掛起的線程
基於上面這幾個方法,就能夠作出比較好的線程管理的demo了,好比下面這段網上常見的一個捉迷藏的模擬程序:
import threading,time class Seeker(threading.Thread): def __init__(self,cond,name): Thread.__init__(self) self.cond = cond self.name = name def run(self): time.sleep(1) #1.確保seeker晚於hider開始執行 self.cond.acquire() #4. hider的鎖釋放了因此這裏得到了鎖 print '我把眼睛蒙上了' self.cond.notify() #5.蒙上眼後通知hider,hider線程此時被喚醒並試圖獲取鎖,可是鎖還在seeker身上,因此hider被阻塞,seeker繼續往下 self.cond.wait() #6. seeker鎖被釋放而且掛起,hider就獲取鎖開始繼續往下運行了 print '我找到你了' self.cond.notify() #9.找到了以後通知hider,hider意圖獲取鎖但不行因此被阻塞,seeker往下 self.cond.release() #10.釋放鎖 print '我贏了' class Hider(threading.Thread): def __init__(self,cond,name): Thread.__init__(self) self.cond = cond self.name = name def run(self): self.cond.acquire() #2.hider獲取鎖 self.cond.wait() #3.hider被掛起而後釋放鎖 print '我已經藏好了' self.cond.notify() #7.藏好後通知seeker,seeker意圖獲取鎖,可是鎖在hider身上因此seeker被阻塞 self.cond.wait() #8.hider被掛起,釋放鎖,seeker獲取鎖,seeker繼續往下運行 self.cond.release() #11. 在此句以前一點,seeker釋放了鎖(#10),hider獲得鎖,隨即這句hider釋放鎖 print '被你找到了' cond = threading.Condition() seeker = Seeker(cond,'seeker') hider = Hider(cond,'hider') seeker.start() hider.start() ''' 結果: 我把眼睛蒙上了 我已經藏好了 我找到你了 我贏了 被你找到了 '''
這裏須要注意的是self.cond.release方法不能省,不然會引發死鎖。
● 以上的包裝線程的方式是一種面向過程的方法,下面介紹一下如何面向對象地來抽象線程
面向對象地抽象線程須要自定義一個類繼承Thread類。好比自定義class MyThread(Thread)。這個類的一個實例就是表明了一個線程,而後經過重載這個類中的run方法(是run,不是start!!但start的動做確實就是調用run)來執行具體的操做。此時鎖能夠做爲一個構造方法的參數,將一個鎖傳進不一樣的實例中以實現線程鎖控制。好比:
引用自http://www.cnblogs.com/tkqasn/p/5700281.html
#方法二:從Thread繼承,並重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#注意:必定要顯式的調用父類的初始化函數。 self.arg=arg def run(self):#定義每一個線程要運行的函數 time.sleep(1) print 'the arg is:%s\r' % self.arg for i in xrange(4): t =MyThread(i) t.start() print 'main thread end!'
Thread類還有如下的一些方法,自定義的類也能夠調用
getName()
setName(...) //其實Thread類在構造方法中有一個name參數,能夠爲相應的線程取一個名字。這兩個方法就是相關這個名字屬性的
isAlive() 一個線程從start()開始到run()結束的過程當中沒有異常,則其實alive的。
setDaemon(True/False) 是否設置一個線程爲守護線程。當你設置一個線程爲守護線程以後,程序不會等待這個線程結束再退出程序,可參考http://blog.csdn.net/u012063703/article/details/51601579
● 除了Thread類,threading中還有如下一些屬性,簡單介紹一下:
Timer類,Timer(int,target=func) 和Thread類相似,只不過它在int秒事後才以target指定的函數開始線程運行
currentThread() 得到當前線程對象
activeCount() 得到當前活動的線程總個數
enumerate() 得到全部活動線程的列表
settrace(func) 設置一跟蹤函數,在run執行前執行
setprofile(func) 設置一跟蹤函數,在run執行完畢以後執行
以上內容是目前我所能駕馭的,而threading類還有不少很NB的東西好比RLock類,Condition類,Event類等等。沒什麼時間再仔細研究它們,先寫到這裏爲止。
Queue
Queue用於創建和操做隊列,常和threading類一塊兒用來創建一個簡單的線程隊列。
首先,隊列有不少種,根據進出順序來分類,能夠分紅
Queue.Queue(maxsize) FIFO(先進先出隊列)
Queue.LifoQueue(maxsize) LIFO(先進後出隊列)
Queue.PriorityQueue(maxsize) 爲優先級越高的越先出來,對於一個隊列中的全部元素組成的entries,優先隊列優先返回的一個元素是sorted(list(entries))[0]。至於對於通常的數據,優先隊列取什麼東西做爲優先度要素進行判斷,官方文檔給出的建議是一個tuple如(priority, data),取priority做爲優先度。
若是設置的maxsize小於1,則表示隊列的長度無限長
FIFO是經常使用的隊列,其一些經常使用的方法有:
Queue.qsize() 返回隊列大小
Queue.empty() 判斷隊列是否爲空
Queue.full() 判斷隊列是否滿了
Queue.get([block[,timeout]]) 從隊列頭刪除並返回一個item,block默認爲True,表示當隊列爲空卻去get的時候會阻塞線程,等待直到有有item出現爲止來get出這個item。若是是False的話代表當隊列爲空你卻去get的時候,會引起異常。在block爲True的狀況下能夠再設置timeout參數。表示當隊列爲空,get阻塞timeout指定的秒數以後尚未get到的話就引起Full異常。
Queue.put(...[,block[,timeout]]) 向隊尾插入一個item,一樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引起異常。同get的timeout,put的timeout是在block爲True的時候進行超時設置的參數。
Queue.task_done() 從場景上來講,處理完一個get出來的item以後,調用task_done將向隊列發出一個信號,表示本任務已經完成
Queue.join() 監視全部item並阻塞主線程,直到全部item都調用了task_done以後主線程才繼續向下執行。這麼作的好處在於,假如一個線程開始處理最後一個任務,它從任務隊列中拿走最後一個任務,此時任務隊列就空了但最後那個線程還沒處理完。當調用了join以後,主線程就不會由於隊列空了而擅自結束,而是等待最後那個線程處理完成了。
結合threading和Queue能夠構建出一個簡單的生產者-消費者模型,好比:
下面的代碼引用自http://blog.csdn.net/l1902090/article/details/24804085
import threading import Queue import time class worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue=queue self.thread_stop=False def run(self): while not self.thread_stop: print("thread%d %s: waiting for tast" %(self.ident,self.name)) try: task=q.get(block=True, timeout=20)#接收消息 except Queue.Empty: print("Nothing to do!i will go home!") self.thread_stop=True break print("task recv:%s ,task No:%d" % (task[0],task[1])) print("i am working") time.sleep(3) print("work finished!") q.task_done()#完成一個任務 res=q.qsize()#判斷消息隊列大小 if res>0: print("fuck!There are still %d tasks to do" % (res)) def stop(self): self.thread_stop = True if __name__ == "__main__": q=Queue.Queue(3) worker=worker(q) worker.start() q.put(["produce one cup!",1], block=True, timeout=None)#產生任務消息 q.put(["produce one desk!",2], block=True, timeout=None) q.put(["produce one apple!",3], block=True, timeout=None) q.put(["produce one banana!",4], block=True, timeout=None) q.put(["produce one bag!",5], block=True, timeout=None) print("***************leader:wait for finish!") q.join()#等待全部任務完成 print("***************leader:all task finished!")
(嗯。。姑且不論他的F-word哈哈哈,開玩笑的,這例子還能夠,至少很清晰地說明了如何把這兩個模塊結合起來用)
輸出是這樣的:
thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one cup! ,task No:1 i am working work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one desk! ,task No:2 i am workingleader:wait for finish! work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one apple! ,task No:3 i am working work finished! fuck!There are still 2 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one banana! ,task No:4 i am working work finished! fuck!There are still 1 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one bag! ,task No:5 i am working work finished! thread139958685849344 Thread-1: waiting for tast 1 ***************leader:all task finished! Nothing to do!i will go home!
運行一下就知道,上例中並無性能的提高(畢竟仍是隻有一個線程在跑)。線程隊列的意義並非進一步提升運行效率,而是使線程的併發更加有組織。能夠看到,在增長了線程隊列以後,程序對於線程的併發數量就有了控制。新線程想要加入隊列開始執行,必須等一個既存的線程完成以後才能夠。舉個例子,好比
for i in range(x): t = MyThread(queue) t.start()
x在這裏是個變量,咱們不知道這個循環會觸發多少線程併發,若是多的話就會很冒險。可是有了隊列以後,把一個隊列做爲全部線程構建線程對象時的一個參數,讓線程必須按照這個隊列規定的大小來執行的話,就不擔憂過多線程帶來的危險了。
■ 線程池實現
不得不說一年前仍是太simple。。 一年後再來補充點內容吧
首先咱們要明確,線程池,線程,隊列這幾個概念之間的區別和聯繫。
舉一個不太恰當的例子。好比有五個很餓的人去吃旋轉壽司。旋轉壽司店裏有一個傳送帶,將壽司運送到他們面前。他們一字排開坐好準備好吃,當壽司過來,食客可能會選擇一個喜歡的口味開吃。在吃的過程當中,他一般就不會再去「吃着碗裏看着傳送帶上的」了。之因此是很餓的人,由於咱們假定他們一旦吃完一盤就會馬上着手下一盤,絕不停歇。
在這個場景中,五我的組成的集體是線程池,每一個人就是一個線程,而旋轉壽司的傳送帶是隊列,每盤壽司就是一個隊列中的任務。之因此說這個例子不太恰當,是由於場景中食客能夠本身選擇想吃的壽司而線程池-隊列中,隊列纔是任務分配的主導。就比如是傳送帶發現某個食客說他已經吃完一盤壽司,還想再來一盤的時候,會不顧食客的喜愛,強行將一盤壽司推到一個空閒的食客面前讓他吃。
更加抽象點來講,線程在這個語境中其實就像是一個工具,而線程池就是一個工具的集合。因爲一般一個線程池面向的是一類任務,因此線程池中的線程基本上也是同質的。即上述的五個食客是五胞胎(誤hh)。另外一方面,之因此說面向的是一類任務,是由於隊列中的任務一般是具備某些共性的。共性程度高低取決於隊列以及線程池的具體實現,可是確定是有的。這就比如壽司能夠有握り,巻き而上面的具能夠有いくら、マグロ、ウニ可是歸根結底確定仍是要有米飯的。
在正式的開發中,隊列一般是由第三方服務提供好比RabbitMQ,Redis等。而線程池一般由程序本身實現。下面這段代碼則是在一個python程序中,基於Queue加上自制的建議線程池創建起來的模型。
# -*- coding:utf-8 -*- import threading import Queue import time import random from faker import Faker class MyThread(threading.Thread): ''' 線程模型 ''' def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue self.start() # 由於做爲一個工具,線程必須永遠「在線」,因此不如讓它在建立完成後直接運行,免得咱們手動再去start它 def run(self): while True: # 除非確認隊列中已經無任務,不然時刻保持線程在運行 try: task = self.queue.get(block=False) # 若是隊列空了,直接結束線程。根據具體場景不一樣可能不合理,能夠修改 time.sleep(random.random()) # 假設處理了一段時間 print 'Task %s Done' % task # 提示信息而已 self.queue.task_done() except Exception,e: break class MyThreadPool(): def __init__(self,queue,size): self.queue = queue self.pool = [] for i in range(size): self.pool.append(MyThread(queue)) def joinAll(self): for thd in self.pool: if thd.isAlive(): thd.join() if __name__ == '__main__': q = Queue.Queue(10) fake = Faker() for i in range(5): q.put(fake.word()) pool = MyThreadPool(queue=q,size=2) pool.joinAll()
網上有一部分示例,將隊列做爲一個屬性維護在了線程池類中,也不失爲一種辦法,我這裏爲了可以條理清晰,沒有放在類裏面。這段程序首先生成了一個maxsize是10的隊列。fake.word()能夠隨機生成一個單詞,這裏僅做測試用。因此向隊列中添加了5個task。
這裏有個坑: 若是put的數量大於隊列最大長度,並且put沒有設置block=False的話,那麼顯然程序會阻塞在put這邊。此時ThreadPool未被創建,也就是說工做線程都尚未啓動,所以會引發這樣一個死鎖。若是把線程池的創建放到put以前也不行,此時線程發現隊列爲空,因此全部線程都會直接結束(固然這是線程中get的block是False的時候,若是爲True那麼也是死鎖),最終隊列中的task沒人處理,程序輸出爲空。解決這個坑的辦法,一個是像上面同樣保持最開始put的量小於隊列長度;第二個就是乾脆不要限制隊列長度,用q = Queue.Queue()生產隊列便可。
好的,繼續往下,進入了線程池的生成。線程池內部的列表纔是真·線程池,另外其關聯了queue對象,因此在建立的時候能夠將隊列對象傳遞給線程對象。線程對象在建立時就啓動了,而且被添加到線程池的那個列表中。線程池的大小由參數給出,線程啓動後會去隊列裏面get任務,而且進行處理。處理完成後進行task_done聲明而且再次去嘗試get。若是隊列爲空那麼就直接拋出異常,也就是跳出循環,線程結束。
經過這樣一個模型,根據線程池的大小,這才真正地給線程併發作了一個限制,可促進較大程度的資源利用。
● 進一步地…
在上面這個示例中,實際上處理任務的實際邏輯是被寫在了MyThread類裏面。若是咱們想要一個通用性更加高的工具類,那麼勢必要想一想如何將這個線程類解耦具體邏輯。另外一方面,隊列中的任務的內容,不只僅能夠是字符串,也能夠是任何python對象。這就使得靈活性大大提升。
好比咱們能夠在隊列中put內容是(func, args, kwargs)這樣一個元組。其中func是一個函數對象,描述了任務的處理邏輯過程,args是一個元組,表明全部func函數的匿名參數,kwargs則是func函數的全部具名參數。如此,能夠將線程類的run方法改寫成這樣:
def run(self): while True: try: func,args,kwargs = self.queue.get() try: func(*args,**kwargs) except Exception,e: raise ('bad execution: %s' % str(e)) self.queue.task_done() except Exception,e: break
這樣一個run就能夠作到很大程度的解耦了。
相似的思想,線程池類和線程類也沒必要是一一對應的。能夠將線程類做爲一個參數傳遞給線程池類。這樣一個線程池類就能夠做爲容器容納各類各樣的線程了。具體實例就不寫了。