1、死鎖現象與遞歸鎖
進程也是有死鎖的
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,python
它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,mysql
以下就是死鎖sql
1 死鎖------------------- 2 from threading import Thread,Lock,RLock 3 import time 4 mutexA = Lock() 5 mutexB = Lock() 6 class MyThread(Thread): 7 def run(self): 8 self.f1() 9 self.f2() 10 def f1(self): 11 mutexA.acquire() 12 print('\033[33m%s 拿到A鎖 '%self.name) 13 mutexB.acquire() 14 print('\033[45%s 拿到B鎖 '%self.name) 15 mutexB.release() 16 mutexA.release() 17 def f2(self): 18 mutexB.acquire() 19 print('\033[33%s 拿到B鎖 ' % self.name) 20 time.sleep(1) #睡一秒就是爲了保證A鎖已經被別人那到了 21 mutexA.acquire() 22 print('\033[45m%s 拿到B鎖 ' % self.name) 23 mutexA.release() 24 mutexB.release() 25 if __name__ == '__main__': 26 for i in range(10): 27 t = MyThread() 28 t.start() #一開啓就會去調用run方法
那麼怎麼解決死鎖現象呢?
解決方法,遞歸鎖:在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。數據庫
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。服務器
直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖多線程
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,
即counter遞減到0爲止
1 # 2.解決死鎖的方法--------------遞歸鎖 2 from threading import Thread,Lock,RLock 3 import time 4 mutexB = mutexA = RLock() 5 class MyThread(Thread): 6 def run(self): 7 self.f1() 8 self.f2() 9 def f1(self): 10 mutexA.acquire() 11 print('\033[33m%s 拿到A鎖 '%self.name) 12 mutexB.acquire() 13 print('\033[45%s 拿到B鎖 '%self.name) 14 mutexB.release() 15 mutexA.release() 16 def f2(self): 17 mutexB.acquire() 18 print('\033[33%s 拿到B鎖 ' % self.name) 19 time.sleep(1) #睡一秒就是爲了保證A鎖已經被別人拿到了 20 mutexA.acquire() 21 print('\033[45m%s 拿到B鎖 ' % self.name) 22 mutexA.release() 23 mutexB.release() 24 if __name__ == '__main__': 25 for i in range(10): 26 t = MyThread() 27 t.start() #一開啓就會去調用run方法
2、信號量Semaphore(其實也是一把鎖)
Semaphore管理一個內置的計數器併發
Semaphore與進程池看起來相似,可是是徹底不一樣的概念。app
進程池:Pool(4),最大隻能產生四個進程,並且從頭至尾都只是這四個進程,不會產生新的。dom
信號量:信號量是產生的一堆進程/線程,即產生了多個任務都去搶那一把鎖ide
1 from threading import Thread,Semaphore,currentThread 2 import time,random 3 sm = Semaphore(5) #運行的時候有5我的 4 def task(): 5 sm.acquire() 6 print('\033[42m %s上廁所'%currentThread().getName()) 7 time.sleep(random.randint(1,3)) 8 print('\033[31m %s上完廁所走了'%currentThread().getName()) 9 sm.release() 10 if __name__ == '__main__': 11 for i in range(20): #開了10個線程 ,這20人都要上廁所 12 t = Thread(target=task) 13 t.start()
1 hread-1上廁所 2 Thread-2上廁所 3 Thread-3上廁所 4 Thread-4上廁所 5 Thread-5上廁所 6 Thread-3上完廁所走了 7 Thread-6上廁所 8 Thread-1上完廁所走了 9 Thread-7上廁所 10 Thread-2上完廁所走了 11 Thread-8上廁所 12 Thread-6上完廁所走了 13 Thread-5上完廁所走了 14 Thread-4上完廁所走了 15 Thread-9上廁所 16 Thread-10上廁所 17 Thread-11上廁所 18 Thread-9上完廁所走了 19 Thread-12上廁所 20 Thread-7上完廁所走了 21 Thread-13上廁所 22 Thread-10上完廁所走了 23 Thread-8上完廁所走了 24 Thread-14上廁所 25 Thread-15上廁所 26 Thread-12上完廁所走了 27 Thread-11上完廁所走了 28 Thread-16上廁所 29 Thread-17上廁所 30 Thread-14上完廁所走了 31 Thread-15上完廁所走了 32 Thread-17上完廁所走了 33 Thread-18上廁所 34 Thread-19上廁所 35 Thread-20上廁所 36 Thread-13上完廁所走了 37 Thread-20上完廁所走了 38 Thread-16上完廁所走了 39 Thread-18上完廁所走了 40 Thread-19上完廁所走了
3、Event
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
1
2
3
4
5
|
from
threading
import
Event
Event.isSet()
#返回event的狀態值
Event.wait()
#若是 event.isSet()==False將阻塞線程;
Event.
set
()
#設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
Event.clear()
#恢復
|
例如1.,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做
1 #首先定義兩個函數,一個是鏈接數據庫 2 # 一個是檢測數據庫 3 from threading import Thread,Event,currentThread 4 import time 5 e = Event() 6 def conn_mysql(): 7 '''連接數據庫''' 8 count = 1 9 while not e.is_set(): #當沒有檢測到時候 10 if count >3: #若是嘗試次數大於3,就主動拋異常 11 raise ConnectionError('嘗試連接的次數過多') 12 print('\033[45m%s 第%s次嘗試'%(currentThread(),count)) 13 e.wait(timeout=1) #等待檢測(裏面的參數是超時1秒) 14 count+=1 15 print('\033[44m%s 開始連接...'%(currentThread().getName())) 16 def check_mysql(): 17 '''檢測數據庫''' 18 print('\033[42m%s 檢測mysql...' % (currentThread().getName())) 19 time.sleep(5) 20 e.set() 21 if __name__ == '__main__': 22 for i in range(3): #三個去連接 23 t = Thread(target=conn_mysql) 24 t.start() 25 t = Thread(target=check_mysql) 26 t.start()
2.例如2,紅綠燈的例子
1 from threading import Thread,Event,currentThread 2 import time 3 e = Event() 4 def traffic_lights(): 5 '''紅綠燈''' 6 time.sleep(5) 7 e.set() 8 def car(): 9 '''車''' 10 print('\033[42m %s 等綠燈\033[0m'%currentThread().getName()) 11 e.wait() 12 print('\033[44m %s 車開始通行' % currentThread().getName()) 13 if __name__ == '__main__': 14 for i in range(10): 15 t = Thread(target=car) #10輛車 16 t.start() 17 traffic_thread = Thread(target=traffic_lights) #一個紅綠燈 18 traffic_thread.start()
4、定時器(Timer)
指定n秒後執行某操做
from threading import Timer def func(n): print('hello,world',n) t = Timer(3,func,args=(123,)) #等待三秒後執行func函數,由於func函數有參數,那就再傳一個參數進去 t.start()
5、線程queue
queue隊列 :使用import queue,用法與進程Queue同樣
queue.
Queue
(maxsize=0) #先進先出
1 # 1.隊列----------- 2 import queue 3 q = queue.Queue(3) #先進先出 4 q.put('first') 5 q.put('second') 6 q.put('third') 7 print(q.get()) 8 print(q.get()) 9 print(q.get())
queue.
LifoQueue
(maxsize=0)#先進後出
1 # 2.堆棧---------- 2 q = queue.LifoQueue() #先進後出(或者後進先出) 3 q.put('first') 4 q.put('second') 5 q.put('third') 6 q.put('for') 7 print(q.get()) 8 print(q.get()) 9 print(q.get())
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
1 # ---------------- 2 '''3.put進入一個元組,元組的第一個元素是優先級 3 (一般也能夠是數字,或者也能夠是非數字之間的比較) 4 數字越小,優先級越高''' 5 q = queue.PriorityQueue() 6 q.put((20,'a')) 7 q.put((10,'b')) #先出來的是b,數字越小優先級越高嘛 8 q.put((30,'c')) 9 print(q.get()) 10 print(q.get()) 11 print(q.get())
6、多線程性能測試
1.多核也就是多個CPU
(1)cpu越多,提升的是計算的性能
(2)若是程序是IO操做的時候(多核和單核是同樣的),再多的cpu也沒有意義。
2.實現併發
第一種:一個進程下,開多個線程
第二種:開多個進程
3.多進程:
優勢:能夠利用多核
缺點:開銷大
4.多線程
優勢:開銷小
缺點:不能夠利用多核
5多進程和多線程的應用場景
1.計算密集型:也就是計算多,IO少
若是是計算密集型,就用多進程(如金融分析等)
2.IO密集型:也就是IO多,計算少
若是是IO密集型的,就用多線程(通常遇到的都是IO密集型的)
下例子練習:
1 # 計算密集型的要開啓多進程 2 from multiprocessing import Process 3 from threading import Thread 4 import time 5 def work(): 6 res = 0 7 for i in range(10000000): 8 res+=i 9 if __name__ == '__main__': 10 l = [] 11 start = time.time() 12 for i in range(4): 13 p = Process(target=work) #1.9371106624603271 #能夠利用多核(也就是多個cpu) 14 # p = Thread(target=work) #3.0401737689971924 15 l.append(p) 16 p.start() 17 for p in l: 18 p.join() 19 stop = time.time() 20 print('%s'%(stop-start))
1 # I/O密集型要開啓多線程 2 from multiprocessing import Process 3 from threading import Thread 4 import time 5 def work(): 6 time.sleep(3) 7 if __name__ == '__main__': 8 l = [] 9 start = time.time() 10 for i in range(400): 11 # p = Process(target=work) #34.9549994468689 #由於開了好多進程,它的開銷大,花費的時間也就長了 12 p = Thread(target=work) #2.2151265144348145 #當開了多個線程的時候,它的開銷小,花費的時間也小了 13 l.append(p) 14 p.start() 15 for i in l : 16 i.join() 17 stop = time.time() 18 print('%s'%(stop-start))