python線程同步

一 概念

線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這個數據,直到該線程完成對數據的操做爲止。python


臨界區(critical section 全部碰到的都不能使用,等一個使用完成),互斥量(Mutex一個用一個不能用),信號量(semaphore),事件event數據庫

二 event

1 概念

event 事件。是線程間通訊機制中最簡單的實現,使用一個內部標記的flag,經過flag的True或False的變化來進行操做。緩存

2 參數詳解

名稱 含義
set() 標記設置爲True,用於後面wait執行和is_set檢查
clear() 標記設置爲False
is_set() 標記是否爲True
wait(timeout=None) 設置等待標記爲True的時長,None爲無限等待,等到返回爲True,未等到了超時返回爲False

3 相關實例

老闆僱傭了一個工人,讓他生產杯子,老闆一直等着工人。直到生成了10個杯子 安全

import logging
import  threading
import  time
event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def  boss(event:threading.Event):
    logging.info("準備生產")
    event.wait()
    logging.info("生產完成")

def  woker(event:threading.Event,count:int=10):
    cups=[]
    while  True:
        logging.info("開始生產杯子")
        if len(cups) >= count:
            event.set()
            break
        logging.info("生產了一個杯子")
        cups.append(1)
        time.sleep(0.5)
    logging.info("總共生產了:{} 個杯子".format(len(cups)))
b=threading.Thread(target=boss,args=(event,),name='boss')
w=threading.Thread(target=woker,args=(event,10),name='woker')
b.start()
w.start()

結果以下 多線程

python線程同步

4 wait 使用

import logging
import  threading
import  datetime

event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def  do(event:threading.Event,interval:int):
    while not  event.wait(interval):  # 此處須要的結果是返回False或True
        logging.info('do sth.')

e=threading.Event()

start=datetime.datetime.now()
threading.Thread(target=do,args=(e,3)).start()
e.wait(10)
e.set()
print ("總體運行時間爲:{}".format((datetime.datetime.now()-start).total_seconds()))
print ('main exit')

結果以下 併發

python線程同步

wait 優於sleep,wait 會主動讓出時間片,其餘線程能夠被調度,而sleep會佔用時間片不讓出。app

5 Timer 實現

import logging
import  threading
import  datetime
import  time
event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def add(x:int,y:int):
    return x+y
class  Timer:
    def __init__(self,interval,fn,*args,**kwargs):
        self.interval=interval
        self.fn=fn
        self.args=args
        self.kwargs=kwargs
        self.event=threading.Event()

    def  __run(self):
        start=datetime.datetime.now()
        logging.info('開始啓動步驟')
        event.wait(self.interval)  #在此處等待此時間後返回爲False
        if not  self.event.is_set(): # 此處返回爲False 爲正常
            self.fn(*self.args,**self.kwargs)
        logging.info("函數執行成功,執行時間爲{}".format((datetime.datetime.now()-start).total_seconds()))

    def start(self):
        threading.Thread(target=self.__run()).start()
    def  cancel(self):
        self.event.set()  
t=Timer(3,add,10,20)
t.start()

結果以下 dom

python線程同步

6 總結:

使用同一個event對象標記flag
誰wait就是等待flag變爲True,或者等到超時返回False,不限制等待的個數。ide

三 線程同步之lock

1 簡介

lock: 鎖,凡是在共享資源爭搶的地方均可以使用,從而保證只有一個使用者能夠徹底使用這個資源。一旦線程獲取到鎖,其餘試圖獲取的鎖的線程將被阻塞。函數

2 參數詳解

名稱 含義
acquire(blocking=True,timeout=1) 默認阻塞,阻塞能夠設置超時時間,非阻塞時,timeout禁止設置,成功獲取鎖後,返回True,不然返回False
release() 釋放鎖,能夠從任何線程調用釋放。已上鎖的鎖,會被重置爲unlocked,未上鎖的鎖上調用,拋出RuntimeError異常

3 示例講解

