一文了解Python的線程

問題python

  • 什麼是線程?算法

  • 如何建立、執行線程?數據庫

  • 如何使用線程池ThreadPoolExecutor?編程

  • 如何避免資源競爭問題?安全

  • 如何使用Python中線程模塊threading提供的經常使用工具?網絡

 

目錄數據結構

1. 什麼是線程多線程

2. 建立線程併發

    2.1. 守護線程app

    2.2. 加入線程

3. 多線程

4. 線程池

5. 競態條件

    5.1. 單線程

    5.2. 兩個線程

    5.3. 示例的意義

6. 同步鎖

7. 死鎖

8. 生產者-消費者模型中的線程

    8.1 在生產者-消費者模型中使用鎖

    8.2 在生產者-消費者模型中使用隊列

9. 線程對象

    9.1 信號量

    9.2 定時器

    9.3 柵欄

1. 什麼是線程

  線程是操做系統可以進行運算調度的最小單位,它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。

  在Python3中實現的大部分運行任務裏,不一樣的線程實際上並無同時運行:它們只是看起來像是同時運行的。

  你們很容易認爲線程化是在程序上運行兩個(或多個)不一樣的處理器,每一個處理器同時執行一個獨立的任務。這種理解並不徹底正確,線程可能會在不一樣的處理器上運行,但一次只能運行一個線程。

  同時執行多個任務須要使用非標準的Python運行方式:用不一樣的語言編寫一部分代碼,或者使用多進程模塊multiprocessing,但這麼作會帶來一些額外的開銷。

  因爲Python默認的運行環境是CPython(C語言開發的Python),因此線程化可能不會提高全部任務的運行速度。這是由於和GIL(Global Interpreter Lock)的交互造成了限制:一次只能運行一個Python線程。

  線程化的通常替代方法是:讓各項任務花費大量時間等待外部事件。但問題是,若是想縮短等待時間,會須要大量的CPU計算,結果是程序的運行速度可能並不會提高。

  當代碼是用Python語言編寫並在默認執行環境CPython上運行時,會出現這種狀況。若是線程代碼是用C語言寫的,那它們就可以釋放GIL並同時運行。若是是在別的Python執行環境(如IPython, PyPy,Jython, IronPython)上運行,請參考相關文檔瞭解它們是如何處理線程的。

  若是隻用Python語言在默認的Python執行環境下運行,而且遇到CPU受限的問題,那就應該用多進程模塊multiprocessing來解決。

  在程序中使用線程也能夠簡化設計。本文中的大部分示例並不保證能夠提高程序運行速度,其目的是使設計結構更加清晰、便於邏輯推理。

2. 建立線程

既然已經對什麼是線程有了初步瞭解,下面讓咱們來學習如何建立一個線程。

Python標準庫提供了threading模塊,裏面包含將在本文中介紹的大部分基本模塊。在這個模塊中,Thread類很好地封裝了有關線程的子類,爲咱們提供了乾淨的接口來使用它們。

要啓動一個線程,須要建立一個Thread實例,而後調用.start()方法:

import logging
import threading
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

查看日誌語句,能夠看到__main__部分正在建立並啓動線程:

x = threading.Thread(target=thread_function, args=(1,))
x.start()

建立線程時,咱們須要傳遞兩個參數,第一個參數target是函數名,指定這個線程去哪一個函數裏面去執行代碼,第二個參數args是一個元組類型,指定爲這個函數傳遞的參數。在本例中,Thread運行函數thread_function(),並將1做爲參數傳遞給該函數。

在本文中,咱們用連續整數爲線程命名。雖然threading.get_ident()方法可以爲每個線程生成惟一的名稱,但這些名稱一般會比較長,並且可讀性差。

這裏的thread_function()函數自己沒作什麼,它只是簡單地記錄了一些信息,並用time.sleep()隔開。

運行程序(註釋掉倒數第二行代碼),結果以下:

15:42:26: Main    : before creating thread
15:42:26: Main    : before running thread
15:42:26: Thread 1: starting
15:42:26: Main    : wait for the thread to finish
15:42:26: Main    : all done
15:42:28: Thread 1: finishing

能夠看到,線程Thread__main__部分代碼運行完後才結束。下一節會對這一現象作出解釋,並討論被註釋掉那行代碼。

2.1. 守護線程

在計算機科學中,守護進程daemon是一類在後臺運行的特殊進程,用於執行特定的系統任務。

守護進程daemon在Python線程模塊threading中有着特殊的含義。當程序退出時,守護線程將當即關閉。能夠這麼理解,守護線程是一個在後臺運行,且不用費心去關閉它的線程,由於它會隨程序自動關閉。

若是程序運行的線程是非守護線程,那麼程序將等待全部線程結束後再終止。但若是運行的是守護線程,當程序退出時,守護線程會被自動殺死。

咱們仔細研究一下上面程序運行的結果,注意看最後兩行。當運行程序時,在__main__部分打印完all done信息後、線程結束前,有一個大約2秒的停頓。

這時,Python在等待非守護線程完成運行。當Python程序結束時,關閉過程的一部分是清理線程。

