線程的補充 1、鎖 1、數據安全問題 # 線程爲何要有鎖 # 線程之間的數據安全問題 : # += -= 賦值操做不安全 # 線程安全的數據類型有: # pop append 都是線程安全的 # 隊列也是數據安全的
# logging 例子:數據不安全
from threading import Thread n = 0 def func(): global n for i in range(150000): n -= 1 def func2(): global n for i in range(150000): n += 1 t_lst = [] for i in range(10): t2 = Thread(target=func2) t = Thread(target=func) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->-17665 所以在進行 += -=操做的時候應該加上鎖: from threading import Thread,Lock n = 0 def func(lock): global n for i in range(150000): lock.acquire() n -= 1 lock.release() def func2(lock): global n for i in range(150000): lock.acquire() n += 1 lock.release() lock = Lock() t_lst = [] for i in range(10): t2 = Thread(target=func2,args=(lock,)) t = Thread(target=func,args=(lock,)) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->0 2、死鎖問題(互斥鎖纔會出現死鎖Lock,遞歸鎖能夠快速解決死鎖問題RLock) 所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法運行下去。 此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。即:操做的時候,搶到一把鎖以後還要再去搶第二把鎖, 可是因爲一個線程搶到一把鎖,另外一個線程搶到了另外一把鎖,因此致使死鎖。 例如:(互斥鎖纔會出現死鎖) import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到麪條了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃麪' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到麪條了'%name) print('%s吃麪'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小紅'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 結果: 解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。 這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。 上面的例子若是使用RLock代替Lock,則不會發生死鎖: import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到麪條了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃麪' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到麪條了'%name) print('%s吃麪'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小紅'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 3、死鎖總結 互斥鎖:每釋放一把鑰匙,別人馬上就能夠搶 遞歸鎖:要等所有鑰匙釋放完了,別人才能去搶,其實是一串鑰匙,只有一我的能獲得(圖遞歸鎖) 遞歸鎖能夠解決互斥鎖的死鎖問題 互斥鎖 兩把鎖(一個門須要兩把鑰匙才能開,但可能出現一我的搶了一把鑰匙,另外一我的搶了另外一把鑰匙,致使你們都進不去) 多個線程搶 遞歸鎖 一把鎖(搶一串鑰匙,第一把進了一個門,另外一把鑰匙再進第二個門...而後出第二個門,再出第一個門,徹底出來後其餘人才能搶這個鑰匙串) 多個線程搶 遞歸鎖好很差? 遞歸鎖並非一個好的解決方案 死鎖現象的發生不是互斥鎖的問題 而是程序員的邏輯有問題致使的 遞歸鎖可以快速的解決死鎖問題 遞歸鎖 迅速恢復服務 遞歸鎖替換互斥鎖 在接下來的時間中慢慢把遞歸鎖替換成互斥鎖 可以完善代碼的邏輯 提升代碼的效率 多個線程之間,用完一個資源再用另一個資源 先釋放一個資源,再去獲取一個資源的鎖 2、信號量 跟進程的同樣 Semaphore管理一個內置的計數器, 每當調用acquire()時內置計數器-1; 調用release() 時內置計數器+1; 計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。 import time from threading import Semaphore,Thread def func(index,sem): sem.acquire() print(index) time.sleep(1) sem.release() sem = Semaphore(5) for i in range(10): Thread(target=func,args=(i,sem)).start() 3、事件 跟進程的同樣 線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。 爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲False。 若是有線程等待一個Event對象, 而這個Event對象的標誌爲False,那麼這個線程將會被一直阻塞直至該標誌爲True。一個線程若是將一個Event對象的信號標誌設置爲True,它將喚醒全部等待這個Event對象的線程。 若是一個線程等待一個已經被設置爲True的Event對象,那麼它將忽略這個事件, 繼續執行。 event.wait() 信號爲False的時候就阻塞,直到事件內的信號變成True纔不阻塞 event.set() 把信號變成True event.clear() 把信號變成False event.is_set() 查看信號狀態是否爲True 例如:檢測數據庫鏈接 import time import random from threading import Thread,Event def check(e): print('開始檢測數據庫鏈接') time.sleep(random.randint(1,3)) # 檢測數據庫鏈接(這裏只是模擬,應該是真正檢測的) e.set() # 成功了 def connect(e): for i in range(3): # 嘗試三次去鏈接數據庫 e.wait(0.5) # 超時就向下繼續執行 if e.is_set(): # 若是True才顯示鏈接成功 print('數據庫鏈接成功') break else: print('嘗試鏈接數據庫%s次失敗' % (i + 1)) else: raise TimeoutError e = Event() Thread(target=connect,args=(e,)).start() Thread(target=check,args=(e,)).start() 4、條件 Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。 若是條件不知足則wait;若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。 就像排隊進火車站同樣 notify 控制流量 通知有多少人能夠經過了 wait 在門口等待的全部人 acquire wait 使用先後都須要加鎖 作的事情 release acquire notify 使用先後都須要加鎖 release 例子: from threading import Thread,Condition def func(con,index): print('%s在等待'%index) con.acquire() con.wait() print('%s do something'%index) con.release() con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() # con.acquire() # con.notify_all() # 讓全部的線程經過 # con.release() count = 10 while count > 0: num = int(input('>>>:')) con.acquire() con.notify(num) # 選擇讓多少個線程經過 count -= num con.release() 5、定時器 定時器,指定n秒後執行某個操做 from threading import Timer def func(): print('執行我啦') t = Timer(5,func) # 起一個定時器任務,設置多少秒後執行 t.start() print('主線程') 6、隊列 queue隊列 :使用import queue,用法與進程Queue同樣
注意:
在進程模塊multiprocessing中導入的隊列Queue是IPC進程之間通訊的,基於socket的,是在文件級別的
而queue模塊的Queue隊列是在內存級別的,不能用於IPC通訊
1、先進先出(普通的隊列) from queue import Queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) 結果: 1 2 3 2、後進先出(棧) from queue import LifoQueue lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) print(lq.get()) print(lq.get()) print(lq.get()) 結果: 3 2 1 3、優先級隊列 數字越小優先級越高 字母ASCII越小優先級越高 注意:不能數字和字母分別put from queue import PriorityQueue pq = PriorityQueue() pq.put(10) pq.put(5) pq.put(6) print(pq.get()) print(pq.get()) print(pq.get()) 結果: 5 6 10 from queue import PriorityQueue pq = PriorityQueue() pq.put('a') pq.put('A') pq.put('b') print(pq.get()) print(pq.get()) print(pq.get()) 結果: A a b
from queue import PriorityQueue pq = PriorityQueue() pq.put((15,'abc')) # 先按元組第一個元素比較,若相同則按第二個元素比較 pq.put((5,'ghi')) pq.put((12,'def')) pq.put((12,'aaa')) print(pq.get()) print(pq.get()) print(pq.get()) 結果: (5, 'ghi') (12, 'aaa') (12, 'def')