線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python
進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體。程序是指令、數據及其組織形式的描述,進程是程序的實體。安全
全局解釋器鎖又叫作GIL多線程
python目前有不少解釋器,目前使用最普遍的是CPython,還有PYPY和JPython等解釋器,可是使用最普遍的仍是CPython解釋器,而對於全局解釋器鎖來講,就是在CPython上面纔有的,它的原理是在解釋器層面加上一把大鎖,保證同一時刻只能有一個python線程在解釋器中執行。併發
對於計算密集型的python多線程來講,沒法利用到多線程帶來的效果, 在2.7時計算密集型的python多線程執行效率比順序執行的效率還低的多,在python3.5中對這種狀況進行了優化,基本能實現這種多線程執行時間和順序執行時間差很少的效果。app
對於I/O密集型的python多線程來講,GIL的影響不是很大,由於I/O密集型的python多線程進程,每一個線程在等待I/O的時候,將會釋放GIL資源,供別的線程來搶佔。因此對於I/O密集型的python多線程進程來講,仍是能比順序執行的效率要高的。dom
python的GIL這個東西。。。比較噁心,可是因爲CPython解釋器中的不少東西都是依賴這個東西開發的,若是改的話,將是一件浩大的工程。。。因此到如今仍是存在這個問題,這也是python最爲別人所詬病的地方。。。ide
線程的調用有兩種方式,分爲直接調用和繼承式調用,示例代碼以下:函數
1 #直接調用 2 import threading 3 import time 4 5 def sayhi(num): #定義每一個線程要運行的函數 6 7 print("running on number:%s" %num) 8 9 time.sleep(3) 10 11 if __name__ == '__main__': 12 13 t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 14 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例 15 16 t1.start() #啓動線程 17 t2.start() #啓動另外一個線程 18 19 print(t1.getName()) #獲取線程名 20 print(t2.getName()) 21 22 #繼承式調用 23 import threading 24 import time 25 26 27 class MyThread(threading.Thread): 28 def __init__(self,num): 29 threading.Thread.__init__(self) 30 self.num = num 31 32 def run(self):#定義每一個線程要運行的函數 33 34 print("running on number:%s" %self.num) 35 36 time.sleep(3) 37 38 if __name__ == '__main__': 39 40 t1 = MyThread(1) 41 t2 = MyThread(2) 42 t1.start() 43 t2.start()
能夠看到直接調用是導入threading模塊並定義一個函數,以後實例化threading.Thread類的時候,將剛定義的函數名經過target參數傳遞進去,而後調用實例的start()方法啓動一個線程。優化
而繼承式調用是建立一個類繼承自threading.Thread類,並在構造方法中調用父類的構造方法,以後重寫run方法,run方法中就是每一個線程起來以後執行的內容,就相似於前面經過target參數傳遞進去的函數。以後以這個繼承的類來建立對象,並執行對象的start()方法啓動一個線程。ui
從這裏能夠看出,其實。。。直接調用經過使用target參數把函數帶進類裏面以後應該是用這個函數替代了run方法。
join()方法在該線程對象啓動了以後調用線程的join()方法以後,那麼主線程將會阻塞在當前位置直到子線程執行完成才繼續往下走,若是全部子線程對象都調用了join()方法,那麼主線程將會在等待全部子線程都執行完以後再往下執行。
setDaemon(True)方法在子線程對象調用start()方法(啓動該線程)以前就調用的話,將會將該子線程設置成守護模式啓動,這是什麼意思呢?當子線程還在運行的時候,父線程已經執行完了,若是這個子線程設置是以守護模式啓動的,那麼隨着主線程執行完成退出時,子線程立馬也退出,若是沒有設置守護啓動子線程(也就是正常狀況下)的話,主線程執行完成以後,進程會等待全部子線程執行完成以後才退出。
示例代碼以下:
1 import threading 2 from time import ctime,sleep 3 import time 4 5 def music(func): 6 for i in range(2): 7 print ("Begin listening to %s. %s" %(func,ctime())) 8 sleep(4) 9 print("end listening %s"%ctime()) 10 11 def move(func): 12 for i in range(2): 13 print ("Begin watching at the %s! %s" %(func,ctime())) 14 sleep(5) 15 print('end watching %s'%ctime()) 16 17 threads = [] 18 t1 = threading.Thread(target=music,args=('七里香',)) 19 threads.append(t1) 20 t2 = threading.Thread(target=move,args=('阿甘正傳',)) 21 threads.append(t2) 22 23 if __name__ == '__main__': 24 25 for t in threads: 26 # t.setDaemon(True) 27 t.start() 28 # t.join() 29 # t1.join() 30 t2.join()########考慮這三種join位置下的結果? 31 print ("all over %s" %ctime())
互斥鎖的產生是由於前面提到過多線程之間是共享同一塊內存地址的,也就是說多個不一樣的線程可以訪問同一個變量中的數據,那麼,當多個線程要修改這個變量,會產生什麼狀況呢?當多個線程修改同一個數據的時候,若是操做的時間夠短的話,能獲得咱們想要的結果,可是,若是修改數據不是原子性的(這中間的時間太長)的話。。。頗有可能形成數據的錯誤覆蓋,從而獲得咱們不想要的結果。例子以下:
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一個線程中都獲取這個全局變量 6 # num-=1 # 若是是這種狀況,那麼操做時間足夠短,相似於原子操做了,因此,可以獲得咱們想要的結果 7 8 temp=num 9 print('--get num:',num ) # 由於print會調用終端輸出,終端是一個設備,至關於要等待終端I/O就緒以後才能輸出打印內容,在等待終端I/O的過程當中,該線程已經掛起。。。這時其餘線程獲取到的是沒被改變以前的num值,以後該線程I/O就緒以後切換回來,對num-1了,其餘線程在I/O就緒以後也在沒被改變以前的num基礎上減一,這樣。。。就獲得了咱們不想看到的結果。。。 10 #time.sleep(0.1) # sleep也能達到相同的效果,執行到sleep時,該線程直接進入休眠狀態,釋放了GIL直到sleep時間過去。 11 num =temp-1 #對此公共變量進行-1操做 12 13 14 num = 100 #設定一個共享變量 15 thread_list = [] 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待全部線程執行完畢 22 t.join() 23 24 print('final num:', num ) 25 這時候,就須要互斥鎖出場了,前面出現的num能夠被稱做臨界資源(會被多個線程同時訪問),爲了讓臨界資源可以實現按照咱們控制訪問,須要使用互斥鎖來鎖住臨界資源,當一個線程須要訪問臨界資源時先檢查這個資源有沒有被鎖住,若是沒有被鎖住,那麼訪問這個資源並同時給這個資源加上鎖,這樣別的線程就沒法訪問該臨界資源了,直到這個線程訪問完了這個臨界資源以後,釋放這把鎖,其餘線程纔可以搶佔該臨界資源。這個,就是互斥鎖的概念。 示例代碼: 26 27 import time 28 import threading 29 30 def addNum(): 31 global num #在每一個線程中都獲取這個全局變量 32 # num-=1 33 lock.acquire() # 檢查互斥鎖,若是沒鎖住,則鎖住並往下執行,若是檢查到鎖住了,則掛起等待鎖被釋放時再搶佔。 34 temp=num 35 print('--get num:',num ) 36 #time.sleep(0.1) 37 num =temp-1 #對此公共變量進行-1操做 38 lock.release() # 釋放該鎖 39 40 num = 100 #設定一個共享變量 41 thread_list = [] 42 lock=threading.Lock() # 定義互斥鎖 43 44 for i in range(100): 45 t = threading.Thread(target=addNum) 46 t.start() 47 thread_list.append(t) 48 49 for t in thread_list: #等待全部線程執行完畢 50 t.join() 51 52 print('final num:', num )
互斥鎖與GIL的關係?
Python的線程在GIL的控制之下,線程之間,對整個python解釋器,對python提供的C API的訪問都是互斥的,這能夠看做是Python內核級的互斥機制。可是這種互斥是咱們不能控制的,咱們還須要另一種可控的互斥機制———用戶級互斥。內核級經過互斥保護了內核的共享資源,一樣,用戶級互斥保護了用戶程序中的共享資源。
GIL 的做用是:對於一個解釋器,只能有一個thread在執行bytecode。因此每時每刻只有一條bytecode在被執行一個thread。GIL保證了bytecode 這層面上是thread safe的。 可是若是你有個操做好比 x += 1,這個操做須要多個bytecodes操做,在執行這個操做的多條bytecodes期間的時候可能中途就換thread了,這樣就出現了data races的狀況了。
若是公共的臨界資源比較多,而且線程間都使用互斥鎖去訪問臨界資源,那麼將有可能出現一個狀況:
這樣,線程1和線程2各執己見。。。結果就都卡死在這了,這就是線程死鎖的由來。。。
示例代碼以下:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() # 鎖住A資源 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() # 鎖住B資源 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() # 解鎖B資源 11 lockA.release() # 解鎖A資源 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join() # 等待線程結束
那麼,怎麼解決這個問題呢?python中提供了一個方法(不止python中,基本上全部的語言中都支持這個方法)那就是遞歸鎖。遞歸鎖的建立是使用threading.RLock(),它裏面其實維護了兩個東西,一個是Lock,另外一個是counter,counter記錄了加鎖的次數,每加一把鎖,counter就會+1,釋放一次鎖counter就會減一,直到全部加的鎖都被釋放掉了以後其餘線程纔可以訪問這把鎖獲取資源。固然這個限制是對於線程之間的,同一個線程中,只要這個線程搶到了這把鎖,那麼這個線程就能夠對這把鎖加多個鎖,而不會阻塞本身的執行。這就是遞歸鎖的原理。
示例代碼:
1 import time 2 import threading 3 class Account: 4 def __init__(self, _id, balance): 5 self.id = _id 6 self.balance = balance 7 self.lock = threading.RLock() 8 9 def withdraw(self, amount): 10 with self.lock: # 會將包裹的這塊代碼用鎖保護起來,直到這塊代碼執行完成以後,這把鎖就會被釋放掉 11 self.balance -= amount 12 13 def deposit(self, amount): 14 with self.lock: 15 self.balance += amount 16 17 18 def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景 19 20 with self.lock: 21 interest=0.05 22 count=amount+amount*interest 23 24 self.withdraw(count) 25 26 27 def transfer(_from, to, amount): 28 29 #鎖不能夠加在這裏 由於其餘的線程執行的其它方法在不加鎖的狀況下數據一樣是不安全的 30 _from.withdraw(amount) 31 to.deposit(amount) 32 33 alex = Account('alex',1000) 34 yuan = Account('yuan',1000) 35 36 t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) 37 t1.start() 38 39 t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) 40 t2.start() 41 42 t1.join() 43 t2.join() 44 45 print('>>>',alex.balance) 46 print('>>>',yuan.balance)
有一類線程須要知足條件以後纔可以繼續執行,Python提供了threading.Condition 對象用於條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳人鎖,對象自動建立一個RLock()。
示例代碼:
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print('生產者',self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() 21 print('消費者',self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()
條件同步和條件變量同步差很少意思,只是少了鎖功能,由於條件同步設計於不訪問共享資源的條件環境。event=threading.Event():條件環境對象,初始值 爲False;
例子1:
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚你們都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>能夠下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()
例子2:
1 import threading,time 2 import random 3 def light(): 4 if not event.isSet(): 5 event.set() #wait就不阻塞 #綠燈狀態 6 count = 0 7 while True: 8 if count < 10: 9 print('\033[42;1m--green light on---\033[0m') 10 elif count <13: 11 print('\033[43;1m--yellow light on---\033[0m') 12 elif count <20: 13 if event.isSet(): 14 event.clear() 15 print('\033[41;1m--red light on---\033[0m') 16 else: 17 count = 0 18 event.set() #打開綠燈 19 time.sleep(1) 20 count +=1 21 def car(n): 22 while 1: 23 time.sleep(random.randrange(10)) 24 if event.isSet(): #綠燈 25 print("car [%s] is running.." % n) 26 else: 27 print("car [%s] is waiting for the red light.." %n) 28 if __name__ == '__main__': 29 event = threading.Event() 30 Light = threading.Thread(target=light) 31 Light.start() 32 for i in range(3): 33 t = threading.Thread(target=car,args=(i,)) 34 t.start()
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
例子:
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(5) 7 semaphore.release() 8 if __name__=="__main__": 9 semaphore=threading.Semaphore(5) 10 thrs=[] 11 for i in range(100): 12 thrs.append(myThread()) 13 for t in thrs: 14 t.start()
使用隊列方法:
1 建立一個「隊列」對象 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 5 6 將一個值放入隊列中 7 q.put(10) 8 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 9 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 10 11 將一個值從隊列中取出 12 q.get() 13 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 14 15 Python Queue模塊有三種隊列及構造函數: 16 一、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 17 二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) 18 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 19 20 此包中的經常使用方法(q = Queue.Queue()): 21 q.qsize() 返回隊列的大小 22 q.empty() 若是隊列爲空,返回True,反之False 23 q.full() 若是隊列滿了,返回True,反之False 24 q.full 與 maxsize 大小對應 25 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 26 q.get_nowait() 至關q.get(False) 27 非阻塞 q.put(item) 寫入隊列,timeout等待時間 28 q.put_nowait(item) 至關q.put(item, False) 29 q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 30 q.join() 實際上意味着等到隊列爲空,再執行別的操做
例子集合:
1 #例子1: 2 import threading,queue 3 from time import sleep 4 from random import randint 5 class Production(threading.Thread): 6 def run(self): 7 while True: 8 r=randint(0,100) 9 q.put(r) 10 print("生產出來%s號包子"%r) 11 sleep(1) 12 class Proces(threading.Thread): 13 def run(self): 14 while True: 15 re=q.get() 16 print("吃掉%s號包子"%re) 17 if __name__=="__main__": 18 q=queue.Queue(10) 19 threads=[Production(),Production(),Production(),Proces()] 20 for t in threads: 21 t.start() 22 23 #例子2 24 import time,random 25 import queue,threading 26 q = queue.Queue() 27 def Producer(name): 28 count = 0 29 while count <20: 30 time.sleep(random.randrange(3)) 31 q.put(count) 32 print('Producer %s has produced %s baozi..' %(name, count)) 33 count +=1 34 def Consumer(name): 35 count = 0 36 while count <20: 37 time.sleep(random.randrange(4)) 38 if not q.empty(): 39 data = q.get() 40 print(data) 41 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 42 else: 43 print("-----no baozi anymore----") 44 count +=1 45 p1 = threading.Thread(target=Producer, args=('A',)) 46 c1 = threading.Thread(target=Consumer, args=('B',)) 47 p1.start() 48 c1.start() 49 50 #例子3 51 #實現一個線程不斷生成一個隨機數到一個隊列中(考慮使用Queue這個模塊) 52 # 實現一個線程從上面的隊列裏面不斷的取出奇數 53 # 實現另一個線程從上面的隊列裏面不斷取出偶數 54 55 import random,threading,time 56 from queue import Queue 57 #Producer thread 58 class Producer(threading.Thread): 59 def __init__(self, t_name, queue): 60 threading.Thread.__init__(self,name=t_name) 61 self.data=queue 62 def run(self): 63 for i in range(10): #隨機產生10個數字 ,能夠修改成任意大小 64 randomnum=random.randint(1,99) 65 print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)) 66 self.data.put(randomnum) #將數據依次存入隊列 67 time.sleep(1) 68 print ("%s: %s finished!" %(time.ctime(), self.getName())) 69 70 #Consumer thread 71 class Consumer_even(threading.Thread): 72 def __init__(self,t_name,queue): 73 threading.Thread.__init__(self,name=t_name) 74 self.data=queue 75 def run(self): 76 while 1: 77 try: 78 val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒 79 if val_even%2==0: 80 print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)) 81 time.sleep(2) 82 else: 83 self.data.put(val_even) 84 time.sleep(2) 85 except: #等待輸入,超過5秒 就報異常 86 print ("%s: %s finished!" %(time.ctime(),self.getName())) 87 break 88 class Consumer_odd(threading.Thread): 89 def __init__(self,t_name,queue): 90 threading.Thread.__init__(self, name=t_name) 91 self.data=queue 92 def run(self): 93 while 1: 94 try: 95 val_odd = self.data.get(1,5) 96 if val_odd%2!=0: 97 print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)) 98 time.sleep(2) 99 else: 100 self.data.put(val_odd) 101 time.sleep(2) 102 except: 103 print ("%s: %s finished!" % (time.ctime(), self.getName())) 104 break 105 #Main thread 106 def main(): 107 queue = Queue() 108 producer = Producer('Pro.', queue) 109 consumer_even = Consumer_even('Con_even.', queue) 110 consumer_odd = Consumer_odd('Con_odd.',queue) 111 producer.start() 112 consumer_even.start() 113 consumer_odd.start() 114 producer.join() 115 consumer_even.join() 116 consumer_odd.join() 117 print ('All threads terminate!') 118 119 if __name__ == '__main__': 120 main() 121 122 #注意:列表是線程不安全的 123 #例子4 124 import threading,time 125 126 li=[1,2,3,4,5] 127 128 def pri(): 129 while li: 130 a=li[-1] 131 print(a) 132 time.sleep(1) 133 try: 134 li.remove(a) 135 except: 136 print('----',a) 137 138 t1=threading.Thread(target=pri,args=()) 139 t1.start()
t2=threading.Thread(target=pri,args=()) t2.start()