查看Python線程模塊的源代碼,能夠看到thread ._shutdown()方法遍歷全部正在運行的線程,並在每一個非守護線程上調用.join()函數,檢查它們是否已經結束運行。

所以,程序退出時須要等待,由於守護線程自己會在休眠中等待其餘非守護線程運行結束。一旦thread ._shutdown()運行完畢並打印出信息,程序就能夠退出。

守護線程這種自動退出的特性很實用,但其實還有其餘的方法能實現相同的功能。咱們先用守護線程重複運行一下上面的程序,看看結果。只需在建立線程時,添加參數daemon=True

x = threading.Thread(target=thread_function, args=(1,), daemon=True)

如今運行程序,結果以下:

15:46:50: Main    : before creating thread
15:46:50: Main    : before running thread
15:46:50: Thread 1: starting
15:46:50: Main    : wait for the thread to finish
15:46:50: Main    : all done
15:46:52: Thread 1: finishing

添加參數daemon=True前

15:46:13: Main    : before creating thread
15:46:13: Main    : before running thread
15:46:13: Thread 1: starting
15:46:13: Main    : wait for the thread to finish
15:46:13: Main    : all done

添加參數daemon=True後

不一樣的地方是,以前輸出的最後一行不見了,說明thread_function()函數沒有機會完成運行。這是一個守護線程,因此當__main__部分運行完最後一行代碼,程序終止,守護線程被殺死。

2.2. 加入一個線程

守護線程用起來很方便,但若是想讓守護線程運行完畢後再結束程序該怎麼辦?或者想讓守護線程運行完後不退出程序呢?

讓咱們來看一下剛剛註釋掉的那行代碼:

# x.join()

要讓一個線程等待另外一個線程完成,能夠調用.join()函數。若是取消對這行代碼的註釋,主線程將會暫停,等待線程x完成運行。

這個功能在守護線程和非守護線程上一樣適用。若是用.join()函數加入了一個線程,則主線程將一直等待,直到被加入的線程運行完成。

15:48:06: Main    : before creating thread
15:48:06: Main    : before running thread
15:48:06: Thread 1: starting
15:48:06: Main    : wait for the thread to finish
15:48:08: Thread 1: finishing
15:48:08: Main    : all done

3. 多線程

到目前爲止,示例代碼中只用到了兩個線程:主線程和一個threading.Thread線程對象。

一般,咱們但願同時啓動多個線程,讓它們執行不一樣的任務。先來看看比較複雜的建立多線程的方法,而後再看簡單的。

這個複雜的建立方法其實前面已經展現過了:

import logging
import threading
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

這段代碼和前面提到的建立單線程時的結構是同樣的,建立線程對象,而後調用.start()方法。程序中會保存一個包含多個線程對象的列表,爲稍後使用.join()函數作準備。

屢次運行這段代碼可能會產生一些有趣的結果:

15:51:43: Main    : create and start thread 0.
15:51:43: Thread 0: starting
15:51:43: Main    : create and start thread 1.
15:51:43: Thread 1: starting
15:51:43: Main    : create and start thread 2.
15:51:43: Thread 2: starting
15:51:43: Main    : before joining thread 0.
15:51:45: Thread 0: finishing
15:51:45: Main    : thread 0 done
15:51:45: Main    : before joining thread 1.
15:51:45: Thread 2: finishing
15:51:45: Thread 1: finishing
15:51:45: Main    : thread 1 done
15:51:45: Main    : before joining thread 2.
15:51:45: Main    : thread 2 done

仔細看一下輸出結果,三個線程都按照預想的順序建立0,1,2,但它們的結束順序倒是相反的!屢次運行將會生成不一樣的順序。查看線程Thread x: finish中的信息,能夠知道每一個線程都在什麼時候完成。

線程的運行順序是由操做系統決定的,而且很難預測。頗有可能每次運行所獲得的順序都不同,因此在用線程設計算法時須要注意這一點。

幸運的是,Python中提供了幾個基礎模塊,能夠用來協調線程並讓它們一塊兒運行。在介紹這部份內容以前,讓咱們先看看如何更簡單地建立一組線程。

4. 線程池

咱們能夠用一種更簡單的方法來建立一組線程:線程池ThreadPoolExecutor,它是Python中concurrent.futures標準庫的一部分。(Python 3.2 以上版本適用)。

最簡單的方式是把它建立成上下文管理器,並使用with語句管理線程池的建立和銷燬。

ThreadPoolExecutor重寫上例中的__main__部分,代碼以下:

import concurrent.futures
import logging
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

這段代碼建立一個線程池ThreadPoolExecutor做爲上下文管理器,並傳入須要的工做線程數量。而後使用.map()遍歷可迭代對象,本例中是range(3),每一個對象生成池中的一個線程。

with模塊的結尾,會讓線程池ThreadPoolExecutor對池中的每一個線程調用.join()。強烈建議使用線程池ThreadPoolExecutor做爲上下文管理器,由於這樣就不會忘記寫.join()

注:

使用線程池ThreadPoolExecutor可能會報一些奇怪的錯誤。例如,調用一個沒有參數的函數,但將參數傳入.map()時,線程將拋出異常。

不幸的是,線程池ThreadPoolExecutor會隱藏該異常,程序會在沒有任何輸出的狀況下終止。剛開始調試時,這會讓人很頭疼。

運行修改後的示例代碼,結果以下:

15:54:29: Thread 0: starting
15:54:29: Thread 1: starting
15:54:29: Thread 2: starting
15:54:31: Thread 0: finishing
15:54:31: Thread 2: finishing
15:54:31: Thread 1: finishing

再提醒一下,這裏的線程1在線程0以前完成,這是由於線程的調度是由操做系統決定的,並不遵循一個特定的順序。

5. 競態條件

在繼續介紹Python線程模塊的一些其餘特性以前,讓咱們先討論一下在編寫線程化程序時會遇到的一個更頭疼的問題: 競態條件。

咱們先了解一下競態條件的含義,而後看一個實例,再繼續學習標準庫提供的其餘模塊,來防止競態條件的發生。

當兩個或多個線程訪問共享的數據或資源時,可能會出現競態條件。在本例中,咱們建立了一個每次都會發生的大型競態條件,但請注意,大多數競態條件不會如此頻繁發生。一般狀況下,它們不多發生,但一旦發生,會很難進行調試。

在本例中,咱們會寫一個更新數據庫的類,但這裏並不須要一個真正的數據庫,只是一個虛擬的,由於這不是本文討論的重點。

這個FakeDatabase類包括.__init__().update()方法。

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)

FakeDatabase類會一直跟蹤一個值: .value,它是共享數據,這裏會出現競態條件。

.__init__()方法將.value的值初始化爲0。.update()方法從數據庫中讀取一個值,對其進行一些計算,而後將新值寫回數據庫。

FakeDatabase類的使用實例以下:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

該程序建立一個線程池ThreadPoolExecutor,裏面包含兩個線程,而後在每一個線程上調用.submit()方法,告訴它們運行database.update()函數。

.submit()容許將位置參數和關鍵字參數傳遞給正在線程中運行的函數:

.submit(function, *args, **kwargs)

示例代碼中,index做爲惟一一個位置參數傳遞給database.update()函數,後面會介紹,也能夠用相似的方式傳遞多個參數。

因爲每一個線程都會運行.update(), 讓.value的變量值加上1,因此最後打印出的database.value值應該是2。但若是是這樣的話,舉這個例子就沒有什麼意義了。

實際上,運行上面這段代碼的輸出以下:

16:03:32: Testing update. Starting value is 0.
16:03:32: Thread 0: starting update
16:03:32: Thread 1: starting update
16:03:32: Thread 1: finishing update
16:03:32: Thread 0: finishing update
16:03:32: Testing update. Ending value is 1.

咱們來仔細研究一下這裏究竟發生了什麼,有助於更好地理解有關這個問題的解決方案。 

5.1. 單線程

在深刻研究上面有關兩個線程的問題以前,咱們先回過頭看一下線程究竟是如何工做的。

這裏不會討論全部的細節,由於在目前這個學習階段還不必掌握這麼多內容。咱們還將簡化一些東西,雖然可能在技術上不夠精確,但能夠方便你們理解其中的原理。

當線程池ThreadPoolExecutor運行每一個線程時,咱們會指定運行哪一個函數,以及傳遞給該函數的參數:executor.submit(database.update, index),這裏是指運行database.update函數,並傳入index參數。

這麼作的結果是,線程池中的每一個線程都將調用database.update(index)。注意,主線程__main__中建立的database是對FakeDatabase對象的引用。在這個對象上調用.update(),會調用該對象的實例方法。

每一個線程都將引用同一個FakeDatabase對象:database。每一個線程還有一個獨特的index值,使得日誌語句更易閱讀:

當線程開始運行.update()函數時,它擁有局部變量local_copy。這絕對是一件好事,不然,運行相同函數的兩個線程老是會相互混淆。也就是說,函數內定義的局部變量是線程安全的。

如今咱們能夠看一下,若是使用單線程、調用一次.update()函數運行上面的程序會發生什麼。

下圖展現了在只運行一個線程的狀況下,.update()函數是如何逐步執行的。代碼顯示在左上角,後面跟着一張圖,顯示線程中局部變量local_value和共享數據database.value的值:

 

這張圖是這樣佈局的,從上至下時間增長,它以建立線程1開始,並在線程1終止時結束。

線程1啓動時,FakeDatabase.value的值爲0。第一行代碼將值0複製給局部變量local_copy。接下來,local_copy += 1語句讓local_copy的值增長1,能夠看到線程1中的.value值變成了1。

而後調用time.sleep()方法,暫停當前線程,並容許其餘線程運行。由於本例中只有一個線程,這裏沒什麼影響。

當線程1被喚醒繼續運行時,它將新值從局部變量local_copy複製到FakeDatabase.value,線程完成運行。能夠看到database.value的值被設爲1。

到目前爲止,一切順利。咱們運行了一次.update()函數,FakeDatabase.value值增長到1。

5.2. 兩個線程

回到競態條件,這兩個線程會併發運行,但不會同時運行。它們都有各自的局部變量local_copy,並指向相同的database對象。正是database這個共享數據致使了這些問題。

