PyTips 0x 12 - Python 線程與協程(1)

項目地址:https://git.io/pytipshtml

要說到線程(Thread)與協程(Coroutine)彷佛老是須要從並行(Parallelism)與併發(Concurrency)談起,關於並行與併發的問題,Rob Pike 用 Golang 小地鼠燒書的例子給出了很是生動形象的說明。簡單來講並行就是咱們現實世界運行的樣子,每一個人都是獨立的執行單元,各自完成本身的任務,這對應着計算機中的分佈式(多臺計算機)或多核(多個CPU)運做模式;而對於併發,我看到最生動的解釋來自Quora 上 Jan Christian Meyer 回答的這張圖python

concurrency

併發對應計算機中充分利用單核(一個CPU)實現(看起來)多個任務同時執行。咱們在這裏將要討論的 Python 中的線程與協程僅是基於單核的併發實現,隨便去網上搜一搜(Thread vs Coroutine)能夠找到一大批關於它們性能的爭論、benchmark,此次話題的目的不在於討論誰好誰壞,套用一句很是套路的話來講,拋開應用場景爭好壞都是耍流氓。固然在硬件支持的條件下(多核)也能夠利用線程和協程實現並行計算,並且 Python 2.6 以後新增了標準庫 multiprocessingPEP 371)突破了 GIL 的限制能夠充分利用多核,但因爲協程是基於單個線程的,所以多進程的並行對它們來講狀況是相似的,所以這裏只討論單核併發的實現。git

要了解線程以及協程的原理和由來能夠查看參考連接中的前兩篇文章。Python 3.5 中關於線程的標準庫是 threading,以前在 2.x 版本中的 thread 在 3.x 以後改名爲 _thread ,不管是2.7仍是3.5都應該儘可能避免使用較爲底層的 thread/_thread 而應該使用 threadinggithub

建立一個線程能夠經過實例化一個 threading.Thread 對象:golang

from threading import Thread
import time

def _sum(x, y):
    print("Compute {} + {}...".format(x, y))
    time.sleep(2.0)
    return x+y
def compute_sum(x, y):
    result = _sum(x, y)
    print("{} + {} = {}".format(x, y, result))

start = time.time()    
threads = [
    Thread(target=compute_sum, args=(0,0)),
    Thread(target=compute_sum, args=(1,1)),
    Thread(target=compute_sum, args=(2,2)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))

# Do not use Thread
start = time.time()
compute_sum(0,0)
compute_sum(1,1)
compute_sum(2,2)
print("Total elapsed time {} s".format(time.time() - start))
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.002729892730713 s
Compute 0 + 0...
0 + 0 = 0
Compute 1 + 1...
1 + 1 = 2
Compute 2 + 2...
2 + 2 = 4
Total elapsed time 6.004806041717529 s

除了經過將函數傳遞給 Thread 建立線程實例以外,還能夠直接繼承 Thread 類:算法

from threading import Thread
import time
class ComputeSum(Thread):
    def __init__(self, x, y):
        super().__init__()
        self.x = x
        self.y = y
    def run(self):
        result = self._sum(self.x, self.y)
        print("{} + {} = {}".format(self.x, self.y, result))
    def _sum(self, x, y):
        print("Compute {} + {}...".format(x, y))
        time.sleep(2.0)
        return x+y 
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 = 0
1 + 1 = 2
2 + 2 = 4
Total elapsed time 2.001662015914917 s

根據上面代碼執行的結果能夠發現,compute_sum/t.run 函數的執行是按照 start() 的順序,但 _sum 結果的輸出順序倒是隨機的。由於 _sum 中加入了 time.sleep(2.0) ,讓程序執行到這裏就會進入阻塞狀態,可是幾個線程的執行看起來卻像是同時進行的(併發)。編程

有時候咱們既須要併發地「跳過「阻塞的部分,又須要有序地執行其它部分,例如操做共享數據的時候,這時就須要用到」鎖「。在上述」求和線程「的例子中,假設每次求和都須要加上額外的 _base 並把計算結果累積到 _base 中。儘管這個例子不太恰當,但它說明了線程鎖的用途:segmentfault

from threading import Thread, Lock
import time
_base = 1
_lock = Lock()
class ComputeSum(Thread):
    def __init__(self, x, y):
        super().__init__()
        self.x = x
        self.y = y
    def run(self):
        result = self._sum(self.x, self.y)
        print("{} + {} + base = {}".format(self.x, self.y, result))
    def _sum(self, x, y):
        print("Compute {} + {}...".format(x, y))
        time.sleep(2.0)
        global _base
        with _lock:
            result = x + y + _base
            _base = result
        return result
threads = [ComputeSum(0,0), ComputeSum(1,1), ComputeSum(2,2)]

start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Total elapsed time {} s".format(time.time() - start))
Compute 0 + 0...
Compute 1 + 1...
Compute 2 + 2...
0 + 0 + base = 1
1 + 1 + base = 3
2 + 2 + base = 7
Total elapsed time 2.0064051151275635 s

這裏用上下文管理器來管理鎖的獲取和釋放,至關於:併發

_lock.acquire()
try:
    result = x + y + _base
    _base  = result
finally:
    _lock.release()

死鎖異步

線程的一大問題就是經過加鎖來」搶奪「共享資源的時候有可能形成死鎖,例以下面的程序:

from threading import Lock
_base_lock = Lock()
_pos_lock  = Lock()
_base = 1

def _sum(x, y):
    # Time 1
    with _base_lock:
        # Time 3
        with _pos_lock:
            result = x + y
    return result
def _minus(x, y):
    # Time 0
    with _pos_lock:
        # Time 2
        with _base_lock:
            result = x - y
    return result

因爲線程的調度執行順序是不肯定的,在執行上面兩個線程 _sum/_minus 的時候就有可能出現註釋中所標註的時間順序,即 # Time 0 的時候運行到 with _pos_lock 獲取了 _pos_lock 鎖,而接下來因爲阻塞立刻切換到了 _sum 中的 # Time 1 ,並獲取了 _base_lock,接下來因爲兩個線程互相鎖定了彼此須要的下一個鎖,將會致使死鎖,即程序沒法繼續運行。根據 我是一個線程 中所描述的,爲了不死鎖,須要全部的線程按照指定的算法(或優先級)來進行加鎖操做。無論怎麼說,死鎖問題都是一件很是傷腦筋的事,緣由之一在於無論線程實現的是併發仍是並行,在編程模型和語法上看起來都是並行的,而咱們的大腦雖然是一個(內隱的)絕對並行加工的機器,卻很是不善於將並行過程具象化(至少在未經足夠訓練的時候)。而與線程相比,協程(尤爲是結合事件循環)不管在編程模型仍是語法上,看起來都是很是友好的單線程同步過程。後面第二部分咱們再來討論 Python 中協程是如何從」小三「一步步扶正上位的:D


歡迎關注公衆號 PyHub 每日推送

歡迎關注公衆號 PyHub!

參考

  1. Python 中的進程、線程、協程、同步、異步、回調

  2. 我是一個線程

  3. Concurrency is not Parallelism

  4. A Curious Course on Coroutines and Concurrency

  5. PyDocs: 17.1. threading — Thread-based parallelism

  6. PyDocs: 18.5.3. Tasks and coroutines

  7. [譯] Python 3.5 協程到底是個啥

  8. 協程的好處是什麼? - crazybie 的回答

  9. Py3-cookbook:第十二章:併發編程

  10. Quora: What are the differences between parallel, concurrent and asynchronous programming?

相關文章
相關標籤/搜索