線程(thread)是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python
因爲GIL(Global Interpreter Lock)的存在,python並不能真的實現並行,只能同時容許一個進程運行。GIL是CPython解釋器的概念,並非python的缺陷。安全
threading模塊多線程
python中經過threading模塊實現多線程併發
線程的2種調用方式app
直接調用dom
threading.Thread(target=sayhi, args=(1, )) target的值是函數名,args傳入的參數,元組形式函數
1 import threading 2 import time 3 4 def sayhi(num): #定義每一個線程要運行的函數 5 6 print("running on number:%s" %num) 7 8 time.sleep(3) 9 10 if __name__ == '__main__': 11 12 t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 13 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例 14 15 t1.start() #啓動線程 16 t2.start() #啓動另外一個線程 17 18 print(t1.getName()) #獲取線程名 19 print(t2.getName()) 20 ####### 結果 ########### 21 running on numbei: 1 22 running on numbei: 2 23 Thread-1 24 Thread-2
繼承式調用ui
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 def __init__(self,num): 7 threading.Thread.__init__(self) 8 self.num = num 9 10 def run(self):#定義每一個線程要運行的函數 11 12 print("running on number:%s" %self.num) 13 14 time.sleep(3) 15 16 if __name__ == '__main__': 17 18 t1 = MyThread(1) 19 t2 = MyThread(2) 20 t1.start() 21 t2.start()
join & Daemonspa
1 import threading 2 from time import ctime, sleep 3 4 5 def music(func): 6 for i in range(2): 7 print("Begin listening to %s. %s" % (func, ctime())) 8 sleep(2) 9 print("end listening %s" % ctime()) 10 11 12 def move(func): 13 for i in range(2): 14 print("Begin watching at the %s! %s" % (func, ctime())) 15 sleep(3) 16 print('end watching %s' % ctime()) 17 18 threads = [] 19 t1 = threading.Thread(target=music, args=('七里香',)) 20 threads.append(t1) 21 t2 = threading.Thread(target=move, args=('阿甘正傳',)) 22 threads.append(t2) 23 24 if __name__ == '__main__': 25 26 for t in threads: 27 # t.setDaemon(True) 28 t.start() 29 # t.join() 30 # t1.join() 31 t2.join() # 考慮這三種join位置下的結果? 32 print("all over %s" % ctime())
join():操作系統
在子線程完成以前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦
join以子線程爲主判斷,setDaemon(True)以主線程爲主判斷
thread 模塊提供的其餘方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。 # 除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法: # run(): 用以表示線程活動的方法。 # start():啓動線程活動。 # join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。
同步鎖
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一個線程中都獲取這個全局變量 6 # num-=1 7 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #對此公共變量進行-1操做 12 13 14 num = 100 #設定一個共享變量 15 thread_list = [] 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待全部線程執行完畢 22 t.join() 23 24 print('final num:', num )
多個線程同時操做同一個共享資源,因此形成了資源破壞,使用join時會把整個線程停住,形成了串行,失去了多線程的意義,因此要把計算的時候串行
使用同步鎖
threading.Lock()
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一個線程中都獲取這個全局變量 6 # num-=1 7 lock.acquire() # 鎖開始 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #對此公共變量進行-1操做 12 lock.release() # 鎖結束 13 14 num = 100 #設定一個共享變量 15 thread_list = [] 16 lock=threading.Lock() 17 18 for i in range(100): 19 t = threading.Thread(target=addNum) 20 t.start() 21 thread_list.append(t) 22 23 for t in thread_list: #等待全部線程執行完畢 24 t.join() 25 26 print('final num:', num )
鎖中的內容一次只容許一個進程執行
線程死鎖和遞歸鎖
在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。下面是一個死鎖的例子:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() 11 lockA.release() 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join()#等待線程結束,後面再講。
爲了支持在同一進程中屢次請求同一資源,使用「可重入鎖」
threading.RLock()
1 import time 2 3 import threading 4 5 class Account: 6 def __init__(self, _id, balance): 7 self.id = _id 8 self.balance = balance 9 self.lock = threading.RLock() 10 11 def withdraw(self, amount): 12 13 with self.lock: 14 self.balance -= amount 15 16 def deposit(self, amount): 17 with self.lock: 18 self.balance += amount 19 20 21 def drawcash(self, amount): # lock.acquire中嵌套lock.acquire的場景 22 23 with self.lock: 24 interest=0.05 25 count=amount+amount*interest 26 27 self.withdraw(count) 28 29 30 def transfer(_from, to, amount): 31 32 # 鎖不能夠加在這裏 由於其餘的其它線程執行的其它方法在不加鎖的狀況下數據一樣是不安全的 33 _from.withdraw(amount) 34 35 to.deposit(amount) 36 37 38 39 alex = Account('alex',1000) 40 yuan = Account('yuan',1000) 41 42 t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) 43 t1.start() 44 45 t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) 46 t2.start() 47 48 t1.join() 49 t2.join() 50 51 print('>>>',alex.balance) 52 print('>>>',yuan.balance)
條件變量同步(Condition)
有一些線程須要知足條件後才能繼續執行python提供了threading.Condition對象用於條件變量線程的支持
lock_con = threading.Condition([Lock/Rlock]) 鎖是可選項,不穿入鎖,對象自動建立一個RLock
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。激活時從鎖的acquire開始執行,而不是從wait開始
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
實例
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print('生產者',self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() # 通知wait() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() # 等待notify() 通知 21 print('消費者',self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()
多線程通訊
同步條件(Event)
條件同步和條件變量同步意思差很少,只是不能加鎖
event = threading.Event() 條件環境對象,初始值爲false
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚你們都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>能夠下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()
信號量(Semaphore)
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(5) 7 semaphore.release() 8 if __name__=="__main__": 9 semaphore=threading.Semaphore(5) 10 thrs=[] 11 for i in range(100): 12 thrs.append(myThread()) 13 for t in thrs: 14 t.start()
多線程利器(queue)
建立一個「隊列」對象 import queue q = queue.Queue(maxsize = 10) queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 Python Queue模塊有三種隊列及構造函數: 一、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 此包中的經常使用方法(q = queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做
相似列表,不過列表在多線程裏不安全