python併發編程之多線程2---(死鎖與遞歸鎖,信號量等)

1、死鎖現象與遞歸鎖

進程也是有死鎖的

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,python

它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程mysql

以下就是死鎖sql

 1 死鎖-------------------
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexA = Lock()
 5 mutexB = Lock()
 6 class MyThread(Thread):
 7     def run(self):
 8         self.f1()
 9         self.f2()
10     def f1(self):
11         mutexA.acquire()
12         print('\033[33m%s 拿到A鎖 '%self.name)
13         mutexB.acquire()
14         print('\033[45%s 拿到B鎖 '%self.name)
15         mutexB.release()
16         mutexA.release()
17     def f2(self):
18         mutexB.acquire()
19         print('\033[33%s 拿到B鎖 ' % self.name)
20         time.sleep(1)  #睡一秒就是爲了保證A鎖已經被別人那到了
21         mutexA.acquire()
22         print('\033[45m%s 拿到B鎖 ' % self.name)
23         mutexA.release()
24         mutexB.release()
25 if __name__ == '__main__':
26     for i in range(10):
27         t = MyThread()
28         t.start() #一開啓就會去調用run方法
死鎖現象

那麼怎麼解決死鎖現象呢?

解決方法,遞歸鎖:在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。數據庫

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。服務器

直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖多線程

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,

即counter遞減到0爲止 
 1 # 2.解決死鎖的方法--------------遞歸鎖
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexB = mutexA = RLock()
 5 class MyThread(Thread):
 6     def run(self):
 7         self.f1()
 8         self.f2()
 9     def f1(self):
10         mutexA.acquire()
11         print('\033[33m%s 拿到A鎖 '%self.name)
12         mutexB.acquire()
13         print('\033[45%s 拿到B鎖 '%self.name)
14         mutexB.release()
15         mutexA.release()
16     def f2(self):
17         mutexB.acquire()
18         print('\033[33%s 拿到B鎖 ' % self.name)
19         time.sleep(1)  #睡一秒就是爲了保證A鎖已經被別人拿到了
20         mutexA.acquire()
21         print('\033[45m%s 拿到B鎖 ' % self.name)
22         mutexA.release()
23         mutexB.release()
24 if __name__ == '__main__':
25     for i in range(10):
26         t = MyThread()
27         t.start() #一開啓就會去調用run方法
解決死鎖

2、信號量Semaphore(其實也是一把鎖)

Semaphore管理一個內置的計數器併發

Semaphore與進程池看起來相似,可是是徹底不一樣的概念。app

進程池:Pool(4),最大隻能產生四個進程,並且從頭至尾都只是這四個進程,不會產生新的。dom

信號量:信號量是產生的一堆進程/線程,即產生了多個任務都去搶那一把鎖ide

 1 from threading import Thread,Semaphore,currentThread
 2 import time,random
 3 sm = Semaphore(5) #運行的時候有5我的
 4 def task():
 5     sm.acquire()
 6     print('\033[42m %s上廁所'%currentThread().getName())
 7     time.sleep(random.randint(1,3))
 8     print('\033[31m %s上完廁所走了'%currentThread().getName())
 9     sm.release()
10 if __name__ == '__main__':
11     for i in range(20):  #開了10個線程 ,這20人都要上廁所
12         t = Thread(target=task)
13         t.start()
Semaphore舉例
 1 hread-1上廁所
 2  Thread-2上廁所
 3  Thread-3上廁所
 4  Thread-4上廁所
 5  Thread-5上廁所
 6  Thread-3上完廁所走了
 7  Thread-6上廁所
 8  Thread-1上完廁所走了
 9  Thread-7上廁所
10  Thread-2上完廁所走了
11  Thread-8上廁所
12  Thread-6上完廁所走了
13  Thread-5上完廁所走了
14  Thread-4上完廁所走了
15  Thread-9上廁所
16  Thread-10上廁所
17  Thread-11上廁所
18  Thread-9上完廁所走了
19  Thread-12上廁所
20  Thread-7上完廁所走了
21  Thread-13上廁所
22  Thread-10上完廁所走了
23  Thread-8上完廁所走了
24  Thread-14上廁所
25  Thread-15上廁所
26  Thread-12上完廁所走了
27  Thread-11上完廁所走了
28  Thread-16上廁所
29  Thread-17上廁所
30  Thread-14上完廁所走了
31  Thread-15上完廁所走了
32  Thread-17上完廁所走了
33  Thread-18上廁所
34  Thread-19上廁所
35  Thread-20上廁所
36  Thread-13上完廁所走了
37  Thread-20上完廁所走了
38  Thread-16上完廁所走了
39  Thread-18上完廁所走了
40  Thread-19上完廁所走了
運行結果

3、Event

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

?
1
2
3
4
5
from threading import Event
Event.isSet() #返回event的狀態值
Event.wait() #若是 event.isSet()==False將阻塞線程;
Event. set () #設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
Event.clear() #恢復

