線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這些數據,直到該線程完成對數據的操做後。不一樣的操做系統有多種實現方式。好比臨界區(Critical Section)、互斥鎖(Mutex)、信號量(Semaphore)、事件(Event)等。多線程
Event對象在全局定義了一個'Flag',若是'Flag'值爲 False,那麼當程序執行 Event對象的wait方法時就會阻塞,若是'Flag'值爲True,那已經阻塞的wait方法會繼續執行。dom
在使用threading.Event 實現線程間通訊時:使用threading.Event可使一個線程等待其餘線程的通知,咱們把這個Event傳遞到線程對象中,Event默認內置了一個標誌,初始值爲False。一旦該線程經過wait()方法進入等待狀態,直到另外一個線程調用該Event的set()方法將內置標誌設置爲True時,該Event會通知全部等待狀態的線程恢復運行。性能
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) event.wait() print('包子來了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(3) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大廚師') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) if not event.wait(2): print('太慢了,不吃了') else: print('包子來了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(8) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大廚師') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
import threading import time n = 10 def work(): global n while n > 0: time.sleep(0.01) n -= 1 if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
import threading mutex = threading.Lock()
# 調用方式一 mutex.acquire() # 加鎖 '''code''' mutex.release() # 解鎖 # 調用方式二 with mutex: '''code''' # 離開wit代碼段,自動解鎖
名稱 | 含義 |
acquire(blocking=True,timeout=-1) | 獲取鎖並加鎖 blocking:表示是否阻塞,默認爲True表示阻塞。 timeout:表示阻塞超時時間。 當blocking爲非阻塞時,timeout不能進行設置 |
release() | 釋放鎖,能夠從任何線程上調用釋放. 已上鎖,調用時會釋放鎖,即重置爲unlocked。 未上鎖,調用時會拋出RuntimeError異常 |
import threading import time mutex = threading.Lock() n = 10 def work(): global n while True: mutex.acquire() if n > 0: time.sleep(1) n -= 1 mutex.release() else: mutex.release() break if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
import threading class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): self._value += 1 def dec(self): self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join() print(c.value)
import threading mutex = threading.Lock() class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): try: # 添加異常處理,即使時崩潰也能夠釋放鎖 mutex.acquire() self._value += 1 finally: mutex.release() def dec(self): with mutex: # 上下文管理寫法 self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join()
class Counter: def __init__(self): self._value = 0 self._lock = threading.Lock() @property def value(self): return self._value def inc(self): try: self._lock.acquire() self._value += 1 finally: self._lock.release() def dec(self): with self._lock: self._value -= 1
import time import threading import random import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(thread)s %(message)s') def worker(tasks:list): for task in tasks: if task.lock.acquire(False): # 搶到任務 time.sleep(random.randrange(1,3)) # 模擬執行任務須要的時間 logging.info('{} I am working for {}'.format(threading.current_thread().name, task)) else: # 沒有搶到任務 logging.info('{} not get {}'.format(threading.current_thread().name,task)) class Task: def __init__(self,name): self.task = name self.lock = threading.Lock() def __repr__(self): return self.task __str__ = __repr__ if __name__ == '__main__': task_list = [Task('task{}'.format(x)) for x in range(1,11)] # 構建任務列表 for _ in range(10): threading.Thread(target=worker,args=(task_list,)).start() # 開啓多線程執行任務