目錄python
線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這些數據,直到該線程完成對數據的操做後。不一樣的操做系統有多種實現方式。好比臨界區(Critical Section)、互斥鎖(Mutex)、信號量(Semaphore)、事件(Event)等。多線程
Event是線程間通信機制最簡單的實現,使用一個內部標記flag,主要提供了三個方法wait、clear、set,經過操做flag來控制線程的執行。app
Event對象在全局定義了一個'Flag',若是'Flag'值爲 False,那麼當程序執行 Event對象的wait方法時就會阻塞,若是'Flag'值爲True,那已經阻塞的wait方法會繼續執行。dom
在使用threading.Event 實現線程間通訊時:使用threading.Event可使一個線程等待其餘線程的通知,咱們把這個Event傳遞到線程對象中,Event默認內置了一個標誌,初始值爲False。一旦該線程經過wait()方法進入等待狀態,直到另外一個線程調用該Event的set()方法將內置標誌設置爲True時,該Event會通知全部等待狀態的線程恢復運行。性能
有下面代碼,大欣負責吃包子,廚師負責作包子,只有廚師作好了,大欣才能開始吃。ui
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) event.wait() print('包子來了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(3) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大廚師') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
運行consumer時,包子還沒作,因此只能等着,等chef作完了之後,設置了event爲True,這時consumer就開始吃了。wait還能夠指定等待時間,好比chef作的太慢了,consumer不吃了。操作系統
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) if not event.wait(2): print('太慢了,不吃了') else: print('包子來了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(8) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大廚師') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
當Event被set後,wait的返回值就是True,若是wait(2),在2秒內,Event沒有被set,那麼返回值是False。線程
因爲線程間的數據是共享的,當咱們多個線程操做一個相同的用戶的數據時,有可能形成混亂,以下例子:code
import threading import time n = 10 def work(): global n while n > 0: time.sleep(0.01) n -= 1 if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
咱們認爲結果應該是0,可是結果可能不如人意,由於進程間共享數據的問題,多個進程同時修改共享數據時,因爲GIL的存在同一時刻只有1個線程在運行。當n的值爲1的時候,10個子線程頗有可能同時判斷成功,再要修改的時候被掛起(時間片用完),等到真正回來修改的時候,n已經被其餘線程改過來!因此若是要保持數據的正確性,那麼就須要犧牲性能,即便用鎖機制。orm
import threading mutex = threading.Lock()
# 調用方式一 mutex.acquire() # 加鎖 '''code''' mutex.release() # 解鎖 # 調用方式二 with mutex: '''code''' # 離開wit代碼段,自動解鎖
名稱 | 含義 |
---|---|
acquire(blocking=True,timeout=-1) | 獲取鎖並加鎖 blocking:表示是否阻塞,默認爲True表示阻塞。 timeout:表示阻塞超時時間。 當blocking爲非阻塞時,timeout不能進行設置 |
release() | 釋放鎖,能夠從任何線程上調用釋放. 已上鎖,調用時會釋放鎖,即重置爲unlocked。 未上鎖,調用時會拋出RuntimeError異常 |
因此,上面的例子能夠有以下修改:
import threading import time mutex = threading.Lock() n = 10 def work(): global n while True: mutex.acquire() if n > 0: time.sleep(1) n -= 1 mutex.release() else: mutex.release() break if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
在判斷的時候就開始加鎖,在修改完畢的時候解鎖。這樣加鎖的狀況下咱們發現運行時間變長了,那是由於只有搶到鎖的線程才能夠工做(穿行執行),
有下面一個計數器類,來看如何加鎖
import threading class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): self._value += 1 def dec(self): self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join() print(c.value)
在須要調用和修改的地方加鎖,修改完畢後解鎖,是鎖使用的基本原則,通常來講,加鎖就要解鎖,可是加鎖和解鎖之間會有一些代碼要執行,若是出現異常,那麼鎖是沒法釋放的,可是當前線程已經終止了,這種狀況通常稱爲死鎖,能夠添加異常處理,來確保鎖必定被釋放。
import threading mutex = threading.Lock() class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): try: # 添加異常處理,即使時崩潰也能夠釋放鎖 mutex.acquire() self._value += 1 finally: mutex.release() def dec(self): with mutex: # 上下文管理寫法 self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join()
固然這裏也能夠爲每個計數器實例對象初始化一個本身的鎖,若是用全局鎖,那麼不一樣的計數器實例,會相互影響(由於多個實例,共享一把鎖),由於不一樣實例的結果是不一樣的,因此建議爲每一個實例構建一個本身的鎖。
class Counter: def __init__(self): self._value = 0 self._lock = threading.Lock() @property def value(self): return self._value def inc(self): try: self._lock.acquire() self._value += 1 finally: self._lock.release() def dec(self): with self._lock: self._value -= 1
當lock.acquire(False)時,該鎖就是非阻塞鎖了,調用時,僅獲取一次,若是獲取到那麼返回True,不然返回False。
import time import threading import random import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(thread)s %(message)s') def worker(tasks:list): for task in tasks: if task.lock.acquire(False): # 搶到任務 time.sleep(random.randrange(1,3)) # 模擬執行任務須要的時間 logging.info('{} I am working for {}'.format(threading.current_thread().name, task)) else: # 沒有搶到任務 logging.info('{} not get {}'.format(threading.current_thread().name,task)) class Task: def __init__(self,name): self.task = name self.lock = threading.Lock() def __repr__(self): return self.task __str__ = __repr__ if __name__ == '__main__': task_list = [Task('task{}'.format(x)) for x in range(1,11)] # 構建任務列表 for _ in range(10): threading.Thread(target=worker,args=(task_list,)).start() # 開啓多線程執行任務
10個任務,交給10個線程運行,誰搶到哪一個線程,誰就運行。
鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候。但若是都是讀取的話,就不須要了。使用鎖的時候有一下幾點須要特別注意: