Python併發編程之線程消息通訊機制/任務協調(四)

image.png

你們好,併發編程 進入第四篇。
編程

本文目錄


  • 前言安全

  • Event事件併發

  • Conditionapp

  • Queue隊列ide

  • 總結函數

前言

前面我已經向你們介紹了,如何使用建立線程,啓動線程。相信你們都會有這樣一個想法,線程無非就是建立一下,而後再start()下,實在是太簡單了。學習

但是要知道,在真實的項目中,實際場景可要咱們舉的例子要複雜的多得多,不一樣線程的執行多是有順序的,或者說他們的執行是有條件的,是要受控制的。若是僅僅依靠前面學的那點淺薄的知識,是遠遠不夠的。ui

那今天,咱們就來探討一下如何控制線程的觸發執行。spa

要實現對多個線程進行控制,其實本質上就是消息通訊機制在起做用,利用這個機制發送指令,告訴線程,何時能夠執行,何時不能夠執行,執行什麼內容。線程

通過個人總結,線程中通訊方法大體有以下三種:

  • threading.Event

  • threading.Condition

  • queue.Queue

先拋出結論,接下來咱們來一一探討下。


Event事件

Python提供了很是簡單的通訊機制 Threading.Event,通用的條件變量。多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活

關於Event的使用也超級簡單,就三個函數

event = threading.Event()

# 重置event,使得全部該event事件都處於待命狀態
event.clear()

# 等待接收event的指令,決定是否阻塞程序執行
event.wait()

# 發送event指令,使全部設置該event事件的線程執行
event.set()

舉個例子來看下。

import time
import threading


class MyThread(threading.Thread):
   def __init__(self, name, event):
       super().__init__()
       self.name = name
       self.event = event

   def run(self):
       print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
       # 等待event.set()後,才能往下執行
       self.event.wait()
       print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))


threads = []
event = threading.Event()

# 定義五個線程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

# 重置event,使得event.wait()起到阻塞做用
event.clear()

# 啓動全部線程
[t.start() for t in threads]

print('等待5s...')
time.sleep(5)

print('喚醒全部線程...')
event.set()

執行一下,看看結果

Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018

等待5s...

喚醒全部線程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

可見在全部線程都啓動(start())後,並不會執行完,而是都在self.event.wait()止住了,須要咱們經過event.set()來給全部線程發送執行指令才能往下執行。

Condition

Condition和Event 是相似的,並無多大區別。

一樣,Condition也只須要掌握幾個函數便可。

cond = threading.Condition()

# 相似lock.acquire()
cond.acquire()

# 相似lock.release()
cond.release()

# 等待指定觸發,同時會釋放對鎖的獲取,直到被notify才從新佔有瑣。
cond.wait()

# 發送指定,觸發執行
cond.notify()

舉個網上一個比較趣的捉迷藏的例子來看看

import threading, time

class Hider(threading.Thread):
   def __init__(self, cond, name):
       super(Hider, self).__init__()
       self.cond = cond
       self.name = name

   def run(self):
       time.sleep(1)  #確保先運行Seeker中的方法
       self.cond.acquire()

       print(self.name + ': 我已經把眼睛蒙上了')
       self.cond.notify()
       self.cond.wait()
       print(self.name + ': 我找到你了哦 ~_~')
       self.cond.notify()

       self.cond.release()
       print(self.name + ': 我贏了')

class Seeker(threading.Thread):
   def __init__(self, cond, name):
       super(Seeker, self).__init__()
       self.cond = cond
       self.name = name

   def run(self):
       self.cond.acquire()
       self.cond.wait()
       print(self.name + ': 我已經藏好了,你快來找我吧')
       self.cond.notify()
       self.cond.wait()
       self.cond.release()
       print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

經過cond來通訊,阻塞本身,並使對方執行。從而,達到有順序的執行。
看下結果

hider:   我已經把眼睛蒙上了
seeker:  我已經藏好了,你快來找我吧
hider:   我找到你了 ~_~
hider:   我贏了
seeker:  被你找到了,哎~~~

Queue隊列

終於到了咱們今天的主角了。

從一個線程向另外一個線程發送數據最安全的方式可能就是使用 queue 庫中的隊列了。建立一個被多個線程共享的 Queue 對象,這些線程經過使用put() 和 get() 操做來向隊列中添加或者刪除元素。

一樣,對於Queue,咱們也只須要掌握幾個函數便可。

from queue import Queue
# maxsize默認爲0,不受限
# 一旦>0,而消息數又達到限制,q.put()也將阻塞
q = Queue(maxsize=0)

# 阻塞程序,等待隊列消息。
q.get()

# 獲取消息,設置超時時間
q.get(timeout=5.0)

# 發送消息
q.put()

# 等待全部的消息都被消費完
q.join()

# 如下三個方法,知道就好,代碼中不要使用

# 查詢當前隊列的消息個數
q.qsize()

# 隊列消息是否都被消費完,True/False
q.empty()

# 檢測隊列裏消息是否已滿
q.full()

函數會比以前的多一些,同時也從另外一方面說明了其功能更加豐富。

我來舉個老師點名的例子,你們感覺一下。

from queue import Queue
from threading import Thread
import time

class Student(Thread):
   def __init__(self, name, queue):
       super().__init__()
       self.name = name
       self.queue = queue

   def run(self):
       while True:
           # 阻塞程序,時刻監聽老師,接收消息
           msg = self.queue.get()
           # 一旦發現點到本身名字,就趕忙答到
           if msg == self.name:
               print("{}:到!".format(self.name))


class Teacher:
   def __init__(self, queue):
       self.queue=queue

   def call(self, student_name):
       print("老師:{}來了沒?".format(student_name))
       # 發送消息,要點誰的名
       self.queue.put(student_name)


queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明", queue=queue)
s2 = Student(name="小亮", queue=queue)
s1.start()
s2.start()

print('開始點名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')

運行結果以下

開始點名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!

總結

學習了以上三種通訊方法,咱們很容易就能發現Event 和Condition 是threading模塊原生提供的模塊,原理簡單,功能單一,它能發送 True 和 False 的指令,因此只能適用於某些簡單的場景中。

Queue則是比較高級的模塊,它可能發送任何類型的消息,包括字符串、字典等。其內部實現其實也引用了Condition模塊(譬如putget函數的阻塞),正是其對Condition進行了功能擴展,因此功能更加豐富,更能知足實際應用。

相關文章
相關標籤/搜索