【譯】線程:概念和實現(4)

翻譯:老齊python

譯者注:與本文相關圖書推薦:《Python大學實用教程》《跟老齊學Python:輕鬆入門》安全


第四部分

將隊列應用於PCP

若是你但願一次可以處理管道中的多個值,就須要一種針對管道的數據結構,它至關於producer的備份,能實現數量增長和減小。bash

Python標準庫有一個queue模塊,該模塊有一個Queue 類,下面將Pipeline改成Queue,就能夠再也不使用Lock鎖定某些變量,此外,還將使用Python的threading模塊中的Event來中止工做線程,這是一種與以往不一樣的方法。微信

Event開始。當有不少線程等待threading.Event實例的時候,它可以將一個線程標記爲一個事件。這段代碼的關鍵是,等待事件的線程不必定須要中止它們正在作的事情,它們能夠每隔一段時間檢查一次Event的狀態。markdown

不少事情均可以觸發event。在本例中,主線程將簡單地休眠一段時間,而後運行.set()網絡

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)
    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
複製代碼

這裏惟一的變化是建立了event對象,而後將event做爲參數傳給後面的.submit方法,在with語句中,有一句要sleep一秒鐘,再記錄日誌信息,最後調用event.set()數據結構

producer也不須要改變太多:多線程

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    logging.info("Producer received EXIT event. Exiting")
複製代碼

while循環中再也不爲pipeline設置SENTINEL值。consumer須要相應作較大改動:併發

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
複製代碼

必須刪除SENTINEL值相關的代碼,while循環的條件也所以更復雜了一些,如今須要考慮not event.is_set()not pipeline.empty()兩個條件,也就是未設置event,或者pipeline未清空時。dom

要確保在consumer進程結束是隊列中已是空的了,不然就會出現如下兩種糟糕的狀況。一是丟失了這些最終消息,但更嚴重的狀況是第二種,producer若是視圖將信息添加到完整隊列中,會被鎖住,從而不能返回。這種事件會發生在producer驗證.is_set()條件以後,調用pipeline.set_message()以前。

這種事件會發生在producer驗證.is_set()條件以後,調用pipeline.set_message()以前。

若是發生這種狀況,producer可能會在隊列仍然全滿的狀況下喚醒並退出。而後,調用.set_message().set_message()將一直等到隊列中有新信息的空間。若consumer已經退出,這種狀況就不會發生,並且producer不會退出。

consumer中的其餘部分看起來應該很熟悉。

然而,Pipeline還須要重寫:

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
複製代碼

上面的Pipelinequeue.Queue的子類。Queue 在初始化時指定一個可選參數,以指定隊列的最大長度。

若是爲maxsize指定一個正數,則該數字爲隊列元素個數的極限,若是達到該值,.put()方法被鎖定,直到元素的數量少於maxsize才解鎖。若是不指定maxsize,則隊列將增加到計算機內存的所許可的最值。

.get_message().set_message()兩個方法代碼更少了,它們基本上把.get().put()封裝在Queue中。你可能想知道防止線程發生競態條件的鎖都去了哪裏。

編寫標準庫的核心開發人員知道,Queue常常在多線程環境中使用,因而將鎖合併到Queue自己中。Queue對於線程來講是安全的。

此程序的運行以下所示:

$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting
複製代碼

通讀上述示例的輸出,會發現,有的地方頗有意思。在頂部,你能夠看到producer必須建立5條信息並將其中4條放在隊列中,隊列中最前面的一條被操做系統換掉以後,第5條條信息才能加入隊列。

而後consumer運行,把第1條信息拉了出來,它打印出了該信息以及隊列在此時的長度:

Consumer storing message: 32 (queue size=3)
複製代碼

此時,標明第5條信息尚未進入pipeline ,刪除單個信息後queue的減少到3。你也知道queue能夠保存10條消息,所以queue線程不會被queue阻塞,它被操做系統置換了。

注意:你調試的輸出結果會有所不一樣。你的輸出將隨着運行次數的不一樣而改變。這就是用線程工做的樂趣所在!

執行代碼,你能看到主線程生成event事件,這會致使producer當即退出,consumer還有不少工做要作,因此它會一直運行,直到清理完pipeline

嘗試操做大小不一樣的隊列,並調用producerconsumer中的time.sleep(),以分別模擬更長的網絡或磁盤訪問時間。即便對程序的這些內容稍加更改,也會使結果產生很大差別。

這是解決發PCP的一個好方法,可是你能夠進一步簡化它,不須要使用Pipeline,一旦去掉日誌記錄,它就會變成一個queue.Queue

下面是直接使用queue.Queue的最終代碼:

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
複製代碼

這更易於閱讀,並展現瞭如何使用Python的內置模塊來簡化複雜的問題。

LockQueue是便於解決併發問題的類,但標準庫還提供了其餘類。在結束本文以前,讓咱們瀏覽其中一些類。

Threading

Python的threading模塊還提供了一些類,雖然上面的示例不須要這些,可是它們在不一樣的用例中能夠派上用場,因此熟悉它們是有好處的。

Semaphore

threading.Semaphore有一些特殊屬性的計數器對象,這裏實現的計數具備原子性,意味着能夠保證操做系統不會在遞增或遞減計數器的過程當中交換線程。

內部計數器在調用.release()時遞增,在調用.acquire()時遞減。

另一個特殊屬性,若是一個線程在計數器爲零時調用.acquire(),則該線程將被鎖定,直到另外一個線程調用.release(),並將計數器增長到1。

Semaphores一般用於保護容量有限的資源。例如,若是你有一個鏈接池,而且但願將該池的大小限制爲特定的數目。

Timer

threading.Timer用於在通過必定時間後調度要調用的函數,你能夠經過傳入等待的秒數和調用的函數來建立Timer實例:

t = threading.Timer(30.0, my_function)
複製代碼

經過調用.start()啓動Timer。在指定時間以後的某個時間點,將在新線程上調用該函數。但請注意,沒法保證會在你但願的時間準確調用該函數。

若是要中止已經啓動的Timer,能夠調用.cancel()。若是在Timer觸發後調用.cancel(),不會執行任何操做,也不會產生異常。

Timer可用於在特定時間後提示用戶執行操做。若是用戶在Timer過時以前執行操做,則能夠調用.cancel()

Barrier

threading.Barrier可用於保持固定數量的線程同步。建立Barrier時,調用方必須指定將要同步的線程數。每一個線程都調用Barrier.wait()方法,它們都將保持封鎖狀態,直到指定數量的線程在等待,而後所有同時釋放。

請記住:線程是由操做系統調度的,所以,即便全部線程都是同時釋放的,它們也將被調度爲一次運行一個線程。

Barrier的一個用途是容許線程池對自身進行初始化。讓這些線程初始化後在Barrier上等待,將確保在全部線程完成初始化以前,沒有一個線程開始運行。

結論:Python中的線程

如今你已經瞭解了Python的threading提供的許多功能,以及一些如何寫線程程序和用線程程序解決問題的示例。你還看到了在編寫和調試線程程序時出現的一些問題。

原文連接:realpython.com/intro-to-py…

關注微信公衆號:老齊教室。讀深度文章,得精湛技藝,享絢麗人生。

相關文章
相關標籤/搜索