程序建立線程1,運行update()函數:

當線程1調用time.sleep()方法時,它容許另外一個線程開始運行。這時,線程2啓動並執行相同的操做。它也將database.value的值複製給私有變量local_copy,但共享數據database.value的值還未更新,仍爲0:

當線程2進入休眠狀態時,共享數據database.value的值仍是未被修改的0,並且兩個線程中的私有變量local_copy的值都是1。

如今線程1被喚醒並保存其私有變量local_copy的值,而後終止,線程2繼續運行。線程2在休眠的時候並不知道線程1已經運行完畢並更新了database.value中的值,當繼續運行時, 它將本身私有變量local_copy的值存儲到database.value中,也是1。

這兩個線程交錯訪問同一個共享對象,覆蓋了彼此的結果。當一個線程釋放內存或在另外一個線程完成訪問以前關閉文件句柄時,可能會出現相似的競爭條件。

5.3. 示例的意義

上面的例子是爲了確保每次運行程序時都發生競態條件。由於操做系統能夠在任什麼時候候交換出一個線程,因此有可能在讀取了x的值以後,像x = x + 1這樣的語句會中斷,致使寫回數據庫的值不是咱們想要的。

這一過程當中的細節很是有趣,但本文剩下部分的學習不須要了解具體細節,因此能夠先跳過。

看完有關競態條件的實例,讓咱們接下來看看如何解決它們!

6. 同步鎖

有不少方法能夠避免或解決競態條件,這裏不會介紹全部的解決方法,但會提到一些會常常用到的。讓咱們先從鎖Lock開始學習。

要解決上述競態條件問題,須要找到一種方法,每次只容許一個線程進入代碼的read-modify-write部分。最經常使用就是Python中的鎖。在一些其餘語言中,一樣的思想被稱爲互斥鎖mutex。互斥鎖mutex屬於進程互斥MUTual EXclusion的一部分,它和鎖所作的工做是同樣的。

鎖是一種相似於通行證的東西,每次只有一個線程能夠擁有鎖,任何其餘想要得到鎖的線程必須等待,直到該鎖的全部者將它釋放出來。

完成此任務的基本函數是.acquire().release()。線程將調用my_lock.acquire()來獲取鎖。若是鎖已經存在,則調用線程將會等待,直到鎖被釋放。這裏有一點很重要,若是一個線程得到了鎖,但從未釋放,程序會被卡住。稍後會介紹更多關於這方面的內容。

幸運的是,Python的鎖也將做爲上下文管理器運行,因此能夠在with語句中使用它,而且當with模塊出於任何緣由退出時,鎖會自動釋放。

讓咱們看看添加了鎖的FakeDatabase,調用函數保持不變:

import logging
import time
import threading


class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)

除了添加一些調試日誌以便更清楚地查看鎖的運行以外,這裏最大的變化是添加了一個叫._lock的成員,它是一個thread . lock()對象。這個._lock在未鎖定狀態下被初始化,並由with語句鎖定和釋放。

值得注意的是,運行該函數的線程將一直持有這個鎖,直到它徹底更新完數據庫。在本例中,這意味着它將在複製、更新、休眠並將值寫回數據庫的整個過程當中持有鎖。

日誌設置爲警告級別,運行程序,結果以下:

16:08:44: Testing update. Starting value is 0.
16:08:44: Thread 0: starting update
16:08:44: Thread 1: starting update
16:08:44: Thread 0: finishing update
16:08:44: Thread 1: finishing update
16:08:44: Testing update. Ending value is 2.

在主線程__main__中配置完日誌輸出後,將日誌級別設置爲DEBUG能夠打開完整的日誌:

logging.getLogger().setLevel(logging.DEBUG) 

用調試日誌運行程序的結果以下:

16:09:59: Testing update. Starting value is 0.
16:09:59: Thread 0: starting update
16:09:59: Thread 0 about to lock
16:09:59: Thread 1: starting update
16:09:59: Thread 0 has lock
16:09:59: Thread 1 about to lock
16:09:59: Thread 0 about to release lock
16:09:59: Thread 0 after release
16:09:59: Thread 1 has lock
16:09:59: Thread 0: finishing update
16:10:00: Thread 1 about to release lock
16:10:00: Thread 1 after release
16:10:00: Thread 1: finishing update
16:10:00: Testing update. Ending value is 2.

線程0得到鎖,而且在它進入睡眠狀態時仍然持有鎖。而後線程1啓動並嘗試獲取同一個鎖,由於線程0仍然持有它,線程1就必須等待。這就是互斥鎖。

本文其他部分的許多示例都有警告和調試級別的日誌記錄。咱們一般只顯示警告級別的輸出,由於調試日誌可能很是長。

7. 死鎖

在繼續學習以前,咱們先看一下使用鎖時會出現的常見問題。在上例中,若是鎖已經被某個線程獲取,那麼第二次調用.acquire()時將一直等待,直到持有鎖的線程調用.release()將鎖釋放。

思考一下,運行下面這段代碼會獲得什麼結果:

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")

