【譯】線程:概念和實現(2)

翻譯:老齊python

譯者注:與本文相關圖書推薦:《Python大學實用教程》《跟老齊學Python:輕鬆入門》數據庫


第二部分

競態條件

在討論Python線程的其餘特性以前,讓咱們先討論一下編寫線程程序時遇到的一個更困難的問題:競態條件。安全

一旦你瞭解了什麼是競態條件,並看到了正在發生的狀況,而後就使用標準庫提供的模塊,以防止這些競態條件的出現。bash

當兩個或多個線程訪問共享數據或資源時,可能會出現競態狀況。在本例中,你將建立一個每次都發生的大型競態條件,但請注意,大多數它並非很明顯。示例中的狀況一般不多發生,並且會產生使人困惑的結果。能夠想象,由於競態條件而引發的bug很難被發現。微信

幸運的是,在下述示例中競態問題每次都會發生,你將詳細地瞭解它以便解釋發生了什麼。函數

對於本例,將編寫一個更新數據庫的類。你不會真的有一個數據庫:你只是要僞造它,由於這不是本文的重點。工具

FakeDatabase類中有.__init__().update()方法:post

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
複製代碼

FakeDatabase中的屬性.value,用於做爲競態條件中共享的數據。ui

.__init__()中將.value值初始化爲0.,到目前爲止,一切正常。spa

.update() 看起來有點奇怪,它模擬從數據庫中讀取一個值,對其進行一些計算,而後將一個新值寫回數據庫。

所謂從數據庫中讀取,即將.value的值複製到本地變量。計算就是在原值上加1,而後.sleep() 一小會兒。最後,它經過將本地值複製回.value,將值寫回去。

下面是FakeDatabase的使用方法:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)
複製代碼

程序中建立了兩個ThreadPoolExecutor,而後對每一個線程調用.submit(),告訴它們運行database.update()

.submit()有一個明顯特徵,它容許將位置參數和命名參數傳給線程中運行的函數:

.submit(function, *args, **kwargs)
複製代碼

在上面的用法中,index做爲第一個也是惟一一個位置參數傳給database.update()。你將在本文後面看到,能夠用相似的方式傳多個參數。

因爲每一個線程都運行.update(),而.update()會讓.value的值加1,所以在最後打印時,你可能會但願database.value爲2。但若是是這樣的話,你就不會看這個例子了。若是運行上述代碼,則輸出以下:

$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
複製代碼

你可能已經預料到這種狀況會發生,可是讓咱們來看看實際狀況的細節,由於這將使這個問題的解決方案更容易理解。

單線程

在用兩個線程深刻討論這個問題以前,讓咱們先退一步,談談線程工做流程的一些細節。

咱們不會在這裏深刻討論全部的細節,由於這種全面深刻的討論如今並不重要。咱們還將簡化一些事情,這種作法雖然在技術上並不許確,但會讓你對正在發生的事情有正確的認識。

當你告訴ThreadPoolExecutor運行每一個線程時,也就是告訴它要運行哪一個函數以及要傳給它的參數:executor.submit(database.update, index)

其結果是線程池中的每一個線程都將調用database.update(index)。注意,database__main__中建立的FakeDatabase實例對象,調用它的方法.update()

每一個線程都將引用同一個FakeDatabase的實例database,每一個線程還將有一個惟一的值index。爲了讓上述過程更容易理解,請看下圖:

當某線程開始運行.update()時,它有此方法的本地的數據,即.update()中的local_copy。這絕對是件好事,不然,在兩個線程中運行同一個函數就會互相干擾了。這意味着該函數的全部做用域(或本地)變量對於線程來講都是安全的。

如今,你已經理解,若是使用單個線程和對.update()的單個調用來運行上面的程序會發生什麼狀況。

若是隻運行一個線程,以下圖所示,會一步一步地執行.update()。下圖中,語句顯示在上面,下面用圖示方式演示了線程中的local_value和共享的database.value 中的值的變化:

按照時間順序,從上到下觀察上面的示意圖,從建立線程Thread 1開始,到Thread 1結束終止。

Thread 1啓動時,FakeDatabase.value爲零。方法中的第一行代碼local_copy=self.value將0複製到局部變量。接下來,使用local_copy+=1語句增長local_copy的值。你能夠看到Thread 1中的.value值爲1。

而後,調用下一個time.sleep(),這將使當前線程暫停並容許其餘線程運行。由於在這個例子中只有一個線程,因此這沒有影響。

Thread 1喚醒並繼續時,它將新值從local_copy複製到FakeDatabase.value,而後線程完成。你能夠看到database.value爲1。

到目前爲止,一切正常。你只運行了一次.update()而且將FakeDatabase.value遞增爲1。

兩個線程

回到競態條件,兩個線程並行,但不是同時運行。每一個線程都有本身的local_copy,並指向相同的database,正是這個共享數據庫對象致使了這些問題。

程序仍是從Thread 1執行.update()開始:

Thread 1調用time.sleep()時,它容許另外一個線程開始運行。這就是事情變得有趣的地方。

Thread 2啓動並執行相同的操做。它也將database.value複製到其私有的local_copy,而此時共享的database.value還沒有更新:

Thread 1進入睡眠狀態時,共享的database.value仍然未被修改,仍是0,而此時的local_copy的兩個私有版本的值都爲1。