1 阻塞相關性質

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
import  time
lock=threading.Lock()  # 實例化鎖對象
lock.acquire()  # 加鎖處理,默認是阻塞,阻塞時間能夠設置,非阻塞時,timeout禁止設置,成功獲取鎖,返回True,不然返回False
print ('get locker 1')
lock.release() # 釋放鎖,能夠從任何線程調用釋放,已上鎖的鎖,會被重置爲unlocked未上鎖的鎖上調用,拋出RuntimeError異常。
print ('release  Locker')
lock.acquire()
print ('get locker 2')
lock.release()
print ('release  Locker')
lock.acquire()
print ('get locker 3')
lock.acquire()  # 此處未進行相關的釋放操做,所以其下面的代碼將不能被執行,其會一直阻塞
print ('get locker 4')

結果以下

python線程同步

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
lock=threading.Lock()
lock.acquire()
print ('1')
lock.release()
print ('2')
lock.release()  # 此處二次調用釋放,致使的結果是拋出異常。
print ('3')

結果以下

python線程同步

2 阻塞總結

鎖釋放後資源必定會出現爭搶狀況,鎖必定要支持上下文,不然全部的線程都將等待。
鎖的注意事項是最好不要出現死鎖的狀況。
解不開的鎖就是死鎖。
此處是沒有退出的狀況的

4 實例

1 題目

訂單要求生成100個杯子,組織10人生產
不加鎖的狀況下

