1. 多線程編程與線程安全相關重要概念html
在個人上篇博文 聊聊Python中的GIL 中,咱們熟悉了幾個特別重要的概念:GIL,線程,進程, 線程安全,原子操做。python
如下是簡單回顧,詳細介紹請直接看聊聊Python中的GIL 編程
還有一個重要的結論:當對全局資源存在寫操做時,若是不能保證寫入過程的原子性,會出現髒讀髒寫的狀況,即線程不安全。Python的GIL只能保證原子操做的線程安全,所以在多線程編程時咱們須要經過加鎖來保證線程安全。安全
最簡單的鎖是互斥鎖(同步鎖),互斥鎖是用來解決io密集型場景產生的計算錯誤,即目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據。多線程
下面咱們會來介紹如何使用互斥鎖。併發
2. Threading.Lock實現互斥鎖的簡單示例app
咱們經過Threading.Lock()來實現鎖。ide
如下是線程不安全的例子:函數
>>> import threading >>> import time >>> def sub1(): global count tmp = count time.sleep(0.001) count = tmp + 1 time.sleep(2) >>> count = 0 >>> def verify(sub): global count thread_list = [] for i in range(100): t = threading.Thread(target=sub,args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) >>> verify(sub1) 14
在這個例子中,咱們把post
count+=1
代替爲
tmp = count time.sleep(0.001) count = tmp + 1
是由於,儘管count+=1是非原子操做,可是由於CPU執行的太快了,比較難以復現出多進程的非原子操做致使的進程不安全。通過代替以後,儘管只sleep了0.001秒,可是對於CPU的時間來講是很是長的,會致使這個代碼塊執行到一半,GIL鎖就釋放了。即tmp已經獲取到count的值了,可是尚未將tmp + 1賦值給count。而此時其餘線程若是執行完了count = tmp + 1, 當返回到原來的線程執行時,儘管count的值已經更新了,可是count = tmp + 1是個賦值操做,賦值的結果跟count的更新的值是同樣的。最終致使了咱們累加的值有不少丟失。
下面是線程安全的例子,咱們能夠用threading.Lock()得到鎖
>>> count = 0 >>> def sub2(): global count if lock.acquire(1):
#acquire()是獲取鎖,acquire(1)返回獲取鎖的結果,成功獲取到互斥鎖爲True,若是沒有獲取到互斥鎖則返回False tmp = count time.sleep(0.001) count = tmp + 1 time.sleep(2) lock.release() 一系列操做結束以後須要釋放鎖 >>> def verify(sub): global count thread_list = [] for i in range(100): t = threading.Thread(target=sub,args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) >>> verify(sub2) 100
獲取鎖和釋放鎖的語句也能夠用Python的with來實現,這樣更簡潔。
>>> count = 0 >>> def sub3(): global count with lock: tmp = count time.sleep(0.001) count = tmp + 1 time.sleep(2) >>> def verify(sub): global count thread_list = [] for i in range(100): t = threading.Thread(target=sub,args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) >>> verify(sub3) 100
3. 兩種死鎖狀況及處理
死鎖產生的緣由
兩種死鎖:
3.1 迭代死鎖與遞歸鎖(RLock)
該狀況是一個線程「迭代」請求同一個資源,直接就會形成死鎖。這種死鎖產生的緣由是咱們標準互斥鎖threading.Lock的缺點致使的。標準的鎖對象(threading.Lock)並不關心當前是哪一個線程佔有了該鎖;若是該鎖已經被佔有了,那麼任何其它嘗試獲取該鎖的線程都會被阻塞,包括已經佔有該鎖的線程也會被阻塞。
下面是例子,
#/usr/bin/python3 # -*- coding: utf-8 -*- import threading import time count_list = [0,0] lock = threading.Lock() def change_0(): global count_list with lock: tmp = count_list[0] time.sleep(0.001) count_list[0] = tmp + 1 time.sleep(2) print("Done. count_list[0]:%s" % count_list[0]) def change_1(): global count_list with lock: tmp = count_list[1] time.sleep(0.001) count_list[1] = tmp + 1 time.sleep(2) print("Done. count_list[1]:%s" % count_list[1]) def change(): with lock: change_0()
time.sleep(0.001) change_1() def verify(sub): global count_list thread_list = [] for i in range(100): t = threading.Thread(target=sub, args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count_list) if __name__ == "__main__": verify(change)
示例中,咱們有一個共享資源count_list,有兩個分別取這個共享資源第一部分和第二部分的數字(count_list[0]和count_list[1])。兩個訪問函數都使用了鎖來確保在獲取數據時沒有其它線程修改對應的共享數據。
如今,若是咱們思考如何添加第三個函數來獲取兩個部分的數據。一個簡單的方法是依次調用這兩個函數,而後返回結合的結果。
這裏的問題是,若有某個線程在兩個函數調用之間修改了共享資源,那麼咱們最終會獲得不一致的數據。
最明顯的解決方法是在這個函數中也使用lock。然而,這是不可行的。裏面的兩個訪問函數將會阻塞,由於外層語句已經佔有了該鎖。
結果是沒有任何輸出,死鎖。
爲了解決這個問題,咱們能夠用threading.RLock代替threading.Lock
#/usr/bin/python3 # -*- coding: utf-8 -*- import threading import time count_list = [0,0] lock = threading.RLock() def change_0(): global count_list with lock: tmp = count_list[0] time.sleep(0.001) count_list[0] = tmp + 1 time.sleep(2) print("Done. count_list[0]:%s" % count_list[0]) def change_1(): global count_list with lock: tmp = count_list[1] time.sleep(0.001) count_list[1] = tmp + 1 time.sleep(2) print("Done. count_list[1]:%s" % count_list[1]) def change(): with lock: change_0()
time.sleep(0.001) change_1() def verify(sub): global count_list thread_list = [] for i in range(100): t = threading.Thread(target=sub, args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count_list) if __name__ == "__main__": verify(change)
3.2 互相等待死鎖與鎖的升序使用
死鎖的另一個緣由是兩個進程想要得到的鎖已經被對方進程得到,只能互相等待又沒法釋放已經得到的鎖,而致使死鎖。假設銀行系統中,用戶a試圖轉帳100塊給用戶b,與此同時用戶b試圖轉帳500塊給用戶a,則可能產生死鎖。
2個線程互相等待對方的鎖,互相佔用着資源不釋放。
下面是一個互相調用致使死鎖的例子:
#/usr/bin/python3 # -*- coding: utf-8 -*- import threading import time class Account(object): def __init__(self, name, balance, lock): self.name = name self.balance = balance self.lock = lock def withdraw(self, amount): self.balance -= amount def deposit(self, amount): self.balance += amount def transfer(from_account, to_account, amount): with from_account.lock: from_account.withdraw(amount) time.sleep(1) print("trying to get %s's lock..." % to_account.name) with to_account.lock: to_account_deposit(amount) print("transfer finish") if __name__ == "__main__": a = Account('a',1000, threading.Lock()) b = Account('b',1000, threading.Lock()) thread_list = [] thread_list.append(threading.Thread(target = transfer, args=(a,b,100))) thread_list.append(threading.Thread(target = transfer, args=(b,a,500))) for i in thread_list: i.start() for j in thread_list: j.join()
最終的結果是死鎖:
trying to get account a's lock... trying to get account b's lock...
即咱們的問題是:
你正在寫一個多線程程序,其中線程須要一次獲取多個鎖,此時如何避免死鎖問題。
解決方案:
在多線程程序中,死鎖問題很大一部分是因爲線程同時獲取多個鎖形成的。舉個例子:一個線程獲取了第一個鎖,而後在獲取第二個鎖的 時候發生阻塞,那麼這個線程就可能阻塞其餘線程的執行,從而致使整個程序假死。 其實解決這個問題,核心思想也特別簡單:目前咱們遇到的問題是兩個線程想獲取到的鎖,都被對方線程拿到了,那麼咱們只須要保證在這兩個線程中,獲取鎖的順序保持一致就能夠了。舉個例子,咱們有線程thread_a, thread_b, 鎖lock_1, lock_2。只要咱們規定好了鎖的使用順序,好比先用lock_1,再用lock_2,當線程thread_a得到lock_1時,其餘線程如thread_b就沒法得到lock_1這個鎖,也就沒法進行下一步操做(得到lock_2這個鎖),也就不會致使互相等待致使的死鎖。簡言之,解決死鎖問題的一種方案是爲程序中的每個鎖分配一個惟一的id,而後只容許按照升序規則來使用多個鎖,這個規則使用上下文管理器 是很是容易實現的,示例以下:
#/usr/bin/python3 # -*- coding: utf-8 -*- import threading import time from contextlib import contextmanager thread_local = threading.local() @contextmanager def acquire(*locks): #sort locks by object identifier locks = sorted(locks, key=lambda x: id(x)) #make sure lock order of previously acquired locks is not violated acquired = getattr(thread_local,'acquired',[]) if acquired and (max(id(lock) for lock in acquired) >= id(locks[0])): raise RuntimeError('Lock Order Violation') # Acquire all the locks acquired.extend(locks) thread_local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: for lock in reversed(locks): lock.release() del acquired[-len(locks):] class Account(object): def __init__(self, name, balance, lock): self.name = name self.balance = balance self.lock = lock def withdraw(self, amount): self.balance -= amount def deposit(self, amount): self.balance += amount def transfer(from_account, to_account, amount): print("%s transfer..." % amount) with acquire(from_account.lock, to_account.lock): from_account.withdraw(amount) time.sleep(1) to_account.deposit(amount) print("%s transfer... %s:%s ,%s: %s" % (amount,from_account.name,from_account.balance,to_account.name, to_account.balance)) print("transfer finish") if __name__ == "__main__": a = Account('a',1000, threading.Lock()) b = Account('b',1000, threading.Lock()) thread_list = [] thread_list.append(threading.Thread(target = transfer, args=(a,b,100))) thread_list.append(threading.Thread(target = transfer, args=(b,a,500))) for i in thread_list: i.start() for j in thread_list: j.join()
咱們得到的結果是
100 transfer... 500 transfer... 100 transfer... a:900 ,b:1100 transfer finish 500 transfer... b:600, a:1400 transfer finish
成功的避免了互相等待致使的死鎖問題。
在上述代碼中,有幾點語法須要解釋:
今天咱們主要討論了Python多線程中如何保證線程安全,互斥鎖的使用方法。另外着重討論了兩種致使死鎖的狀況:迭代死鎖與互相等待死鎖,以及這兩種死鎖的解決方案:遞歸鎖(RLock)的使用和鎖的升序使用。
對於多線程編程,咱們將在下一篇文章討論線程同步(Event)問題,以及對Python多線程模塊(threading)進行總結。
參考文獻:
1. 深刻理解 GIL:如何寫出高性能及線程安全的 Python 代碼 http://python.jobbole.com/87743/
2. Python中的原子操做 https://www.jianshu.com/p/42060299c581
3. 詳解python中的Lock與RLock https://blog.csdn.net/ybdesire/article/details/80294638
4. 深刻解析Python中的線程同步方法 https://www.jb51.net/article/86599.htm
5. Python中死鎖的造成示例及死鎖狀況的防止 https://www.jb51.net/article/86617.htm
6. 舉例講解 Python 中的死鎖、可重入鎖和互斥鎖 http://python.jobbole.com/82723/
9. Python3入門之線程threading經常使用方法
10. 淺談 Python 的 with 語句 https://www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/