一篇文章理清Python多線程之同步條件,信號量和隊列

公衆號:pythonisloverpython

今天這篇文章大概介紹下python多線程中的同步條件Event,信號量(Semaphore)和隊列(queue),這是咱們多線程系列的最後一篇文章,之後將會進入python多進程的系列。編程

同步條件(Event)

先說說爲何咱們須要這個同步條件,咱們的python多線程在執行task過程當中,是相互競爭的,你們均可以先獲取cpu的執行權限,這就是問題所在的地方,每一個線程都是獨立運行且狀態不可預測,可是咱們想一想若是咱們的業務中須要根據狀況來決定線程的執行順序,也就是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時候咱們就須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。bootstrap

在 初始狀況下,Event對象中的信號標誌被設置爲假,若是有線程等待一個Event對象, ,那麼這個線程將會被一直阻塞直至該標誌爲真。安全

一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程繼續執行。bash

Event的方法以下:數據結構

event.isSet():返回event的狀態值
event.wait():若是 event.isSet()==False,將阻塞線程觸發event.wait()
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待執行
event.clear():恢復event的狀態值爲False
複製代碼

下面段代碼例子:多線程

首先咱們描述一個場景併發

1.老師說這堂課咱們要作測試卷子,作完才能放學app

2.學生叫苦不迭,啊啊啊啊啊啊dom

3.學生作試卷中

4.作完試卷放學回家

import threading
import time

class Teacher(threading.Thread):
    def run(self):
        print("你們如今要考試")
        print(event.isSet())
        event.set()
        time.sleep(3)
        print("考試結束")
        print(event.isSet())
        event.set()
class Student(threading.Thread):
    def run(self):
        event.wait()
        print("啊啊啊啊啊啊")
        time.sleep(1)
        event.clear()
        event.wait()
        print("下課回家")

if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(10):
        threads.append(Student())
    threads.append(Teacher())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
複製代碼

咱們來解釋下上面代碼的執行流程

1.模擬1個老師和10個學生,進行考試,咱們須要的目的是學生線程要等待老師線程說完「你們如今考試」,而後學生線程去考試,以後老師線程說「考試結束」,學生線程放學回家,學生線程的執行與否取決於老師線程,因此這裏用的Event
2.學生線程開始event.wait(),這個說明若是event若是一直不設置的話,學生線程就一直等待,等待一個event.set()操做
3.老師線程說完"你們如今要考試",而後event.set(),執行event,設置完執行,學生線程就可以被喚醒繼續執行下面的操做發出"啊啊啊啊啊啊"的叫苦不迭
4.學生線程進行考試,而且執行event.clear(),清除event,由於他們在等老師說「考試結束」,以後他們在等老師線程的event.set()
5.老師線程執行event.set(),喚醒學生線程,而後下課回家.
複製代碼

最後結果是:

你們如今要考試
False
啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊

啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊
啊啊啊啊啊啊
考試結束
False
下課回家
下課回家
下課回家
下課回家
下課回家
下課回家


下課回家
下課回家
下課回家

下課回家


Process finished with exit code 0
複製代碼

不知道你們有沒有看懂上面的一串解釋,感受仍是比較亂,勉強看看。。。。

信號量(Semaphore)

信號量用來控制線程併發數的,Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。其實就是控制最多幾個線程能夠操做同享資源。

import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + '獲取共享資源')
        time.sleep(2)
        semaphore.release()

for i in range(10)
  t1 = threading.Thread(target=func)
  t1.start()
複製代碼

上面一個簡單的例子就是建立10個線程,讓每次只讓5個線程去執行func函數。

結果:5個線程一批一批的執行打印,中間停格2s

Thread-1獲取共享資源
Thread-2獲取共享資源
Thread-3獲取共享資源
Thread-4獲取共享資源
Thread-5獲取共享資源

Thread-6獲取共享資源
Thread-8獲取共享資源
Thread-7獲取共享資源
Thread-9獲取共享資源
Thread-10獲取共享資源
複製代碼

隊列(queue)