Thread 1如今醒來並保存其local_copy的值,而後線程終止,給Thread 2機會。Thread 2不知道在它睡眠時Thread 1運行並更新了database.value的值。Thread 2也將它的local_copy值存儲到database.value中,並將其設置爲1:

這兩個線程交替訪問一個共享對象,覆蓋彼此的結果。當一個線程釋放內存或在另外一個線程完成訪問以前關閉文件句柄時,可能會出現相似的競態。

爲何這不是一個愚蠢的示例

上面的例子是刻意而爲,目的是確保每次運行程序時都會發生競態。由於操做系統能夠在任什麼時候候交換線程,因此在讀取x的值以後,而且在寫回遞增的值以前,能夠中斷相似x=x+1的語句。

發生這種狀況的緣由細節很是有趣,但這篇文章的其他部分並不須要這些細節,因此能夠跳過這個隱藏的部分。

既然你已經看到了運行過程當中的競態條件,讓咱們找出解決問題的方法!

使用鎖實現同步

有不少方法能夠避免或解決競態。你不會在這裏看到全部這些方法,可是有一些方法是常用的。讓咱們從Lock開始。

要解決上述競態條件,須要找到一種方法,使得在代碼的「讀-修改-寫」操做中一次只容許一個線程。最多見的方法是使用Python中名爲Lock的方法。在其餘的一些語言中,相似的被稱爲MutexMutex源於MUTual EXclusion,這正是Lock的做用。

Lock像是通行證,一次只能有一個線程擁有Lock,任何其餘想要Lock的線程都必須等到Lock的全部者放棄它。

執行此操做的基本函數是.acquire().release()。線程將調用my_lock.acquire()來獲取本身的鎖。若是鎖已經被其餘線程全部,則將等待它被釋放。這裏有一點很重要,若是一個線程獲得了鎖,但還沒有返回,你的程序將被卡住。你稍後會讀到更多關於這方面的內容。

幸運的是,Python的Lock也將做爲上下文管理器運行,所以你能夠在一個帶有with的語句中使用它,而且當with代碼塊因爲任何緣由退出時,鎖也會自動釋放。

讓咱們看看添加了鎖的FakeDatabase,其所調用函數保持不變:

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
複製代碼

除了添加一堆調試日誌以便更清楚地看到鎖操做以外,這裏的大變化是添加一個名爲._lock的屬性,它是一個threading.Lock()實例對象。這個._lock在未鎖定狀態下初始化,並由with語句鎖定和釋放。

這裏值得注意的是,運行此方法的線程將一直保持Lock,直到徹底完成對數據庫的更新。在這種狀況下,這意味着函數將在複製、更新、休眠時保持鎖定,而後將值寫回數據庫。

若是在日誌記錄設置爲警告級別的狀況下運行此版本,你將看到如下內容:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
複製代碼

看看這個。你的程序終於成功了!

__main__中配置日誌輸出後,能夠經過添加如下語句將級別設置爲DEBUG來打開完整日誌記錄:

logging.getLogger().setLevel(logging.DEBUG)
複製代碼

在啓用DEBUG後,運行此程序,以下所示:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.
複製代碼

在輸出中,你能夠看到Thread 0獲得了鎖,並在進入睡眠狀態時仍保持鎖定。而後Thread 1啓動並嘗試獲取相同的鎖。由於Thread 0仍在持有鎖,Thread 1必須等待。這就是Lock的互斥性。

本文其他部分中的許多示例將日誌設置爲WARNINGDEBUG級別。咱們一般只是DEBUG級別的輸出,由於DEBUG日誌可能很是長。在日誌記錄打開的狀況下嘗試這些程序,看看它們能作什麼。

死鎖

在繼續探索以前,應該先看看使用鎖時的一個常見問題。如你所見,若是已經獲取了Lock,則對.acquire()的二次調用將等到持有Lock的線程調用.release()。運行此代碼時,你認爲會發生什麼狀況?

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
複製代碼

當程序第二次調用l.acquire()時,該函數將掛起,等待Lock的釋放。在本例中,能夠經過刪除第二次調用來修復死鎖,但死鎖一般發生在如下兩個微妙的事情之一:

  1. 未正確釋放Lock的錯誤。
  2. 設計問題,其中一個函數須要由某些函數調用,這些函數可能具備或可能不具備Lock

第一種狀況有時會發生,但使用Lock做爲上下文管理器會大大減小錯誤出現的頻率。建議儘量使用上下文管理器編寫代碼,由於它們有助於避免異常跳過.release()調用的狀況。

在某些語言中,設計問題可能要複雜一些。值得慶幸的是,Python線程的又一個對象RLock就是爲這種狀況而設計的。它容許線程在調用.release()以前屢次經過.acquire()實現RLock。該線程中調用.release()的次數與調用.acquire()的次數相同。

LockRLock是線程中用來防止競態條件的兩個基本工具,還有一些其餘工具以不一樣的方式發揮做用。在你查看它們以前,讓咱們轉到一個稍微不一樣的問題上。

未完待續

原文連接:realpython.com/intro-to-py…

關注微信公衆號:老齊教室。讀深度文章,得精湛技藝,享絢麗人生。

相關文章
相關標籤/搜索