當程序第二次調用l.acquire()時,它須要等待鎖被釋放。在本例中,能夠刪除第二次調用修復死鎖,可是死鎖一般在如下兩種狀況下會發生:

① 鎖沒有被正確釋放時會產生運行錯誤;

② 在一個實用程序函數須要被其餘函數調用的地方會出現設計問題,這些函數可能已經擁有或者沒有鎖。

第一種狀況有時會發生,可是使用鎖做爲上下文管理器能夠大大減小這種狀況發生的頻率。建議充分利用上下文管理器來編寫代碼,由於它們有助於避免出現異常跳過.release()調用的狀況。

在某些語言中,設計問題可能有點棘手。慶幸的是,Python的線程模塊還提供了另外一個鎖對象RLock。它容許線程在調用.release()以前屢次獲取.acquire()鎖,且程序不會阻塞。該線程仍須要保證.release().acquire()的調用次數相同,但它是用了另外一種方式而已。

LockRLock是線程化編程中用來防止競爭條件的兩個基本工具,還有一些其餘的工具。在研究它們以前,咱們先轉移到一個稍微不一樣的領域。

8. 生產者-消費者模型中的線程

生產者-消費者模型是一個標準的計算機科學領域的問題,用於解決線程同步或進程同步。咱們先介紹一個它的變形,大體瞭解一下Python中的線程模塊提供了哪些基礎模塊。 

本例中,假設須要寫一個從網絡讀取消息並將其寫入磁盤的程序。該程序不會主動請求消息,它必須在消息傳入時偵聽並接受它們。並且這些消息不會以固定的速度傳入,而是以突發的方式傳入。這一部分程序叫作生產者。

另外一方面,一旦傳入了消息,就須要將其寫入數據庫。數據庫訪問很慢,但訪問速度足以跟上消息傳入的平均速度。但當大量消息同時傳入時,速度會跟不上。這部分程序叫消費者。

在生產者和消費者之間,須要建立一個管道Pipeline,隨着對不一樣同步對象的深刻了解,咱們須要對管道里面的內容進行修改。

這就是基本的框架。讓咱們看看使用Lock的解決方案。雖然它並非最佳的解決方法,但它運用的是前面已經介紹過的工具,因此比較容易理解。

8.1. 在生產者-消費者模型中使用鎖

既然這是一篇關於Python線程的文章,並且剛剛已經閱讀了有關鎖的內容,因此讓咱們嘗試用鎖解決競態條件問題。

先寫一個生產者線程,從虛擬網絡中讀取消息並放入管道中:

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

生產者得到一個介於1到100之間的隨機數,做爲生成的虛擬消息。它調用管道上的.set_message()方法將其發送給消費者。

生產者還用一個SENTINEL值來警告消費者,在它發送10個值以後中止。這有點奇怪,但沒必要擔憂,在完成本示例後,會介紹如何去掉這個SENTINEL值。

管道pipeline的另外一端是消費者:

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

消費者從管道中讀取一條消息並將其寫入虛擬數據庫,在本例中,只是將其儲存到磁盤中。若是消費者獲取了SENTINEL值,線程會終止。

在研究管道Pipeline以前,先看一下生成這些線程的主線程__main__部分:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

看起來應該很熟悉,由於它和前面示例中介紹過的__main__部分相似。

注意,打開調試日誌能夠查看全部的日誌消息,方法是取消對這一行的註釋:

# logging.getLogger().setLevel(logging.DEBUG)

咱們有必要遍歷調試日誌消息,來查看每一個線程是在何處得到和釋放鎖的。

如今讓咱們看一下將消息從生產者傳遞給消費者的管道Pipeline:

class Pipeline(object):
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

好長一段代碼!別懼怕,大部分是日誌語句,刪除全部日誌語句後的代碼以下:

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()

這樣看起來更清晰,管道類中有三個成員:

 .message存儲要傳遞的消息;

 .producer_lock是一個線程鎖對象,限制生產者線程對消息的訪問;

.consumer_lock也是一個線程鎖,限制消費者線程對消息的訪問。

__init__() 初始化這三個成員,而後在.consumer_lock上調用.acquire(),消費者得到鎖。生產者能夠添加新消息,但消費者須要等待消息出現。

get_message().set_messages()幾乎是相反的操做。.get_message()consumer_lock上調用.acquire(),這麼作的目的是讓消費者等待,直到有消息傳入。

一旦消費者得到了鎖.consumer_lock,它會將self.message的值複製給.message,而後在.producer_lock上調用.release()。釋放此鎖容許生產者在管道中插入下一條消息。

.get_message()函數中有一些細節很容易被忽略。你們思考一下,爲何不把message變量刪掉,直接返回self.message的值呢?

答案以下。

只要消費者調用.producer_lock.release(),它就被交換出去,生產者開始運行,這可能發生在鎖被徹底釋放以前!也就是說,存在一種微小的可能性,當函數返回self.message時,這個值是生產者生成的下一條消息,致使第一條消息丟失。這是競態條件的另外一個例子。

咱們繼續看事務的另外一端:.set_message()。生產者經過傳入一條消息來調用該函數,得到鎖.producer_lock,傳入.message值,而後調用consumer_lock.release()釋放鎖,這將容許消費者讀取該值。