例如1.,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做

 1 #首先定義兩個函數,一個是鏈接數據庫
 2 # 一個是檢測數據庫
 3 from threading import Thread,Event,currentThread
 4 import time
 5 e = Event()
 6 def conn_mysql():
 7     '''連接數據庫'''
 8     count = 1
 9     while not e.is_set():  #當沒有檢測到時候
10         if count >3: #若是嘗試次數大於3,就主動拋異常
11             raise ConnectionError('嘗試連接的次數過多')
12         print('\033[45m%s 第%s次嘗試'%(currentThread(),count))
13         e.wait(timeout=1) #等待檢測(裏面的參數是超時1秒)
14         count+=1
15     print('\033[44m%s 開始連接...'%(currentThread().getName()))
16 def check_mysql():
17     '''檢測數據庫'''
18     print('\033[42m%s 檢測mysql...' % (currentThread().getName()))
19     time.sleep(5)
20     e.set()
21 if __name__ == '__main__':
22     for i  in range(3):  #三個去連接
23         t = Thread(target=conn_mysql)
24         t.start()
25     t = Thread(target=check_mysql)
26     t.start()
詳看

2.例如2,紅綠燈的例子

 1 from  threading import Thread,Event,currentThread
 2 import time
 3 e = Event()
 4 def traffic_lights():
 5     '''紅綠燈'''
 6     time.sleep(5)
 7     e.set()
 8 def car():
 9     ''''''
10     print('\033[42m %s 等綠燈\033[0m'%currentThread().getName())
11     e.wait()
12     print('\033[44m %s 車開始通行' % currentThread().getName())
13 if __name__ == '__main__':
14     for i in range(10):
15         t = Thread(target=car)  #10輛車
16         t.start()
17     traffic_thread = Thread(target=traffic_lights)  #一個紅綠燈
18     traffic_thread.start()
紅綠燈

4、定時器(Timer)

指定n秒後執行某操做

from threading import Timer
def func(n):
    print('hello,world',n)
t = Timer(3,func,args=(123,))  #等待三秒後執行func函數,由於func函數有參數,那就再傳一個參數進去
t.start()

5、線程queue

queue隊列 :使用import queue,用法與進程Queue同樣

queue.Queue(maxsize=0) #先進先出

1 # 1.隊列-----------
2 import queue
3 q = queue.Queue(3) #先進先出
4 q.put('first')
5 q.put('second')
6 q.put('third')
7 print(q.get())
8 print(q.get())
9 print(q.get())
View Code

queue.LifoQueue(maxsize=0)#先進後出

1 # 2.堆棧----------
2 q = queue.LifoQueue() #先進後出(或者後進先出)
3 q.put('first')
4 q.put('second')
5 q.put('third')
6 q.put('for')
7 print(q.get())
8 print(q.get())
9 print(q.get())
View Code

queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

 1 # ----------------
 2 '''3.put進入一個元組,元組的第一個元素是優先級
 3 (一般也能夠是數字,或者也能夠是非數字之間的比較)
 4 數字越小,優先級越高'''
 5 q = queue.PriorityQueue()
 6 q.put((20,'a'))
 7 q.put((10,'b'))  #先出來的是b,數字越小優先級越高嘛
 8 q.put((30,'c'))
 9 print(q.get())
10 print(q.get())
11 print(q.get())
View Code

6、多線程性能測試

1.多核也就是多個CPU
(1)cpu越多,提升的是計算的性能
(2)若是程序是IO操做的時候(多核和單核是同樣的),再多的cpu也沒有意義。
2.實現併發
第一種:一個進程下,開多個線程
第二種:開多個進程
3.多進程:
優勢:能夠利用多核
缺點:開銷大
4.多線程
優勢:開銷小
缺點:不能夠利用多核
5多進程和多線程的應用場景
1.計算密集型:也就是計算多,IO少
若是是計算密集型,就用多進程(如金融分析等)
2.IO密集型:也就是IO多,計算少
若是是IO密集型的,就用多線程(通常遇到的都是IO密集型的)
下例子練習:
 1 # 計算密集型的要開啓多進程
 2 from  multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     res = 0
 7     for i in range(10000000):
 8         res+=i
 9 if __name__ == '__main__':
10     l = []
11     start = time.time()
12     for i in range(4):
13         p = Process(target=work)  #1.9371106624603271  #能夠利用多核(也就是多個cpu)
14         # p  = Thread(target=work)  #3.0401737689971924
15         l.append(p)
16         p.start()
17     for p in l:
18         p.join()
19     stop = time.time()
20     print('%s'%(stop-start))
計算密集型
 1 # I/O密集型要開啓多線程
 2 from multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     time.sleep(3)
 7 if __name__ == '__main__':
 8     l = []
 9     start = time.time()
10     for i in range(400):
11         # p = Process(target=work)  #34.9549994468689   #由於開了好多進程,它的開銷大,花費的時間也就長了
12         p = Thread(target=work) #2.2151265144348145  #當開了多個線程的時候,它的開銷小,花費的時間也小了
13         l.append(p)
14         p.start()
15     for i in l :
16         i.join()
17     stop = time.time()
18     print('%s'%(stop-start))
I/O密集型
相關文章
相關標籤/搜索