線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這個數據,直到該線程完成對數據的操做爲止。python
臨界區(critical section 全部碰到的都不能使用,等一個使用完成),互斥量(Mutex一個用一個不能用),信號量(semaphore),事件event數據庫
event 事件。是線程間通訊機制中最簡單的實現,使用一個內部標記的flag,經過flag的True或False的變化來進行操做。緩存
名稱 | 含義 |
---|---|
set() | 標記設置爲True,用於後面wait執行和is_set檢查 |
clear() | 標記設置爲False |
is_set() | 標記是否爲True |
wait(timeout=None) | 設置等待標記爲True的時長,None爲無限等待,等到返回爲True,未等到了超時返回爲False |
老闆僱傭了一個工人,讓他生產杯子,老闆一直等着工人。直到生成了10個杯子 安全
import logging import threading import time event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def boss(event:threading.Event): logging.info("準備生產") event.wait() logging.info("生產完成") def woker(event:threading.Event,count:int=10): cups=[] while True: logging.info("開始生產杯子") if len(cups) >= count: event.set() break logging.info("生產了一個杯子") cups.append(1) time.sleep(0.5) logging.info("總共生產了:{} 個杯子".format(len(cups))) b=threading.Thread(target=boss,args=(event,),name='boss') w=threading.Thread(target=woker,args=(event,10),name='woker') b.start() w.start()
結果以下 多線程
import logging import threading import datetime event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def do(event:threading.Event,interval:int): while not event.wait(interval): # 此處須要的結果是返回False或True logging.info('do sth.') e=threading.Event() start=datetime.datetime.now() threading.Thread(target=do,args=(e,3)).start() e.wait(10) e.set() print ("總體運行時間爲:{}".format((datetime.datetime.now()-start).total_seconds())) print ('main exit')
結果以下 併發
wait 優於sleep,wait 會主動讓出時間片,其餘線程能夠被調度,而sleep會佔用時間片不讓出。app
import logging import threading import datetime import time event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def add(x:int,y:int): return x+y class Timer: def __init__(self,interval,fn,*args,**kwargs): self.interval=interval self.fn=fn self.args=args self.kwargs=kwargs self.event=threading.Event() def __run(self): start=datetime.datetime.now() logging.info('開始啓動步驟') event.wait(self.interval) #在此處等待此時間後返回爲False if not self.event.is_set(): # 此處返回爲False 爲正常 self.fn(*self.args,**self.kwargs) logging.info("函數執行成功,執行時間爲{}".format((datetime.datetime.now()-start).total_seconds())) def start(self): threading.Thread(target=self.__run()).start() def cancel(self): self.event.set() t=Timer(3,add,10,20) t.start()
結果以下 dom
使用同一個event對象標記flag
誰wait就是等待flag變爲True,或者等到超時返回False,不限制等待的個數。ide
lock: 鎖,凡是在共享資源爭搶的地方均可以使用,從而保證只有一個使用者能夠徹底使用這個資源。一旦線程獲取到鎖,其餘試圖獲取的鎖的線程將被阻塞。函數
名稱 | 含義 |
---|---|
acquire(blocking=True,timeout=1) | 默認阻塞,阻塞能夠設置超時時間,非阻塞時,timeout禁止設置,成功獲取鎖後,返回True,不然返回False |
release() | 釋放鎖,能夠從任何線程調用釋放。已上鎖的鎖,會被重置爲unlocked,未上鎖的鎖上調用,拋出RuntimeError異常 |
#!/usr/bin/poython3.6 #conding:utf-8 import threading import time lock=threading.Lock() # 實例化鎖對象 lock.acquire() # 加鎖處理,默認是阻塞,阻塞時間能夠設置,非阻塞時,timeout禁止設置,成功獲取鎖,返回True,不然返回False print ('get locker 1') lock.release() # 釋放鎖,能夠從任何線程調用釋放,已上鎖的鎖,會被重置爲unlocked未上鎖的鎖上調用,拋出RuntimeError異常。 print ('release Locker') lock.acquire() print ('get locker 2') lock.release() print ('release Locker') lock.acquire() print ('get locker 3') lock.acquire() # 此處未進行相關的釋放操做,所以其下面的代碼將不能被執行,其會一直阻塞 print ('get locker 4')
結果以下
#!/usr/bin/poython3.6 #conding:utf-8 import threading lock=threading.Lock() lock.acquire() print ('1') lock.release() print ('2') lock.release() # 此處二次調用釋放,致使的結果是拋出異常。 print ('3')
結果以下
鎖釋放後資源必定會出現爭搶狀況,鎖必定要支持上下文,不然全部的線程都將等待。
鎖的注意事項是最好不要出現死鎖的狀況。
解不開的鎖就是死鎖。
此處是沒有退出的狀況的
訂單要求生成100個杯子,組織10人生產
不加鎖的狀況下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] def worker(task=100): flag=False while True: count = len(cups) logging.info(len(cups)) if count >= task: flag=True time.sleep(0.001) if not flag: cups.append(1) if flag: break logging.info("共製造{}個容器".format(len(cups))) for i in range(10): #此處起10個線程,表示10個工人 threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()
結果以下
import logging import threading FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] def worker(task=100): while True: count = len(cups) logging.info(len(cups)) if count >= task: break cups.append(1) logging.info("{}".format(threading.current_thread().name)) logging.info("共製造{}個容器".format(len(cups))) for i in range(10): #此處起10個線程,表示10個工人 threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()
結果以下
使用上述方式會致使多線程數據同步產生問題,進而致使產生的數據不許確。
import logging import threading FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] Lock=threading.Lock() def worker(lock:threading.Lock,task=100): while True: lock.acquire() count = len(cups) logging.info(len(cups)) if count >= task: break # 此處保證每一個線程執行完成會自動退出,不然會阻塞其餘線程的繼續執行 cups.append(1) lock.release() # 釋放鎖 logging.info("{}".format(threading.current_thread().name)) logging.info("共製造{}個容器".format(len(cups))) for i in range(10): #此處起10個線程,表示10個工人 threading.Thread(target=worker,args=(Lock,100,),name="woker-{}".format(i)).start()
結果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 def add(self): self.__x+=1 def sub(self): self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=1000 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
結果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 def add(self): self.__x+=1 def sub(self): self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=10000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value) #此處可能在未執行完成就進行了打印操做,可能形成延遲問題。
結果以下
總結以下:
若是修改線程多少,則效果不明顯,由於其函數執行時長和CPU分配的時間片相差較大,所以在時間片的時間內,足夠完成相關的計算操做,但如果增長執行的循環次數,則可能會致使一個線程在一個時間片內未執行完成相關的計算,進而致使打印結果錯誤。
通常來講加鎖後還有一些代碼實現,在釋放鎖以前還可能拋出一些異常,一旦出現異常,鎖是沒法釋放的,可是當前線程可能由於這個異常被終止了,就會產生死鎖,可經過上下文對出現異常的鎖進行關閉操做。
1 使用try...finally語句保證鎖的釋放
2 with上下文管理,鎖對象支持上下文管理源碼以下:
其類中是支持enter和exit的,所以其是能夠經過上下文管理來進行相關的鎖關閉操做的。
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): try: self.__lock.acquire() self.__x+=1 finally: self.__lock.release() # 此處不論是否上述異常,此處都會執行 def sub(self): try: self.__lock.acquire() self.__x-=1 finally: self.__lock.release() @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=1000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
結果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): try: self.__lock.acquire() self.__x+=1 finally: self.__lock.release() # 此處不論是否上述異常,此處都會執行 def sub(self): try: self.__lock.acquire() self.__x-=1 finally: self.__lock.release() @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=100 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
結果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): with self.__lock: self.__x+=1 def sub(self): with self.__lock: self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=100 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
結果以下
經過存活線程數進行判斷
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): with self.__lock: self.__x+=1 def sub(self): with self.__lock: self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此處的100是執行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=1000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() while True: time.sleep(1) if threading.active_count()==1: print (threading.enumerate()) print (c.value) break else: print (threading.enumerate())
結果以下
不阻塞,timeout沒啥用,False表示不使用鎖
非阻塞鎖能提升效率,但可能致使數據不一致
#!/usr/bin/poython3.6 #conding:utf-8 import threading lock=threading.Lock() lock.acquire() print ('1') ret=lock.acquire(blocking=False) print (ret)
結果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] lock=threading.Lock() def worker(lock:threading.Lock,task=100): while True: if lock.acquire(False): # 此處返回爲False,則表示未成功獲取到鎖 count=len(cups) logging.info(count) if count >=task: lock.release() break cups.append(1) lock.release() logging.info("{} make1 ".format(threading.current_thread().name)) logging.info("{}".format(len(cups))) for x in range(20): threading.Thread(target=worker,args=(lock,100)).start()
結果以下
鎖適用於訪問和修改同一個共享資源的時候,及就是讀取同一個資源的時候。
若是所有都是讀取同一個資源,則不須要鎖,由於讀取不會致使其改變,所以不必所用鎖的注意事項:
少用鎖,必要時用鎖,多線程訪問被鎖定的資源時,就成了穿行訪問,要麼排隊執行,要麼爭搶執行加鎖的時間越短越好,不須要就當即釋放鎖
必定要避免死鎖多線程運行模型(ATM機)
跟鎖無關的儘可能排列在後面,和鎖區分開
可重入鎖,是線程相關的鎖,線程A得到可重入鎖,並能夠屢次成功獲取,不會阻塞,最後在線程A 中作和acquire次數相同的release便可。
import threading rlock=threading.RLock() #初始化可重用鎖 rlock.acquire() #進行阻塞處理 print ('1') rlock.acquire() print ('2') rlock.acquire(False) # 此處設置爲非阻塞 print ('3') rlock.release() print ('4') rlock.release() print ('5') rlock.release() print ('6') rlock.release() # 此處表示不能釋放多餘的鎖,只能釋放和加入鎖相同次數 print ('7')
結果以下
不一樣線程對Rlock操做的結果
import threading rlock=threading.RLock() #初始化可重用鎖 def sub(lock:threading.RLock): lock.release() ret=rlock.acquire() print (ret) ret=rlock.acquire(timeout=5) print (ret) ret=rlock.acquire(False) print (ret) ret=rlock.acquire(False) print (ret) threading.Thread(target=sub,args=(rlock,)).start() # 此處是啓用另外一個線程來完成對上述的開啓的鎖的關閉,由於其是基於線程的, #所以其必須在該線程中進行相關的處理操做,而不是在另一個線程中進行解鎖操做
結果以下
跨線程的Rlock就沒用了,必須使用Lock,Rlock是線程級別的,其餘的鎖都是能夠在當前進程的另外一個線程中進行加鎖和解鎖的。
構造方法condition(lock=None),可傳入一個Lock或Rlock,默認是Rlock。其主要應用於生產者消費者模型,爲了解決生產者和消費者速度匹配的問題。
名稱 | 含義 |
---|---|
acquire(*args) | 獲取鎖 |
wait(self,timeout=None) | 等待或超時 |
notify(n=1) | 喚醒至少指定數目個數的等待的線程,沒有等待線程就沒有任何操做 |
notify_all() | 喚醒全部等待的線程 |
def __init__(self, lock=None): if lock is None: lock = RLock() # 此處默認使用的是Rlock self._lock = lock # Export the lock's acquire() and release() methods self.acquire = lock.acquire # 進行相關處理 self.release = lock.release # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque() def __enter__(self): # 此處定義了上下文管理的內容 return self._lock.__enter__() def __exit__(self, *args): # 關閉鎖操做 return self._lock.__exit__(*args) def __repr__(self): # 此處實現了可視化相關的操做 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
其內部存儲使用了_waiter 進行相關的處理,來對線程進行集中的放置操做。
def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self._waiters.append(waiter) # 此處使用此方式存儲鎖 saved_state = self._release_save() gotit = False try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() gotit = True else: if timeout > 0: gotit = waiter.acquire(True, timeout) else: gotit = waiter.acquire(False) return gotit finally: self._acquire_restore(saved_state) if not gotit: try: self._waiters.remove(waiter) except Value
喚醒一個release
def notify(self, n=1): if not self._is_owned(): # 此處是用於判斷是否有鎖 raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n)) if not waiters_to_notify: return for waiter in waiters_to_notify: waiter.release() try: all_waiters.remove(waiter) except ValueError: pass
喚醒全部的所等待
def notify_all(self): """Wake up all threads waiting on this condition. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. """ self.notify(len(self._waiters)) notifyAll = notify_all
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() def produce(self):# 生產者 for i in range(10): data=random.randint(1,100) self.data=data # 產生數據 self.event.wait(1) #此處用於一秒產生一個數據 def custom(self): # 消費者,消費者可能有多個 while True: logging.info(self.data) # 獲取生產者生產的數據 self.event.wait(0.5) # 此處用於等待0.5s產生一個數據 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c=threading.Thread(target=d.custom,name='custom') p.start() c.start()
此處會使得產生的數據只有一個,而消費者拿到的數據卻有兩份,此處是由消費者來控制其拿出的步驟的。
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生產者 for i in range(10): data=random.randint(1,100) with self.conition: #此處用於先進行上鎖處理,而後最後釋放鎖 self.data=data # 產生數據 self.conition.notify_all() #通知,此處表示有等待線程就通知處理 self.event.wait(1) #此處用於一秒產生一個數據 def custom(self): # 消費者,消費者可能有多個 while True: with self.conition: self.conition.wait() # 此處用於等待notify產生的數據 logging.info(self.data) # 獲取生產者生產的數據 self.event.wait(0.5) # 此處用於等待0.5s產生一個數據 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c=threading.Thread(target=d.custom,name='custom') p.start() c.start()
此處是由生產者產生數據,通知給消費者,而後消費者再進行拿取,
有時候可能須要多一點的消費者,來保證生產者無庫存
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生產者 for i in range(10): data=random.randint(1,100) with self.conition: #此處用於先進行上鎖處理,而後最後釋放鎖 self.data=data # 產生數據 self.conition.notify_all() #通知,通知處理產生的數據 self.event.wait(1) #此處用於一秒產生一個數據 def custom(self): # 消費者,消費者可能有多個 while True: with self.conition: self.conition.wait() # 此處用於等待notify產生的數據 logging.info(self.data) # 獲取生產者生產的數據 self.event.wait(0.5) # 此處用於等待0.5s產生一個數據 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c1=threading.Thread(target=d.custom,name='custom-1') c2=threading.Thread(target=d.custom,name='custom-2') p.start() c1.start() c2.start()
結果以下
由於此默認是基於線程的鎖,所以其產生另外一個消費者並不會影響當前消費者的操做,所以能夠拿到兩份生產獲得的數據。
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生產者 for i in range(10): data=random.randint(1,100) with self.conition: #此處用於先進行上鎖處理,而後最後釋放鎖 self.data=data # 產生數據 self.conition.notify(2) #通知兩個線程來處理數據 self.event.wait(1) #此處用於一秒產生一個數據 def custom(self): # 消費者,消費者可能有多個 while True: with self.conition: self.conition.wait() # 此處用於等待notify產生的數據 logging.info(self.data) # 獲取生產者生產的數據 self.event.wait(0.5) # 此處用於等待0.5s產生一個數據 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') p.start() for i in range(5): # 此處用於配置5個消費者, threading.Thread(target=d.custom,name="c-{}".format(i)).start()
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生產者 for i in range(10): data=random.randint(1,100) with self.conition: #此處用於先進行上鎖處理,而後最後釋放鎖 self.data=data # 產生數據 self.conition.notify(5) #通知所有線程來處理數據 self.event.wait(1) #此處用於一秒產生一個數據 def custom(self): # 消費者,消費者可能有多個 while True: with self.conition: self.conition.wait() # 此處用於等待notify產生的數據 logging.info(self.data) # 獲取生產者生產的數據 self.event.wait(0.5) # 此處用於等待0.5s產生一個數據 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') p.start() for i in range(5): # 此處用於配置5個消費者, threading.Thread(target=d.custom,name="c-{}".format(i)).start()
結果以下
注: 上述實例中。程序自己不是線程安全的,程序邏輯有不少瑕疵,可是能夠很好的幫助理解condition的使用,和生產者消費者模式
輪循太消耗CPU時間了
condition 用於生產者消費者模型中,解決生產者消費者速度匹配的問題
採用了通知機制,很是有效率
使用方式
使用condition,必須先acquire,用完了要release,由於內部使用了鎖,默認是Rlock,最好的方式使用with上下文。消費者wait,等待通知
生產者生產好消息,對消費者發送消息,可使用notifiy或者notify_all方法。
賽馬模式,並行初始化,多線程並行初始化
有人翻譯爲柵欄,有人稱爲屏障,能夠想象爲路障,道閘
python3.2 中引入的新功能
名稱 | 含義 |
---|---|
Barrier(parties,action=None,timeout=None) | 構建 barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值 |
n_waiting | 當前在屏障中等待的線程數 |
parties | 各方數,須要多少等待 |
wait(timeout=None) | 等待經過屏障,返回0到線程-1的整數,每一個線程返回不一樣,若是wait方法設置了超時,並超時發送,屏障將處於broken狀態 |
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting)) # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行 try: bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三個線程釋放一次 for x in range(3): # 此處表示產生3個線程 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
結果以下
產生的線程不是等待線程的倍數
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting)) # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行 try: bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三個線程釋放一次 for x in range(4): # 此處表示產生4個線程,則會有一個一直等待 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
其第4個線程會一直等待下去,直到有3個線程在等待的同時才進行下一步操做。
從運行結果來看,全部線程衝到了barrier前等待,直到parties的數目,屏障將會打開,全部線程中止等待,繼續執行
再有wait,屏障就就緒等待達到參數數目時再放行
參數 | 含義 |
---|---|
broken | 若是屏障處於打破狀態,則返回True |
abort() | 將屏障處於broken狀態,等待中的線程或調用等待方法的線程都會拋出BrokenbarrierError異常,直到reset方法來恢復屏障 |
reset() | 恢復屏障,從新開始攔截 |
import threading import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting)) # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行 try: bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三個線程釋放一次 for x in range(5): # 此處表示產生5個線程 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start() if x==4: barrier.abort() # 打破屏障,前三個沒問題,後兩個會致使屏障打破一塊兒走出
結果以下
import threading import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting)) # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行 try: bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三個線程釋放一次 for x in range(9): # 此處表示產生5個線程 if x==2: #此處第一個和第二個等到,等到了第三個直接打破,前兩個和第三個一塊兒都是打破輸出 barrier.abort() # 打破屏障,前三個沒問題,後兩個會致使屏障打破一塊兒走出 elif x==6: #x=6表示第7個,直到第6個,到第7個,第8個,第9個,恰好3個直接柵欄退出 barrier.reset() threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,)).start()
結果以下
併發初始化
全部的線程都必須初始化完成後,才能繼續工做,例如運行加載數據,檢查,若是這些工做沒有完成,就開始運行,則不能正常工做
10個線程作10種不一樣的工做準備,每一個線程負責一種工做,只有這10個線程都完成後,才能繼續工做,先完成的要等待後完成的線程。
如 啓動了一個線程,須要先加載磁盤,緩存預熱,初始化連接池等工做,這些工做能夠齊頭並進,只不過只有都知足了,程序才能繼續向後執行,假設數據庫連接失敗,則初始化工做就會失敗,就要about,屏蔽broken,全部線程收到異常後直接退出。
和Lock 很像,信號量對象內部維護一個倒計數器,每一次acquire都會減1,當acquire方法發現計數爲0時就會阻塞請求的線程,直到其餘線程對信號量release後,計數大於0,恢復阻塞的線程。
名稱 | 含義 |
---|---|
Semaphore(value=1) | 構造方法,value小於0,拋出ValueError異常 |
acquire(blocking=True,timeout=None) | 獲取信號量,計數器減1,獲取成功返回爲True |
release() | 釋放信號量,計數器加1 |
semaphore 默認值是1,表示只能去一個後就等待着,其至關於初始化一個值。
計數器中的數字永遠不可能低於0
import threading import logging import time logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def woker(sem:threading.Semaphore): logging.info("in sub thread") logging.info(sem.acquire()) logging.info("sub thread over") s=threading.Semaphore(3) # 初始化3個信號量 logging.info(s.acquire()) # 取出三個信號量 logging.info(s.acquire()) logging.info(s.acquire()) threading.Thread(target=woker,args=(s,)).start() # 此處若再想取出,則不能成功,則會阻塞 print ('----------------------') logging.info(s.acquire(False)) #此處表示不阻塞 print ('+++++++++++++++++++++++') time.sleep(2) logging.info(s.acquire(timeout=3)) # 此處表示阻塞超時3秒後釋放 print ('%%%%%%%%%%%%%%%%%%%%%') s.release() # 此處用於對上述線程中的調用的函數中的內容進行處理
結果以下
都是針對同一個對象進行的處理
import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Name: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count self.pool=[ Name("conn-{}".format(i)) for i in range(3)] # 初始化連接 def get_conn(self): if len(self.pool)>0: data=self.pool.pop() # 從尾部拿出來一個 logging.info(data) else: return None def return_conn(self,name:Name): # 此處添加一個 self.pool.append(name) pool=Pool(3) pool.get_conn() pool.get_conn() pool.get_conn() pool.return_conn(Name('aaa')) pool.get_conn()
結果以下
import logging import threading import random logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Name: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count self.sem=threading.Semaphore(count) self.event=threading.Event() self.pool=[ Name("conn-{}".format(i)) for i in range(count)] def get_conn(self): self.sem.acquire() data=self.pool.pop() return data def return_conn(self,name:Name): # 此處添加一個 self.pool.append(name) self.sem.release() def woker(pool:Pool): conn=pool.get_conn() logging.info(conn) threading.Event().wait(random.randint(1,4)) pool.return_conn(conn) pool=Pool(3) for i in range(8): threading.Thread(target=woker,name="worker-{}".format(i),args=(pool,)).start()
結果以下
上述實例中,使用信號量解決資源有限的問題,若是池中有資源,請求者獲取資源時信號量減1,請求者只能等待,當使用者徹底歸資源後信號量加1,等待線程就能夠喚醒拿走資源。
有界信號量,不容許使用release超出初始值範圍,不然,拋出ValueError異常,這個用有界信號修改源代碼,保證若是多return_conn 就會拋出異常,保證了歸還連接拋出異常。
信號量一直release會一直向上加,其會將信號量和連接池都擴容了此處便產生了BoundedSemaphore
import logging import threading import random logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") s=threading.BoundedSemaphore(3) # 邊界 s.acquire() # 此處須要拿取,不然不能直接向其中加 print (1) s.release() print (2) s.release() print (3)
結果以下
應用以下
import logging import threading import time logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") class Conn: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count # 初始化連接池 self.sema=threading.BoundedSemaphore(count) self.pool=[Conn("conn-{}".format(i)) for i in range(count)] # 初始化連接 def get_conn(self): self.sema.acquire() # 當拿取的時候,減一 data=self.pool.pop() # 從尾部拿出一個 print (data) def return_conn(self,conn:Conn): #此處返回一個鏈接池 self.pool.append(conn) # 必須保證其在拿的時候有 # 使用try 能夠進行處理,下面的必須執行,加成功了,下面的必定要成功的, self.sema.release() pool=Pool(3) con=Conn('a') conn=pool.get_conn() conn=pool.get_conn() conn=pool.get_conn()
結果以下
若是使用了信號量,仍是沒有用完
self.pool.append(conn)
self.sem.release()
一種極端的狀況下,計數器還差1就滿了,有3個線程A,B,C都執行了第一句,都沒有來得release,這時候輪到線程A release,正常的release,而後輪到線程C先release,必定出現問題,超界了,必定出現問題。
不少線程用完了信號量沒有獲取信號量的線程都會阻塞,沒有線程和歸還的線程爭搶,當append後才release,這時候才能等待的線程被喚醒,才能Pop,也就是沒有獲取信號量就不能pop,這是安全的。