2 經過flag 來進行相關的控制操做

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
def worker(task=100):
    flag=False
    while True:
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            flag=True
        time.sleep(0.001)
        if not  flag:
            cups.append(1)
        if flag:
            break
    logging.info("共製造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人
    threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()

結果以下

python線程同步

3 經過直接判斷的方式進行處理

import logging
import  threading
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
def worker(task=100):
    while True:
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            break
        cups.append(1)
        logging.info("{}".format(threading.current_thread().name))
    logging.info("共製造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人 
    threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()

結果以下

python線程同步

使用上述方式會致使多線程數據同步產生問題,進而致使產生的數據不許確。

4 加鎖的狀況處理

import logging
import  threading
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
Lock=threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        lock.acquire()
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            break # 此處保證每一個線程執行完成會自動退出,不然會阻塞其餘線程的繼續執行
        cups.append(1)
        lock.release() # 釋放鎖
        logging.info("{}".format(threading.current_thread().name))
    logging.info("共製造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人
    threading.Thread(target=worker,args=(Lock,100,),name="woker-{}".format(i)).start()

結果以下

python線程同步

5 線程換和CPU時間片

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
    def add(self):
        self.__x+=1
    def sub(self):
        self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=1000
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果以下

python線程同步

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
    def add(self):
        self.__x+=1
    def sub(self):
        self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=10000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value) #此處可能在未執行完成就進行了打印操做,可能形成延遲問題。

結果以下

python線程同步

總結以下:
若是修改線程多少,則效果不明顯,由於其函數執行時長和CPU分配的時間片相差較大,所以在時間片的時間內,足夠完成相關的計算操做,但如果增長執行的循環次數,則可能會致使一個線程在一個時間片內未執行完成相關的計算,進而致使打印結果錯誤。

5 加鎖和解鎖:

1 加鎖的必要性

通常來講加鎖後還有一些代碼實現,在釋放鎖以前還可能拋出一些異常,一旦出現異常,鎖是沒法釋放的,可是當前線程可能由於這個異常被終止了,就會產生死鎖,可經過上下文對出現異常的鎖進行關閉操做。

2 加鎖,解鎖經常使用語句

1 使用try...finally語句保證鎖的釋放
2 with上下文管理,鎖對象支持上下文管理

源碼以下:
其類中是支持enter和exit的,所以其是能夠經過上下文管理來進行相關的鎖關閉操做的。

python線程同步

3 使用try..finally 處理

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        try:
            self.__lock.acquire()
            self.__x+=1
        finally:
            self.__lock.release()  # 此處不論是否上述異常,此處都會執行
    def sub(self):
        try:
            self.__lock.acquire()
            self.__x-=1
        finally:
            self.__lock.release()

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=1000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果以下

python線程同步

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        try:
            self.__lock.acquire()
            self.__x+=1
        finally:
            self.__lock.release()  # 此處不論是否上述異常,此處都會執行
    def sub(self):
        try:
            self.__lock.acquire()
            self.__x-=1
        finally:
            self.__lock.release()

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=100
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果以下

python線程同步

4 使用with上下文管理方式處理

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        with self.__lock:
            self.__x+=1
    def sub(self):
        with  self.__lock:
            self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=100
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果以下

python線程同步

5 處理執行結果

經過存活線程數進行判斷

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        with self.__lock:
            self.__x+=1
    def sub(self):
        with  self.__lock:
            self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=1000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
while  True:
    time.sleep(1)
    if threading.active_count()==1:
        print (threading.enumerate())
        print (c.value)
        break
    else:
        print (threading.enumerate())

結果以下
python線程同步

5 非阻塞鎖使用

1 簡介

不阻塞,timeout沒啥用,False表示不使用鎖

非阻塞鎖能提升效率,但可能致使數據不一致

2 示例

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
lock=threading.Lock()
lock.acquire()
print ('1')
ret=lock.acquire(blocking=False)
print (ret)

結果以下

python線程同步

3 相關實例

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
lock=threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        if lock.acquire(False): # 此處返回爲False,則表示未成功獲取到鎖
            count=len(cups)
            logging.info(count)
            if count >=task:
                lock.release()
                break
            cups.append(1)
            lock.release()
            logging.info("{} make1 ".format(threading.current_thread().name))
    logging.info("{}".format(len(cups)))
for x in range(20):
    threading.Thread(target=worker,args=(lock,100)).start()

結果以下

python線程同步

6 鎖的應用場景

鎖適用於訪問和修改同一個共享資源的時候,及就是讀取同一個資源的時候。
若是所有都是讀取同一個資源,則不須要鎖,由於讀取不會致使其改變,所以不必

所用鎖的注意事項:
少用鎖,必要時用鎖,多線程訪問被鎖定的資源時,就成了穿行訪問,要麼排隊執行,要麼爭搶執行

加鎖的時間越短越好,不須要就當即釋放鎖
必定要避免死鎖

多線程運行模型(ATM機)
跟鎖無關的儘可能排列在後面,和鎖區分開

四 線程同步之Rlock

1 簡介

可重入鎖,是線程相關的鎖,線程A得到可重入鎖,並能夠屢次成功獲取,不會阻塞,最後在線程A 中作和acquire次數相同的release便可。

2 相關屬性

import  threading
rlock=threading.RLock()  #初始化可重用鎖

rlock.acquire()  #進行阻塞處理
print  ('1')
rlock.acquire()
print  ('2')
rlock.acquire(False)  # 此處設置爲非阻塞
print  ('3')
rlock.release()
print  ('4')
rlock.release()
print  ('5')
rlock.release()
print  ('6')
rlock.release()  # 此處表示不能釋放多餘的鎖,只能釋放和加入鎖相同次數
print  ('7')

結果以下

python線程同步

不一樣線程對Rlock操做的結果

import  threading
rlock=threading.RLock()  #初始化可重用鎖
def sub(lock:threading.RLock):
    lock.release()
ret=rlock.acquire()
print (ret)
ret=rlock.acquire(timeout=5)
print (ret)
ret=rlock.acquire(False)
print (ret)
ret=rlock.acquire(False)
print (ret)

threading.Thread(target=sub,args=(rlock,)).start() # 此處是啓用另外一個線程來完成對上述的開啓的鎖的關閉,由於其是基於線程的,
#所以其必須在該線程中進行相關的處理操做,而不是在另一個線程中進行解鎖操做

結果以下

python線程同步

3 總結:

跨線程的Rlock就沒用了,必須使用Lock,Rlock是線程級別的,其餘的鎖都是能夠在當前進程的另外一個線程中進行加鎖和解鎖的。

五 線程同步之condition

1 簡介

構造方法condition(lock=None),可傳入一個Lock或Rlock,默認是Rlock。其主要應用於生產者消費者模型,爲了解決生產者和消費者速度匹配的問題。

2 相關參數解析及相關源碼

名稱 含義
acquire(*args) 獲取鎖
wait(self,timeout=None) 等待或超時
notify(n=1) 喚醒至少指定數目個數的等待的線程,沒有等待線程就沒有任何操做
notify_all() 喚醒全部等待的線程

3 相關源碼

def __init__(self, lock=None):
        if lock is None:
            lock = RLock()  # 此處默認使用的是Rlock
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire  # 進行相關處理
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    def __enter__(self):  # 此處定義了上下文管理的內容
        return self._lock.__enter__()

    def __exit__(self, *args): # 關閉鎖操做
        return self._lock.__exit__(*args)

    def __repr__(self):  # 此處實現了可視化相關的操做
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

其內部存儲使用了_waiter 進行相關的處理,來對線程進行集中的放置操做。

def wait(self, timeout=None):

        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)  # 此處使用此方式存儲鎖
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except Value

喚醒一個release

def notify(self, n=1):
        if not self._is_owned():  # 此處是用於判斷是否有鎖
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

喚醒全部的所等待

def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self._waiters))

    notifyAll = notify_all

