線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。 mysql
thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。程序員
線程的調用有兩種方式,分爲直接調用和繼承式調用,示例代碼以下:sql
1 #直接調用 2 import threading 3 import time 4 5 def sayhi(num): #定義每一個線程要運行的函數 6 7 print("running on number:%s" %num) 8 9 time.sleep(3) 10 11 if __name__ == '__main__': 12 13 t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 14 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例 15 16 t1.start() #啓動線程 17 t2.start() #啓動另外一個線程 18 19 print(t1.getName()) #獲取線程名 20 print(t2.getName()) 21 22 #繼承式調用 23 import threading 24 import time 25 26 27 class MyThread(threading.Thread): 28 def __init__(self,num): 29 threading.Thread.__init__(self) 30 self.num = num 31 32 def run(self):#定義每一個線程要運行的函數 33 34 print("running on number:%s" %self.num) 35 36 time.sleep(3) 37 38 if __name__ == '__main__': 39 40 t1 = MyThread(1) 41 t2 = MyThread(2) 42 t1.start() 43 t2.start()
join()方法在該線程對象啓動了以後調用線程的join()方法以後,那麼主線程將會阻塞在當前位置直到子線程執行完成才繼續往下走,若是全部子線程對象都調用了join()方法,那麼主線程將會在等待全部子線程都執行完以後再往下執行。編程
setDaemon(True)方法在子線程對象調用start()方法(啓動該線程)以前就調用的話,將會將該子線程設置成守護模式啓動,這是什麼意思呢?當子線程還在運行的時候,父線程已經執行完了,若是這個子線程設置是以守護模式啓動的,那麼隨着主線程執行完成退出時,子線程立馬也退出,若是沒有設置守護啓動子線程(也就是正常狀況下)的話,主線程執行完成以後,進程會等待全部子線程執行完成以後才退出。安全
示例代碼以下:服務器
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) #查詢線程狀態
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束, #2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) #demo1 if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True ''' #demo2 from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
其餘方法數據結構
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
互斥鎖的產生是由於前面提到過多線程之間是共享同一塊內存地址的,也就是說多個不一樣的線程可以訪問同一個變量中的數據,那麼,當多個線程要修改這個變量,會產生什麼狀況呢?當多個線程修改同一個數據的時候,若是操做的時間夠短的話,能獲得咱們想要的結果,可是,若是修改數據不是原子性的(這中間的時間太長)的話。。。頗有可能形成數據的錯誤覆蓋,從而獲得咱們不想要的結果。多線程
這時候,就須要互斥鎖出場了,爲了讓臨界資源(會被多個線程同時訪問)可以實現按照咱們控制訪問,須要使用互斥鎖來鎖住臨界資源,當一個線程須要訪問臨界資源時先檢查這個資源有沒有被鎖住,若是沒有被鎖住,那麼訪問這個資源並同時給這個資源加上鎖,這樣別
的線程就沒法訪問該臨界資源了,直到這個線程訪問完了這個臨界資源以後,釋放這把鎖,其餘線程纔可以搶佔該臨界資源。這個,就是互斥鎖的概念。
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n)
#不加鎖:併發執行,速度快,數據不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' #不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' #有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 ''' )
若是公共的臨界資源比較多,而且線程間都使用互斥鎖去訪問臨界資源,那麼將有可能出現一個狀況:併發
這樣,線程1和線程2各執己見。。。結果就都卡死在這了,這就是線程死鎖的由來。。。
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程.
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['tom','jack','lucy']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
python中提供了一個方法(不止python中,基本上全部的語言中都支持這個方法)那就是遞歸鎖。遞歸鎖的建立是使用threading.RLock(),它裏面其實維護了兩個東西,一個是Lock,另外一個是counter,counter記錄了加鎖的次數,每加一把鎖,counter就會+1,釋放一次鎖counter就會減一,直到全部加的鎖都被釋放掉了以後其餘線程纔可以訪問這把鎖獲取資源。固然這個限制是對於線程之間的,同一個線程中,只要這個線程搶到了這把鎖,那麼這個線程就能夠對這把鎖加多個鎖,而不會阻塞本身的執行。這就是遞歸鎖的原理。
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['馬雲','馬化騰','馬蓉']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release() def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
同進程的同樣
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
import threading import time,random from threading import Thread,Event def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('連接超時') print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
使用隊列方法:
1 建立一個「隊列」對象 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 5 6 將一個值放入隊列中 7 q.put(10) 8 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 9 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 10 11 將一個值從隊列中取出 12 q.get() 13 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 14 15 Python Queue模塊有三種隊列及構造函數: 16 一、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 17 二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) 18 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 19 20 此包中的經常使用方法(q = Queue.Queue()): 21 q.qsize() 返回隊列的大小 22 q.empty() 若是隊列爲空,返回True,反之False 23 q.full() 若是隊列滿了,返回True,反之False 24 q.full 與 maxsize 大小對應 25 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 26 q.get_nowait() 至關q.get(False) 27 非阻塞 q.put(item) 寫入隊列,timeout等待時間 28 q.put_nowait(item) 至關q.put(item, False) 29 q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 30 q.join() 實際上意味着等到隊列爲空,再執行別的操做
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''