Queue是python標準庫中的線程安全的隊列實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞

說到線程安全,那麼下面咱們看看咱們經常使用的list列表是否是線程安全的

import threading,time

m=[1,2,3,4,5]
print(m[-1])

def remove_last():
    a=m[-1]
    time.sleep(1)
    m.remove(a)


t1=threading.Thread(target=remove_last)
t1.start()

t2=threading.Thread(target=remove_last)
t2.start()
複製代碼

結果

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Users\aryin\AppData\Local\Programs\Python\Python37\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "C:\Users\aryin\AppData\Local\Programs\Python\Python37\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/aryin/Desktop/mysite2/Event_Semaphore_queue.py", line 58, in remove_last
    m.remove(a)
ValueError: list.remove(x): x not in list
複製代碼

上面的代碼很簡單就是開2個線程去上次隊列M的最後一個數,按道理是最後m的數據會被remove完,可是結果卻報錯了,這是由於list不是線程安全的,線程一取了最後一個數據刪除可是sleep 1秒的時候,線程2也拿到了相同的數,那麼等線程1 remove這個數以後,線程2再去remove就會報錯了

下面看看隊列的經常使用方法

建立一個「隊列」對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,
get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

Python Queue模塊有三種隊列及構造函數:
一、Python Queue模塊的FIFO隊列先進先出。   class queue.Queue(maxsize)
二、LIFO相似於堆,即先進後出。               class queue.LifoQueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。        class queue.PriorityQueue(maxsize)

此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做
複製代碼

隊列(queue)通常會被用在生產者和消費者模型上。

生產者消費者模型:

爲何要使用生產者和消費者模式

在python線程中,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

下面咱們看看生產者消費者的代碼,就拿你們常說的吃包子爲例子吧

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("製造包子ing")
    time.sleep(random.randrange(3))
    q.put(count)
    print('生產者 %s 生產了 %s 包子..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()

def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('消費者 %s 消費了 %s 包子...' %(name, data))
    else:
        print("包子吃完了")
    count +=1

c1 = threading.Thread(target=Producer, args=('小明',))
c2 = threading.Thread(target=Consumer, args=('小花',))
c3 = threading.Thread(target=Consumer, args=('小灰',))
c1.start()
c2.start()
c3.start()

c1.join()
c2.join()
c3.join()

print('結束')
複製代碼

結果你們能夠有興趣本身執行下看看,代碼沒什麼難度,就是2個消費者和一個生產者的故事。

上面的代碼咱們還可使用下面的方法來實現

q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做
複製代碼
import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("製造包子ing")
    time.sleep(random.randrange(3))
    q.put(count)
    print('生產者 %s 生產了 %s 包子..' %(name, count))
    count +=1
    q.task_done()
    #q.join()

def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    data = q.get()
    #q.task_done()
    print('等待中')
    q.join()
    print('消費者 %s 消費了 %s 包子...' %(name, data))
    count +=1

c1 = threading.Thread(target=Producer, args=('小明',))
c2 = threading.Thread(target=Consumer, args=('小花',))
c3 = threading.Thread(target=Consumer, args=('小灰',))
c4 = threading.Thread(target=Consumer, args=('小天',))

c1.start()
c2.start()
c3.start()
c4.start()
複製代碼

更多文章: 1.Python基礎-不同的切片操做 --mp.weixin.qq.com/s/3pute7-xr…

2.Python面向對象三大特徵之繼承 --mp.weixin.qq.com/s/RZrClpQ-Y…

3.Python面向對象三大特徵之封裝 --mp.weixin.qq.com/s/rC-f0nUaL…

4.Python面向對象三大特徵之多態 --mp.weixin.qq.com/s/j16IkZJ2Z…

5.Python面向對象中的反射 --mp.weixin.qq.com/s/RD-Id8THx…

6.Python的裝飾器原來是這麼用的 --mp.weixin.qq.com/s/Ak6RjM-I6…

關注公衆號送百G視頻和百本電子書

相關文章
相關標籤/搜索