運行代碼,日誌設置爲警告級別,結果以下:

16:17:31: Producer got message: 3
16:17:31: Producer got message: 98
16:17:31: Consumer storing message: 3
16:17:31: Producer got message: 83
16:17:31: Consumer storing message: 98
16:17:31: Producer got message: 96
16:17:31: Consumer storing message: 83
16:17:31: Producer got message: 34
16:17:31: Consumer storing message: 96
16:17:31: Producer got message: 71
16:17:31: Consumer storing message: 34
16:17:31: Producer got message: 79
16:17:31: Consumer storing message: 71
16:17:31: Producer got message: 90
16:17:31: Consumer storing message: 79
16:17:31: Producer got message: 3
16:17:31: Consumer storing message: 90
16:17:31: Producer got message: 58
16:17:31: Consumer storing message: 3
16:17:31: Consumer storing message: 58

你們可能會以爲奇怪,生產者在消費者還沒運行以前就得到了兩條消息。回過頭仔細看一下生產者和.set_message()函數,生產者先獲取消息,打印出日誌語句,而後試圖將消息放入管道中,這時才須要等待鎖。

當生產者試圖傳入第二條消息時,它會第二次調用.set_message(),發生阻塞。

操做系統能夠在任什麼時候候交換線程,但它一般會容許每一個線程在交換以前有一段合理的運行時間。這就是爲何生產者會一直運行,直到第二次調用.set_message()時被阻塞。

一旦線程被阻塞,操做系統老是會把它交換出去,並找到另外一個線程去運行。在本例中,就是消費者線程。

消費者調用.get_message()函數,它讀取消息並在.producer_lock上調用.release()方法,釋放鎖,容許生產者再次運行。

注意,第一個值是43,正是消費者所讀取的值,雖然生產者已經生成了新值45。

儘管使用鎖的這種方法適用於本例,但對於常見的生產者-消費者模式問題,這不是一個很好的解決方法,由於它一次只容許管道中有一個值。當生產者收到大量值時,將無處安放。

讓咱們繼續看一個更好的解決方法:使用隊列Queue.

8.2. 在生產者-消費者模型中使用隊列

若是想在管道中一次處理多個值,咱們須要爲管道提供一個數據結構,當從生產者線程備份數據時,該結構容許管道中的數據量靈活變更,再也不是單一值。

Python標準庫中有一個模塊叫隊列queue,裏面有一個類叫Queue。讓咱們用隊列Queue改寫一下上面受鎖保護的管道。

此外,咱們還會介紹另外一種中止工做線程的方法,使用Python線程模塊中的事件Event對象。

事件的觸發機制能夠是多種多樣的。在本例中,主線程只是休眠一段時間,而後調用event.set()方法,通知全部處於等待阻塞狀態的線程恢復運行狀態:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

這裏唯一的變化是在第8行建立了事件對象event,在第10行和第11行傳遞了event參數,代碼的最後一個部分13-15行,先休眠0.1秒,記錄一條消息,而後在事件上調用.set()方法。

生產者也不用變太多:

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
    message = random.randint(1, 101)
    logging.info("Producer got message: %s", message)
    pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")

在第3行循環部分設置了事件,並且也不用再把SENTINEL值放入管道中。

消費者的變化稍多:

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting") 

除了須要刪掉和SENTINEL值相關的代碼,還要執行稍微複雜一點的循環條件。它會一直循環,直到事件結束,管道中的數據被清空。

必定要確保當消費者退出時,隊列是空的。若是消費者在管道包含消息時退出,可能會出現兩個問題。一是會丟失那部分數據,但更嚴重的是生產者會被鎖住。

在生產者檢查.is_set()條件後、但在調用pipeline.set_message()前觸發事件,則會發生這種狀況。

一旦發生這種狀況,生產者可能被喚醒並退出,但此時鎖仍被消費者持有。而後,生產者將嘗試用.acquire()方法獲取鎖,可是消費者已經退出,並且永遠不會釋放鎖,因此生產者就會一直等下去。

消費者的其他部分看起來應該很熟悉。

管道類的寫法變化最大:

Pipelinequeue.Queue的一個子類。Queue隊列裏面有一個可選參數,在初始化時指定隊列所能容納的最大數據量。

.get_message().set_message()變得更簡短,被隊列中的.get().put()方法替代。

你們可能想知道,防止競爭條件的代碼都跑哪裏去了?

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

編寫標準庫的核心開發人員知道,在多線程環境中常用隊列Queue,所以將全部鎖定代碼合併到了隊列Queue模塊內部。隊列Queue自己就是線程安全的。

程序運行結果以下:

16:24:24: Producer got message: 96
16:24:24: Producer got message: 90
16:24:24: Consumer storing message: 96  (queue size=0)
16:24:24: Producer got message: 92
16:24:24: Consumer storing message: 90  (queue size=0)
16:24:24: Producer got message: 40
16:24:24: Consumer storing message: 92  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 40  (queue size=0)
16:24:24: Producer got message: 19
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 19  (queue size=0)
16:24:24: Producer got message: 43
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 76
16:24:24: Consumer storing message: 43  (queue size=0)
16:24:24: Producer got message: 4
16:24:24: Consumer storing message: 76  (queue size=0)
16:24:24: Producer got message: 38
16:24:24: Consumer storing message: 4  (queue size=0)
16:24:24: Producer got message: 83
16:24:24: Consumer storing message: 38  (queue size=0)
16:24:24: Producer got message: 38
16:24:24: Consumer storing message: 83  (queue size=0)
16:24:24: Producer got message: 54
16:24:24: Consumer storing message: 38  (queue size=0)
16:24:24: Producer got message: 80
16:24:24: Consumer storing message: 54  (queue size=0)
16:24:24: Producer got message: 94
16:24:24: Consumer storing message: 80  (queue size=0)
16:24:24: Producer got message: 11
16:24:24: Consumer storing message: 94  (queue size=0)
16:24:24: Producer got message: 98
16:24:24: Consumer storing message: 11  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 98  (queue size=0)
16:24:24: Producer got message: 31
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 31  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 47
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 60
16:24:24: Consumer storing message: 47  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 60  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 19
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 97
16:24:24: Consumer storing message: 19  (queue size=0)
16:24:24: Producer got message: 37
16:24:24: Consumer storing message: 97  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 37  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Producer got message: 63
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 51
16:24:24: Consumer storing message: 63  (queue size=0)
16:24:24: Producer got message: 37
16:24:24: Consumer storing message: 51  (queue size=0)
16:24:24: Producer got message: 34
16:24:24: Consumer storing message: 37  (queue size=0)
16:24:24: Producer got message: 46
16:24:24: Consumer storing message: 34  (queue size=0)
16:24:24: Producer got message: 33
16:24:24: Consumer storing message: 46  (queue size=0)
16:24:24: Producer got message: 32
16:24:24: Consumer storing message: 33  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 32  (queue size=0)
16:24:24: Producer got message: 18
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Producer got message: 68
16:24:24: Consumer storing message: 18  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 68  (queue size=0)
16:24:24: Producer got message: 32
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 35
16:24:24: Consumer storing message: 32  (queue size=0)
16:24:24: Producer got message: 20
16:24:24: Consumer storing message: 35  (queue size=0)
16:24:24: Producer got message: 100
16:24:24: Consumer storing message: 20  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 100  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 87
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 90
16:24:24: Consumer storing message: 87  (queue size=0)
16:24:24: Producer got message: 65
16:24:24: Consumer storing message: 90  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 65  (queue size=0)
16:24:24: Producer got message: 91
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 71
16:24:24: Consumer storing message: 91  (queue size=0)
16:24:24: Producer got message: 10
16:24:24: Consumer storing message: 71  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 10  (queue size=0)
16:24:24: Producer got message: 44
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 44  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 69
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 83
16:24:24: Consumer storing message: 69  (queue size=0)
16:24:24: Producer got message: 81
16:24:24: Consumer storing message: 83  (queue size=0)
16:24:24: Producer got message: 65
16:24:24: Consumer storing message: 81  (queue size=0)
16:24:24: Producer got message: 26
16:24:24: Consumer storing message: 65  (queue size=0)
16:24:24: Producer got message: 74
16:24:24: Consumer storing message: 26  (queue size=0)
16:24:24: Producer got message: 33
16:24:24: Consumer storing message: 74  (queue size=0)
16:24:24: Producer got message: 89
16:24:24: Consumer storing message: 33  (queue size=0)
16:24:24: Producer got message: 27
16:24:24: Consumer storing message: 89  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 27  (queue size=0)
16:24:24: Producer got message: 75
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 74
16:24:24: Consumer storing message: 75  (queue size=0)
16:24:24: Producer got message: 79
16:24:24: Consumer storing message: 74  (queue size=0)
16:24:24: Producer got message: 66
16:24:24: Consumer storing message: 79  (queue size=0)
16:24:24: Producer got message: 87
16:24:24: Consumer storing message: 66  (queue size=0)
16:24:24: Producer got message: 47
16:24:24: Consumer storing message: 87  (queue size=0)
16:24:24: Producer got message: 13
16:24:24: Consumer storing message: 47  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 13  (queue size=0)
16:24:24: Producer got message: 62
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 6
16:24:24: Consumer storing message: 62  (queue size=0)
16:24:24: Producer got message: 70
16:24:24: Consumer storing message: 6  (queue size=0)
16:24:24: Producer got message: 18
16:24:24: Consumer storing message: 70  (queue size=0)
16:24:24: Producer got message: 44
16:24:24: Consumer storing message: 18  (queue size=0)
16:24:24: Producer got message: 14
16:24:24: Consumer storing message: 44  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 14  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 86
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 55
16:24:24: Consumer storing message: 86  (queue size=0)
16:24:24: Producer got message: 75
16:24:24: Consumer storing message: 55  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 75  (queue size=0)
16:24:24: Producer got message: 72
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 36
16:24:24: Consumer storing message: 72  (queue size=0)
16:24:24: Producer got message: 45
16:24:24: Consumer storing message: 36  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 45  (queue size=0)
16:24:24: Producer got message: 66
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 67
16:24:24: Consumer storing message: 66  (queue size=0)
16:24:24: Producer got message: 70
16:24:24: Consumer storing message: 67  (queue size=0)
16:24:24: Producer got message: 41
16:24:24: Consumer storing message: 70  (queue size=0)
16:24:24: Producer got message: 91
16:24:24: Consumer storing message: 41  (queue size=0)
16:24:24: Producer got message: 85
16:24:24: Consumer storing message: 91  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 85  (queue size=0)
16:24:24: Producer got message: 46
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 14
16:24:24: Consumer storing message: 46  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 14  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 16
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 82
16:24:24: Consumer storing message: 16  (queue size=0)
16:24:24: Producer got message: 42
16:24:24: Consumer storing message: 82  (queue size=0)
16:24:24: Producer got message: 7
16:24:24: Consumer storing message: 42  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 7  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 2
16:24:24: Consumer storing message: 84  (queue size=0)