4 實現方式:

1 經過event進行相關處理

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            self.data=data  # 產生數據
            self.event.wait(1)  #此處用於一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用於等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c=threading.Thread(target=d.custom,name='custom')
p.start()

c.start()

python線程同步

此處會使得產生的數據只有一個,而消費者拿到的數據卻有兩份,此處是由消費者來控制其拿出的步驟的。

2 使用Condition 處理方式

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用於先進行上鎖處理,而後最後釋放鎖
                self.data=data  # 產生數據
                self.conition.notify_all()  #通知,此處表示有等待線程就通知處理
            self.event.wait(1)  #此處用於一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用於等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用於等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c=threading.Thread(target=d.custom,name='custom')
p.start()

c.start()

python線程同步

此處是由生產者產生數據,通知給消費者,而後消費者再進行拿取,

有時候可能須要多一點的消費者,來保證生產者無庫存

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用於先進行上鎖處理,而後最後釋放鎖
                self.data=data  # 產生數據
                self.conition.notify_all()  #通知,通知處理產生的數據
            self.event.wait(1)  #此處用於一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用於等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用於等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c1=threading.Thread(target=d.custom,name='custom-1')
c2=threading.Thread(target=d.custom,name='custom-2')

p.start()

c1.start()
c2.start()

結果以下

python線程同步

由於此默認是基於線程的鎖,所以其產生另外一個消費者並不會影響當前消費者的操做,所以能夠拿到兩份生產獲得的數據。

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用於先進行上鎖處理,而後最後釋放鎖
                self.data=data  # 產生數據
                self.conition.notify(2)  #通知兩個線程來處理數據
            self.event.wait(1)  #此處用於一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用於等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用於等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
p.start()
for  i in range(5):  # 此處用於配置5個消費者,
    threading.Thread(target=d.custom,name="c-{}".format(i)).start()

python線程同步

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用於先進行上鎖處理,而後最後釋放鎖
                self.data=data  # 產生數據
                self.conition.notify(5)  #通知所有線程來處理數據
            self.event.wait(1)  #此處用於一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用於等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用於等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
p.start()
for  i in range(5):  # 此處用於配置5個消費者,
    threading.Thread(target=d.custom,name="c-{}".format(i)).start()

結果以下

python線程同步

注: 上述實例中。程序自己不是線程安全的,程序邏輯有不少瑕疵,可是能夠很好的幫助理解condition的使用,和生產者消費者模式

輪循太消耗CPU時間了

5 Condition 總結

condition 用於生產者消費者模型中,解決生產者消費者速度匹配的問題
採用了通知機制,很是有效率


使用方式
使用condition,必須先acquire,用完了要release,由於內部使用了鎖,默認是Rlock,最好的方式使用with上下文。

消費者wait,等待通知


生產者生產好消息,對消費者發送消息,可使用notifiy或者notify_all方法。

六 線程同步之 barrier

1 簡介

