翻譯:老齊python
譯者注:與本文相關圖書推薦:《Python大學實用教程》《跟老齊學Python:輕鬆入門》安全
若是你但願一次可以處理管道中的多個值,就須要一種針對管道的數據結構,它至關於producer
的備份,能實現數量增長和減小。bash
Python標準庫有一個queue
模塊,該模塊有一個Queue
類,下面將Pipeline
改成Queue
,就能夠再也不使用Lock
鎖定某些變量,此外,還將使用Python的threading
模塊中的Event
來中止工做線程,這是一種與以往不一樣的方法。微信
從Event
開始。當有不少線程等待threading.Event
實例的時候,它可以將一個線程標記爲一個事件。這段代碼的關鍵是,等待事件的線程不必定須要中止它們正在作的事情,它們能夠每隔一段時間檢查一次Event
的狀態。markdown
不少事情均可以觸發event
。在本例中,主線程將簡單地休眠一段時間,而後運行.set()
:網絡
if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") # logging.getLogger().setLevel(logging.DEBUG) pipeline = Pipeline() event = threading.Event() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: executor.submit(producer, pipeline, event) executor.submit(consumer, pipeline, event) time.sleep(0.1) logging.info("Main: about to set event") event.set() 複製代碼
這裏惟一的變化是建立了event
對象,而後將event
做爲參數傳給後面的.submit
方法,在with語句中,有一句要sleep一秒鐘,再記錄日誌信息,最後調用event.set()
。數據結構
producer
也不須要改變太多:多線程
def producer(pipeline, event): """Pretend we're getting a number from the network.""" while not event.is_set(): message = random.randint(1, 101) logging.info("Producer got message: %s", message) pipeline.set_message(message, "Producer") logging.info("Producer received EXIT event. Exiting") 複製代碼
while
循環中再也不爲pipeline
設置SENTINEL
值。consumer
須要相應作較大改動:併發
def consumer(pipeline, event): """Pretend we're saving a number in the database.""" while not event.is_set() or not pipeline.empty(): message = pipeline.get_message("Consumer") logging.info( "Consumer storing message: %s (queue size=%s)", message, pipeline.qsize(), ) logging.info("Consumer received EXIT event. Exiting") 複製代碼
必須刪除SENTINEL
值相關的代碼,while
循環的條件也所以更復雜了一些,如今須要考慮not event.is_set()
和not pipeline.empty()
兩個條件,也就是未設置event
,或者pipeline
未清空時。dom
要確保在consumer
進程結束是隊列中已是空的了,不然就會出現如下兩種糟糕的狀況。一是丟失了這些最終消息,但更嚴重的狀況是第二種,producer
若是視圖將信息添加到完整隊列中,會被鎖住,從而不能返回。這種事件會發生在producer
驗證.is_set()
條件以後,調用pipeline.set_message()
以前。
這種事件會發生在producer
驗證.is_set()
條件以後,調用pipeline.set_message()
以前。
若是發生這種狀況,producer
可能會在隊列仍然全滿的狀況下喚醒並退出。而後,調用.set_message()
,.set_message()
將一直等到隊列中有新信息的空間。若consumer
已經退出,這種狀況就不會發生,並且producer
不會退出。
consumer
中的其餘部分看起來應該很熟悉。
然而,Pipeline
還須要重寫:
class Pipeline(queue.Queue): def __init__(self): super().__init__(maxsize=10) def get_message(self, name): logging.debug("%s:about to get from queue", name) value = self.get() logging.debug("%s:got %d from queue", name, value) return value def set_message(self, value, name): logging.debug("%s:about to add %d to queue", name, value) self.put(value) logging.debug("%s:added %d to queue", name, value) 複製代碼
上面的Pipeline
是queue.Queue
的子類。Queue
在初始化時指定一個可選參數,以指定隊列的最大長度。
若是爲maxsize
指定一個正數,則該數字爲隊列元素個數的極限,若是達到該值,.put()
方法被鎖定,直到元素的數量少於maxsize
才解鎖。若是不指定maxsize
,則隊列將增加到計算機內存的所許可的最值。
.get_message()
和.set_message()
兩個方法代碼更少了,它們基本上把.get()
和.put()
封裝在Queue
中。你可能想知道防止線程發生競態條件的鎖都去了哪裏。
編寫標準庫的核心開發人員知道,Queue
常常在多線程環境中使用,因而將鎖合併到Queue
自己中。Queue
對於線程來講是安全的。
此程序的運行以下所示:
$ ./prodcom_queue.py Producer got message: 32 Producer got message: 51 Producer got message: 25 Producer got message: 94 Producer got message: 29 Consumer storing message: 32 (queue size=3) Producer got message: 96 Consumer storing message: 51 (queue size=3) Producer got message: 6 Consumer storing message: 25 (queue size=3) Producer got message: 31 [many lines deleted] Producer got message: 80 Consumer storing message: 94 (queue size=6) Producer got message: 33 Consumer storing message: 20 (queue size=6) Producer got message: 48 Consumer storing message: 31 (queue size=6) Producer got message: 52 Consumer storing message: 98 (queue size=6) Main: about to set event Producer got message: 13 Consumer storing message: 59 (queue size=6) Producer received EXIT event. Exiting Consumer storing message: 75 (queue size=6) Consumer storing message: 97 (queue size=5) Consumer storing message: 80 (queue size=4) Consumer storing message: 33 (queue size=3) Consumer storing message: 48 (queue size=2) Consumer storing message: 52 (queue size=1) Consumer storing message: 13 (queue size=0) Consumer received EXIT event. Exiting 複製代碼
通讀上述示例的輸出,會發現,有的地方頗有意思。在頂部,你能夠看到producer
必須建立5條信息並將其中4條放在隊列中,隊列中最前面的一條被操做系統換掉以後,第5條條信息才能加入隊列。
而後consumer
運行,把第1條信息拉了出來,它打印出了該信息以及隊列在此時的長度:
Consumer storing message: 32 (queue size=3)
複製代碼
此時,標明第5條信息尚未進入pipeline
,刪除單個信息後queue
的減少到3。你也知道queue
能夠保存10條消息,所以queue
線程不會被queue
阻塞,它被操做系統置換了。
注意:你調試的輸出結果會有所不一樣。你的輸出將隨着運行次數的不一樣而改變。這就是用線程工做的樂趣所在!
執行代碼,你能看到主線程生成event事件,這會致使producer
當即退出,consumer
還有不少工做要作,因此它會一直運行,直到清理完pipeline
。
嘗試操做大小不一樣的隊列,並調用producer
或consumer
中的time.sleep()
,以分別模擬更長的網絡或磁盤訪問時間。即便對程序的這些內容稍加更改,也會使結果產生很大差別。
這是解決發PCP的一個好方法,可是你能夠進一步簡化它,不須要使用Pipeline
,一旦去掉日誌記錄,它就會變成一個queue.Queue
。
下面是直接使用queue.Queue
的最終代碼:
import concurrent.futures import logging import queue import random import threading import time def producer(queue, event): """Pretend we're getting a number from the network.""" while not event.is_set(): message = random.randint(1, 101) logging.info("Producer got message: %s", message) queue.put(message) logging.info("Producer received event. Exiting") def consumer(queue, event): """Pretend we're saving a number in the database.""" while not event.is_set() or not queue.empty(): message = queue.get() logging.info( "Consumer storing message: %s (size=%d)", message, queue.qsize() ) logging.info("Consumer received event. Exiting") if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") pipeline = queue.Queue(maxsize=10) event = threading.Event() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: executor.submit(producer, pipeline, event) executor.submit(consumer, pipeline, event) time.sleep(0.1) logging.info("Main: about to set event") event.set() 複製代碼
這更易於閱讀,並展現瞭如何使用Python的內置模塊來簡化複雜的問題。
Lock
和 Queue
是便於解決併發問題的類,但標準庫還提供了其餘類。在結束本文以前,讓咱們瀏覽其中一些類。
Python的threading
模塊還提供了一些類,雖然上面的示例不須要這些,可是它們在不一樣的用例中能夠派上用場,因此熟悉它們是有好處的。
threading.Semaphore
有一些特殊屬性的計數器對象,這裏實現的計數具備原子性,意味着能夠保證操做系統不會在遞增或遞減計數器的過程當中交換線程。
內部計數器在調用.release()
時遞增,在調用.acquire()
時遞減。
另一個特殊屬性,若是一個線程在計數器爲零時調用.acquire()
,則該線程將被鎖定,直到另外一個線程調用.release()
,並將計數器增長到1。
Semaphores
一般用於保護容量有限的資源。例如,若是你有一個鏈接池,而且但願將該池的大小限制爲特定的數目。
threading.Timer
用於在通過必定時間後調度要調用的函數,你能夠經過傳入等待的秒數和調用的函數來建立Timer
實例:
t = threading.Timer(30.0, my_function)
複製代碼
經過調用.start()
啓動Timer
。在指定時間以後的某個時間點,將在新線程上調用該函數。但請注意,沒法保證會在你但願的時間準確調用該函數。
若是要中止已經啓動的Timer
,能夠調用.cancel()
。若是在Timer
觸發後調用.cancel()
,不會執行任何操做,也不會產生異常。
Timer
可用於在特定時間後提示用戶執行操做。若是用戶在Timer
過時以前執行操做,則能夠調用.cancel()
。
threading.Barrier
可用於保持固定數量的線程同步。建立Barrier
時,調用方必須指定將要同步的線程數。每一個線程都調用Barrier
的.wait()
方法,它們都將保持封鎖狀態,直到指定數量的線程在等待,而後所有同時釋放。
請記住:線程是由操做系統調度的,所以,即便全部線程都是同時釋放的,它們也將被調度爲一次運行一個線程。
Barrier
的一個用途是容許線程池對自身進行初始化。讓這些線程初始化後在Barrier
上等待,將確保在全部線程完成初始化以前,沒有一個線程開始運行。
如今你已經瞭解了Python的threading
提供的許多功能,以及一些如何寫線程程序和用線程程序解決問題的示例。你還看到了在編寫和調試線程程序時出現的一些問題。
原文連接:realpython.com/intro-to-py…
關注微信公衆號:老齊教室。讀深度文章,得精湛技藝,享絢麗人生。