16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 48  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 22
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 22  (queue size=0)
16:24:24: Producer got message: 52
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Main: about to set event
16:24:24: Producer got message: 98
16:24:24: Consumer storing message: 52  (queue size=0)
16:24:24: Producer received EXIT event. Exiting
16:24:24: Consumer storing message: 98  (queue size=0)
16:24:24: Consumer received EXIT event. Exiting

生產者建立了5條消息,並將其中4條放到隊列中。但在放置第5條消息以前,它被操做系統交換出去了。

而後消費者開始運行並儲存第1條消息,打印出該消息和隊列大小:

Consumer storing message: 32 (queue size=3)

這就是爲何第5條消息沒有成功進入管道。刪除一條消息後,隊列的大小縮減到3個。由於隊列最多能夠容納10條消息,因此生產者線程沒有被隊列阻塞,而是被操做系統交換出去了。

注意:每次運行所獲得的結果會不一樣。這就是使用線程的樂趣所在!

當程序開始結束時,主線程觸發事件,生產者當即退出。但消費者仍有不少工做要作,因此它會繼續運行,直到清理完管道中的數據爲止。

嘗試修改生產者或消費者中的隊列大小和time.sleep()中的休眠時間,來分別模擬更長的網絡或磁盤訪問時間。即便是輕微的更改,也會對結果產生很大的影響。

對於生產者-消費者模型,這是一個更好的解決方法,但其實能夠進一步簡化。去掉管道Pipeline和日誌語句,就只剩下和queue.Queue相關的語句了。

直接使用queue.Queue的最終代碼以下:

import concurrent.futures
import logging
import random
import threading
import time
import queue


class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)


def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")


def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

能夠看到,使用Python的內置基礎模塊可以簡化複雜的問題,讓代碼閱讀起來更清晰。

Lock和隊列Queue是解決併發問題很是方便的兩個類,但其實標準庫還提供了其餘類。在結束本教程以前,讓咱們快速瀏覽一下還有哪些類。

9. 線程對象

Python的線程threading模塊還有其餘一些基本類型。雖然在上面的例子中沒有用到,但它們會在不一樣的狀況下派上用場,因此熟悉一下仍是很好處的。

9.1 信號量

首先要介紹的是信號量thread.semaphore,信號量是具備一些特殊屬性的計數器。

第一個屬性是計數的原子性,能夠保證操做系統不會在計數器遞增或遞減的過程當中交換線程。

內部計數器在調用.release()時遞增,在調用.acquire()時遞減。

另外一個特殊屬性是,若是線程在計數器爲0時調用.acquire(),那麼該線程將阻塞,直到另外一個線程調用.release()並將計數器的值增長到1。

信號量一般用於保護容量有限的資源。例如,咱們有一個鏈接池,而且但願限制該鏈接池中的元素數量,就能夠用信號量來進行管理。

9.2 定時器

threading.Timer是一個定時器功能的類,指定函數在間隔特定時間後執行任務。咱們能夠經過傳入須要等待的時間和函數來建立一個定時器:

t = threading.Timer(30.0, my_function)

調用.start()啓動定時器,函數將在指定時間事後的某個時間點上被新線程調用。但請注意,這裏並不能保證函數會在咱們所指望的確切時間被調用,可能會存在偏差。  

若是想要中止已經啓動的定時器,能夠調用.cancel()。在定時器觸發後調用.cancel()不會執行任何操做,也不會產生異常。

定時器可用於在特定時間以後提示用戶執行操做。若是用戶在定時器過期以前執行了操做,能夠調用.cancel()取消定時。

9.3 柵欄

threading模塊中的柵欄Barrier能夠用來指定須要同步運行的線程數量。建立柵欄Barrier時,咱們必須指定所需同步的線程數。每一個線程都會在Barrier上調用.wait()方法,它們會先保持阻塞狀態,直到等待的線程數量達到指定值時,會被同時釋放。

注意,線程是由操做系統調度的,所以,即便全部線程同時被釋放,一次也只能運行一個線程。

柵欄能夠用來初始化一個線程池。讓線程初始化後在柵欄裏等待,能夠確保程序在全部線程都完成初始化後再開始運行。

相關文章
相關標籤/搜索