賽馬模式,並行初始化,多線程並行初始化
有人翻譯爲柵欄,有人稱爲屏障,能夠想象爲路障,道閘
python3.2 中引入的新功能

2 相關參數詳解

名稱 含義
Barrier(parties,action=None,timeout=None) 構建 barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值
n_waiting 當前在屏障中等待的線程數
parties 各方數,須要多少等待
wait(timeout=None) 等待經過屏障,返回0到線程-1的整數,每一個線程返回不一樣,若是wait方法設置了超時,並超時發送,屏障將處於broken狀態

3 相關參數詳解

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(3):  # 此處表示產生3個線程
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()

結果以下

python線程同步

產生的線程不是等待線程的倍數

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(4):  # 此處表示產生4個線程,則會有一個一直等待
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()

python線程同步

其第4個線程會一直等待下去,直到有3個線程在等待的同時才進行下一步操做。

從運行結果來看,全部線程衝到了barrier前等待,直到parties的數目,屏障將會打開,全部線程中止等待,繼續執行
再有wait,屏障就就緒等待達到參數數目時再放行

4 barrier 實例的相關屬性

參數 含義
broken 若是屏障處於打破狀態,則返回True
abort() 將屏障處於broken狀態,等待中的線程或調用等待方法的線程都會拋出BrokenbarrierError異常,直到reset方法來恢復屏障
reset() 恢復屏障,從新開始攔截
import  threading
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(5):  # 此處表示產生5個線程
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
    if x==4:
        barrier.abort()  # 打破屏障,前三個沒問題,後兩個會致使屏障打破一塊兒走出

結果以下

python線程同步

import  threading
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量爲:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是須要從新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的狀況下才會直接執行下面的,不然會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(9):  # 此處表示產生5個線程
    if x==2:  #此處第一個和第二個等到,等到了第三個直接打破,前兩個和第三個一塊兒都是打破輸出
        barrier.abort()  # 打破屏障,前三個沒問題,後兩個會致使屏障打破一塊兒走出
    elif x==6:  #x=6表示第7個,直到第6個,到第7個,第8個,第9個,恰好3個直接柵欄退出
        barrier.reset()
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,)).start()

結果以下

python線程同步

5 barrier 應用

併發初始化
全部的線程都必須初始化完成後,才能繼續工做,例如運行加載數據,檢查,若是這些工做沒有完成,就開始運行,則不能正常工做


10個線程作10種不一樣的工做準備,每一個線程負責一種工做,只有這10個線程都完成後,才能繼續工做,先完成的要等待後完成的線程。


如 啓動了一個線程,須要先加載磁盤,緩存預熱,初始化連接池等工做,這些工做能夠齊頭並進,只不過只有都知足了,程序才能繼續向後執行,假設數據庫連接失敗,則初始化工做就會失敗,就要about,屏蔽broken,全部線程收到異常後直接退出。

七 semaphore 信號量

1 簡介

和Lock 很像,信號量對象內部維護一個倒計數器,每一次acquire都會減1,當acquire方法發現計數爲0時就會阻塞請求的線程,直到其餘線程對信號量release後,計數大於0,恢復阻塞的線程。

2 參數詳解

名稱 含義
Semaphore(value=1) 構造方法,value小於0,拋出ValueError異常
acquire(blocking=True,timeout=None) 獲取信號量,計數器減1,獲取成功返回爲True
release() 釋放信號量,計數器加1

semaphore 默認值是1,表示只能去一個後就等待着,其至關於初始化一個值。
計數器中的數字永遠不可能低於0

import  threading
import  logging
import  time
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def  woker(sem:threading.Semaphore):
    logging.info("in sub  thread")
    logging.info(sem.acquire())
    logging.info("sub  thread  over")

s=threading.Semaphore(3)  # 初始化3個信號量

logging.info(s.acquire())  # 取出三個信號量
logging.info(s.acquire())
logging.info(s.acquire())

threading.Thread(target=woker,args=(s,)).start() # 此處若再想取出,則不能成功,則會阻塞
print  ('----------------------')
logging.info(s.acquire(False))  #此處表示不阻塞
print  ('+++++++++++++++++++++++')
time.sleep(2)
logging.info(s.acquire(timeout=3))  # 此處表示阻塞超時3秒後釋放
print ('%%%%%%%%%%%%%%%%%%%%%')
s.release()  # 此處用於對上述線程中的調用的函數中的內容進行處理

結果以下

python線程同步

都是針對同一個對象進行的處理

3 應用舉例

1 鏈接池

import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Name:
    def __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count
        self.pool=[ Name("conn-{}".format(i))  for i in range(3)]   # 初始化連接
    def get_conn(self):
        if len(self.pool)>0:
            data=self.pool.pop() # 從尾部拿出來一個
            logging.info(data)
        else:
            return  None
    def return_conn(self,name:Name):  # 此處添加一個
        self.pool.append(name)
pool=Pool(3)
pool.get_conn()
pool.get_conn()
pool.get_conn()
pool.return_conn(Name('aaa'))
pool.get_conn()

結果以下

python線程同步

2 鎖機制進行處理鏈鏈接池

import  logging
import  threading
import  random
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class Name:
    def __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count
        self.sem=threading.Semaphore(count)
        self.event=threading.Event()
        self.pool=[ Name("conn-{}".format(i))   for  i in range(count)]
    def get_conn(self):
        self.sem.acquire()
        data=self.pool.pop()
        return  data
    def return_conn(self,name:Name):  # 此處添加一個
        self.pool.append(name)
        self.sem.release()
def  woker(pool:Pool):
    conn=pool.get_conn()
    logging.info(conn)
    threading.Event().wait(random.randint(1,4))
    pool.return_conn(conn)
pool=Pool(3)
for i in range(8):
    threading.Thread(target=woker,name="worker-{}".format(i),args=(pool,)).start()

結果以下

python線程同步

上述實例中,使用信號量解決資源有限的問題,若是池中有資源,請求者獲取資源時信號量減1,請求者只能等待,當使用者徹底歸資源後信號量加1,等待線程就能夠喚醒拿走資源。

4 BoundedSemaphore

有界信號量,不容許使用release超出初始值範圍,不然,拋出ValueError異常,這個用有界信號修改源代碼,保證若是多return_conn 就會拋出異常,保證了歸還連接拋出異常。


信號量一直release會一直向上加,其會將信號量和連接池都擴容了此處便產生了BoundedSemaphore

import  logging
import  threading
import  random
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
s=threading.BoundedSemaphore(3) # 邊界
s.acquire()  # 此處須要拿取,不然不能直接向其中加
print (1)
s.release()
print (2)
s.release()
print (3)

結果以下

python線程同步

應用以下

import  logging
import  threading
import time
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
class  Conn:
    def  __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count  # 初始化連接池
        self.sema=threading.BoundedSemaphore(count)
        self.pool=[Conn("conn-{}".format(i)) for  i in range(count)]  # 初始化連接
    def get_conn(self):
        self.sema.acquire()  # 當拿取的時候,減一
        data=self.pool.pop()  # 從尾部拿出一個
        print (data)

    def return_conn(self,conn:Conn):  #此處返回一個鏈接池
        self.pool.append(conn)  # 必須保證其在拿的時候有  # 使用try 能夠進行處理,下面的必須執行,加成功了,下面的必定要成功的,
        self.sema.release()

pool=Pool(3)
con=Conn('a')
conn=pool.get_conn()
conn=pool.get_conn()
conn=pool.get_conn()

結果以下

python線程同步

5 使用信號量的利端和弊端

若是使用了信號量,仍是沒有用完
self.pool.append(conn)
self.sem.release()
一種極端的狀況下,計數器還差1就滿了,有3個線程A,B,C都執行了第一句,都沒有來得release,這時候輪到線程A release,正常的release,而後輪到線程C先release,必定出現問題,超界了,必定出現問題。


不少線程用完了信號量沒有獲取信號量的線程都會阻塞,沒有線程和歸還的線程爭搶,當append後才release,這時候才能等待的線程被喚醒,才能Pop,也就是沒有獲取信號量就不能pop,這是安全的。

相關文章
相關標籤/搜索