在多線程(multithreaded, MT)編程出現以前,計算機程序的執行是由單個步驟序列組成的,該序列在主機的 CPU 中按照同步順序執行。不管是任務自己須要按照步驟順序執行,仍是整個程序實際上包含多個子任務,都須要按照這種順序方式執行。那麼,假如這些子任務相互獨立,沒有因果關係(也就是說,各個子任務的結果並不影響其餘子任務的結果),這種作法是否是不符合邏輯呢?要是讓這些獨立的任務同時運行,會怎麼樣呢?很明顯,這種並行處理方式能夠顯著地提升整個任務的性能。這就是多線程編程。html
多線程編程對於具備以下特色的編程任務而言是很是理想的:本質上是異步的;須要多個併發活動;每一個活動的處理順序多是不肯定的,或者說是隨機的、不可預測的。這種編程任務能夠被組織或劃分紅多個執行流,其中每一個執行流都有一個指定要完成的任務。根據應用的不一樣,這些子任務可能須要計算出中間結果,而後合併爲最終的輸出結果。html5
計算密集型的任務能夠比較容易地劃分紅多個子任務,而後按順序執行或按照多線程方式執行。而那種使用單線程處理多個外部輸入源的任務就不那麼簡單了。若是不使用多線程,要實現這種編程任務就須要爲串行程序使用一個或多個計時器,並實現一個多路複用方案。python
一個串行程序須要從每一個 I/O 終端通道來檢查用戶的輸入;然而,有一點很是重要,程序在讀取 I/O 終端通道時不能阻塞,由於用戶輸入的到達時間是不肯定的,而且阻塞會妨礙其餘 I/O 通道的處理。串行程序必須使用非阻塞 I/O 或擁有計時器的阻塞 I/O(以保證阻塞只是暫時的)。程序員
因爲串行程序只有惟一的執行線程,所以它必須兼顧須要執行的多個任務,確保其中的某個任務不會佔用過多時間,並對用戶的響應時間進行合理的分配。這種任務類型的串行程序的使用,每每形成很是複雜的控制流,難以理解和維護。web
使用多線程編程,以及相似 Queue 的共享數據結構(本章後面會討論的一種多線程隊列數據結構),這個編程任務能夠規劃成幾個執行特定函數的線程。正則表達式
使用多線程來規劃這種編程任務能夠下降程序的複雜性,使其實現更加清晰、高效、簡潔。每一個線程中的邏輯都不復雜,由於它只有一個要完成的特定做業。好比,UserRequestThread 的功能僅僅是讀取用戶輸入,而後把輸入數據放到隊列裏,以供其餘線程後續處理。每一個線程都有其明確的做業,你只須要設計每類線程去作一件事,並把這件事情作好就能夠了。這種特定任務線程的使用與亨利·福特生產汽車的流水線模型有些許類似。數據庫
計算機程序只是存儲在磁盤上的可執行二進制(或其餘類型)文件。只有把它們加載到內存中並被操做系統調用,才擁有其生命期。 進程(有時稱爲重量級進程)則是一個執行中的程序。每一個進程都擁有本身的地址空間、內存、數據棧以及其餘用於跟蹤執行的輔助數據。操做系統管理其上全部進程的執行,併爲這些進程合理地分配時間。進程也能夠經過派生(fork 或 spawn)新的進程來執行其餘任務,不過由於每一個新進程也都擁有本身的內存和數據棧等,因此只能採用進程間通訊(IPC)的方式共享信息。編程
線程(有時候稱爲輕量級進程)與進程相似,不過它們是在同一個進程下執行的,並共享相同的上下文。能夠將它們認爲是在一個主進程或「主線程」中並行運行的一些「迷你進程」。bootstrap
線程包括開始、執行順序和結束三部分。它有一個指令指針,用於記錄當前運行的上下文。當其餘線程運行時,它能夠被搶佔(中斷)和臨時掛起(也稱爲睡眠) ——這種作法叫作讓步(yielding)。緩存
一個進程中的各個線程與主線程共享同一片數據空間,所以相比於獨立的進程而言,線程間的信息共享和通訊更加容易。線程通常是以併發方式執行的,正是因爲這種並行和數據共享機制,使得多任務間的協做成爲可能。固然,在單核 CPU 系統中,由於真正的併發是不可能的,因此線程的執行其實是這樣規劃的:每一個線程運行一小會兒,而後讓步給其餘線程(再次排隊等待更多的 CPU 時間)。在整個進程的執行過程當中,每一個線程執行它本身特定的任務,在必要時和其餘線程進行結果通訊。
固然,這種共享並非沒有風險的。若是兩個或多個線程訪問同一片數據,因爲數據訪問順序不一樣,可能致使結果不一致。這種狀況一般稱爲競態條件(race condition)。幸運的是,大多數線程庫都有一些同步原語,以容許線程管理器控制執行和訪問。
另外一個須要注意的問題是,線程沒法給予公平的執行時間。這是由於一些函數會在完成前保持阻塞狀態,若是沒有專門爲多線程狀況進行修改,會致使 CPU 的時間分配向這些貪婪的函數傾斜。
本節將討論在如何在 Python 中使用線程,其中包括全局解釋器鎖對線程的限制和一個快速的演示腳本。
Python 代碼的執行是由Python虛擬機
(又名解釋器主循環)進行控制的。 Python 在設計時是這樣考慮的,在主循環中同時只能有一個控制線程在執行;就像單核 CPU 系統中的多進程同樣,內存中能夠有許多程序,可是在任意給定時刻只能有一個程序在運行。同理,儘管 Python 解釋器中能夠運行多個線程,可是在任意給定時刻只有一個線程會被解釋器執行。
對 Python 虛擬機的訪問是由全局解釋器鎖(GIL)控制的。這個鎖就是用來保證同時只能有一個線程運行的。在多線程環境中, Python 虛擬機將按照下面所述的方式執行:
time.sleep(0)
來完成)。當調用外部代碼(即,任意 C/C++擴展的內置函數)時, GIL 會保持鎖定,直至函數執行結束(由於在這期間沒有 Python 字節碼計數)。編寫擴展函數的程序員有能力解鎖 GIL,然而,做爲 Python 開發者, 你並不須要擔憂 Python 代碼會在這些狀況下被鎖住。例如,對於任意麪向 I/O 的 Python 例程(調用了內置的操做系統 C 代碼的那種),GIL 會在 I/O 調用前被釋放,以容許其餘線程在 I/O 執行的時候運行。而對於那些沒有太多 I/O 操做的代碼而言,更傾向於在該線程整個時間片內始終佔有處理器(和 GIL)。換句話說就是, I/O 密集型的 Python 程序要比計算密集型的代碼可以更好地利用多線程環境。
若是你對源代碼、解釋器主循環和 GIL 感興趣,能夠看看Python/ceval.c
文件。
當一個線程完成函數的執行時,它就會退出。另外,還能夠經過調用諸如thread.exit()
之類的退出函數,或者 sys.exit()
之類的退出 Python 進程的標準方法,亦或者拋出SystemExit
異常,來使線程退出。不過,你不能直接「終止」一個線程。
下一節將會詳細討論兩個與線程相關的 Python 模塊,不過在這兩個模塊中,不建議使用thread模塊。給出這個建議有不少緣由,其中最明顯的一個緣由是模塊thread在主線程退出以後,全部其餘線程都會在沒有清理的狀況下直接退出。而另外一個模塊 threading 會確保在全部「重要的」子線程退出前,保持整個進程的存活(對於「重要的」這個含義的說明,請閱讀下面的核心提示:「避免使用 thread 模塊」)。
而主線程應該作一個好的管理者,負責瞭解每一個單獨的線程須要執行什麼,每一個派生的線程須要哪些數據或參數,這些線程執行完成後會提供什麼結果。這樣,主線程就能夠收集每一個線程的結果,而後彙總成一個有意義的最終結果。
Python 雖然支持多線程編程,可是還須要取決於它所運行的操做系統。以下操做系統是支持多線程的:絕大多數類 UNIX 平臺(如 Linux、 Solaris、 Mac OS X、 *BSD 等),以及Windows 平臺。 Python 使用兼容 POSIX 的線程,也就是衆所周知的 pthread。
默認狀況下,從源碼構建的 Python(2.0 及以上版本)或者 Win32 二進制安裝的 Python,線程支持是已經啓用的。要肯定你的解釋器是否支持線程,只須要從交互式解釋器中嘗試導入 thread 模塊便可,以下所示(若是線程是可用的,則不會產生錯誤)。
>>> import thread >>>
若是你的 Python 解釋器沒有將線程支持編譯進去,模塊導入將會失敗。
>>> import thread Traceback (innermost last): File "<stdin>", line 1, in ? ImportError: No module named thread
這種狀況下,你可能須要從新編譯你的 Python 解釋器纔可以使用線程。通常能夠在調用configure 腳本的時候使用–with-thread 選項。查閱你所使用的發行版本的 README 文件,來獲取如何在你的系統中編譯線程支持的 Python 的指定指令。
注意:Python3.x 線程中 thread 模塊已被廢棄,用戶可使用 threading 模塊代替。在 Python3.x 中不能再使用"thread" 模塊。爲了兼容性,Python3.x 將 thread 重命名爲 「_thread」。
In [1]: import thread --------------------------------------------------------------------------- ModuleNotFoundError Traceback (most recent call last) <ipython-input-1-e75c663b2a08> in <module> ----> 1 import thread ModuleNotFoundError: No module named 'thread' In [2]: import threading In [3]: import _thread In [4]:
在第一個例子中, 咱們將使用 time.sleep()函數來演示線程是如何工做的。 time.sleep()函數須要一個浮點型的參數,而後以這個給定的秒數進行「睡眠」,也就是說,程序的執行會暫時中止指定的時間。
建立兩個時間循環:一個睡眠 4 秒(loop0());另外一個睡眠 2 秒(loop1())(這裏使用「loop0」和「loop1」做爲函數名,暗示咱們最終會有一個循環序列)。若是在一個單進程或單線程的程序中順序執行 loop0()和 loop1(),就會像示例 4-1 中的 onethr.py 同樣,整個執行時間至少會達到 6 秒鐘。而在啓動 loop0()和 loop1()以及執行其餘代碼時,也有可能存在 1 秒的開銷,使得整個時間達到 7 秒。
示例1 使用單線程執行循環(
onethr.py
)
該腳本在一個單線程程序裏連續執行兩個循環。一個循環必須在另外一個開始前完成。總共消耗的時間是每一個循環所用時間之和。
from time import sleep, ctime def loop0(): print('start loop 0 at:', ctime()) sleep(4) print('loop 0 done at:', ctime()) def loop1(): print('start loop 1 at:', ctime()) sleep(2) print('loop 1 done at:', ctime()) def main(): print('starting at:', ctime()) loop0() loop1() print('all DONE at:', ctime()) if __name__ == '__main__': main()
能夠經過執行onethr.py
來驗證這一點,下面是輸出結果:
$python onethr.py starting at: Thu Feb 28 10:54:40 2019 start loop 0 at: Thu Feb 28 10:54:40 2019 loop 0 done at: Thu Feb 28 10:54:44 2019 start loop 1 at: Thu Feb 28 10:54:44 2019 loop 1 done at: Thu Feb 28 10:54:46 2019 all DONE at: Thu Feb 28 10:54:46 2019
如今,假設 loop0()和 loop1()中的操做不是睡眠,而是執行獨立計算操做的函數,全部結果彙總成一個最終結果。那麼,讓它們並行執行來減小總的執行時間是否是有用的呢?這就是如今要介紹的多線程編程的前提。
Python 提供了多個模塊來支持多線程編程,包括 thread、 threading 和 Queue 模塊等。程序是可使用 thread 和 threading 模塊來建立與管理線程。 thread 模塊提供了基本的線程和鎖定支持;而 threading 模塊提供了更高級別、功能更全面的線程管理。使用 Queue 模塊,用戶能夠建立一個隊列數據結構,用於在多線程之間進行共享。咱們將分別來查看這幾個模塊,並給出幾個例子和中等規模的應用。
核心提示:避免使用 thread 模塊
推薦使用更高級別的 threading 模塊,而不使用 thread 模塊有不少緣由。 threading 模塊更加先進,有更好的線程支持,而且 thread 模塊中的一些屬性會和 threading 模塊有衝突。另外一個緣由是低級別的 thread 模塊擁有的同步原語不多(實際上只有一個),而 threading模塊則有不少。
不過,出於對 Python 和線程學習的興趣,咱們將給出使用 thread 模塊的一些代碼。給出這些代碼只是出於學習目的,但願它可以讓你更好地領悟爲何應該避免使用thread 模塊。咱們還將展現如何使用更加合適的工具,如 threading 和 Queue 模塊中的那些方法。
避免使用 thread 模塊的另外一個緣由是它對於進程什麼時候退出沒有控制。當主線程結束時,全部其餘線程也都強制結束,不會發出警告或者進行適當的清理。如前所述,至少threading 模塊能確保重要的子線程在進程退出前結束。
咱們只建議那些想訪問線程的更底層級別的專家使用 thread 模塊。爲了強調這一點,在 Python3 中該模塊被重命名爲_thread。你建立的任何多線程應用都應該使用 threading 模塊或其餘更高級別的模塊。
讓咱們先來看看 thread 模塊提供了什麼。除了派生線程外, thread 模塊還提供了基本的同步數據結構,稱爲鎖對象(lock object,也叫原語鎖、 簡單鎖、 互斥鎖、 互斥和二進制信號量)。如前所述,這個同步原語和線程管理是密切相關的。
表 4-1 列出了一些經常使用的線程函數,以及 LockType 鎖對象的方法。
thread 模塊的核心函數是 start_new_thread()。它的參數包括函數(對象)、函數的參數以及可選的關鍵字參數。將專門派生新的線程來調用這個函數。
把多線程整合進onethr.py
這個例子中。把對 loop*()函數的調用稍微改變一下,獲得示例4-2 中的mtsleepA.py
文件。
表 4-1 thread 模塊和鎖對象
函數/方法 | 描 述 |
---|---|
thread |
模塊的函數 |
start_new_thread(function, args, kwargs=None) |
派生一個新的線程,使用給定的 args 和可選的 kwargs 來執行 function |
allocate_lock() |
分配 LockType 鎖對象 |
exit() |
給線程退出指令 |
LockType |
鎖對象的方法 |
acquire(wait=None) |
嘗試獲取鎖對象 |
locked() |
若是獲取了鎖對象則返回 True,不然,返回 False |
release() |
釋放鎖 |
示例2 使用thread模塊(
mtsleepA.py
)
這裏執行的循環和onethr.py
是同樣的,不過此次使用了thread模塊提供的簡單多線程機制。兩個循環是併發執行的(很明顯,短的那個先結束),所以總的運行時間只與最慢的那個線程相關,而不是每一個線程運行時間之和。
import _thread as thread from time import sleep, ctime def loop0(): print('start loop 0 at:', ctime()) sleep(4) print('loop 0 done at:', ctime()) def loop1(): print('start loop 1 at:', ctime()) sleep(2) print('loop 1 done at:', ctime()) def main(): print('starting at:', ctime()) thread.start_new_thread(loop0, ()) thread.start_new_thread(loop1, ()) sleep(6) print('all DONE at:', ctime()) if __name__ == '__main__': main()
start_new_thread()
必須包含開始的兩個參數,因而即便要執行的函數不須要參數,也須要傳遞一個空元組。
與以前的代碼相比,本程序執行後的輸出結果有很大不一樣。原來須要運行 6~7 秒的時間,而如今的腳本只須要運行 4 秒,也就是最長的循環加上其餘全部開銷的時間之和。
$python mtsleepA.py starting at: Thu Feb 28 11:09:56 2019 start loop 0 at: Thu Feb 28 11:09:56 2019 start loop 1 at: Thu Feb 28 11:09:56 2019 loop 1 done at: Thu Feb 28 11:09:58 2019 loop 0 done at: Thu Feb 28 11:10:00 2019 all DONE at: Thu Feb 28 11:10:02 2019
睡眠 4 秒和睡眠 2 秒的代碼片斷是併發執行的,這樣有助於減小總體的運行時間。你甚至能夠看到 loop 1 是如何在 loop 0 以前結束的。
這個應用程序中剩下的一個主要區別是增長了一個 sleep(6)調用。爲何必需要這樣作呢?這是由於若是咱們沒有阻止主線程繼續執行,它將會繼續執行下一條語句,顯示「all DONE」而後退出,而 loop0()和 loop1()這兩個線程將直接終止。
咱們沒有寫讓主線程等待子線程所有完成後再繼續的代碼,即咱們所說的線程須要某種形式的同步。在這個例子中,調用 sleep()來做爲同步機制。將其值設定爲 6 秒是由於咱們知道全部線程(用時 4 秒和 2 秒的)會在主線程計時到 6 秒以前完成。
你可能會想到,確定會有比在主線程中額外延時 6 秒更好的線程管理方式。因爲這個延時,整個程序的運行時間並無比單線程的版本更快。像這樣使用 sleep()來進行線程同步是不可靠的。若是循環有獨立且不一樣的執行時間要怎麼辦呢?咱們可能會過早或過晚退出主線程。這就是引出鎖的緣由。
再一次修改代碼,引入鎖,並去除單獨的循環函數,修改後的代碼爲mtsleepB.py
,如示例 4-3 所示。咱們能夠看到輸出結果與mtsleepA.py
類似。惟一的區別是咱們不須要再像mtsleepA.py
那樣等待額外的時間後才能結束。經過使用鎖,咱們能夠在全部線程所有完成執行後當即退出。其輸出結果以下所示。
$ mtsleepB.py starting at Thu Feb 28 11:38:17 2019 start loop 0 at: Thu Feb 28 11:38:17 2019 start loop 1 at: Thu Feb 28 11:38:17 2019 loop 1 done at: Thu Feb 28 11:38:19 2019 loop 0 done at: Thu Feb 28 11:38:21 2019 all DONE at: Thu Feb 28 11:38:21 2019
那麼咱們是如何使用鎖來完成任務的呢?下面詳細分析源代碼。
示例 3 使用線程和鎖(
mtsleepB.py
)
與mtsleepA.py
中調用 sleep()來掛起主線程不一樣,鎖的使用將更加合理。
import _thread as thread from time import sleep, ctime loops = [4, 2] def loop(nloop, nsec, lock): print('start loop', nloop, 'at:', ctime()) sleep(nsec) print('loop', nloop, 'done at:', ctime()) lock.release() def main(): print('starting at', ctime()) locks = [] nloops = range(len(loops)) # 返回[0, len(loops))範圍內的一個可迭代對象(類型是對象),而不是列表類型,因此打印的時候不會打印列表。 for i in nloops: lock = thread.allocate_lock() # 獲得鎖對象 lock.acquire() # 取得鎖,效果等同於將鎖鎖上 locks.append(lock) # 將鎖添加到鎖列表中 for i in nloops: thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: while locks[i].locked(): pass print('all DONE at:', ctime()) if __name__ == '__main__': main()
逐行解釋
第 1~4 行
在 UNIX 啓動行後,導入了 time 模塊的幾個熟悉屬性以及 thread 模塊。咱們再也不把 4秒和 2 秒硬編碼到不一樣的函數中,而是使用了惟一的 loop()函數,並把這些常量放進列表loops 中。
第 6~10 行
loop()函數代替了以前例子中的 loop*()函數。所以,咱們必須在 loop()函數中作一些修改,以便它能使用鎖來完成本身的任務。其中最明顯的變化是咱們須要知道如今處於哪一個循環中,以及須要睡眠多久。最後一個新的內容是鎖自己。每一個線程將被分配一個已得到的鎖。當sleep()的時間到了的時候,釋放對應的鎖,向主線程代表該線程已完成。
第 12~29 行
大部分工做是在 main()中完成的,這裏使用了 3 個獨立的 for 循環。首先建立一個鎖的列表,經過使用 thread.allocate_lock()
函數獲得鎖對象,而後經過 acquire()方法取得(每一個鎖)。取得鎖效果至關於「把鎖鎖上」。一旦鎖被鎖上後,就能夠把它添加到鎖列表 locks 中。下一個循環用於派生線程,每一個線程會調用 loop()函數,並傳遞循環號、睡眠時間以及用於該線程的鎖這幾個參數。那麼爲何咱們不在上鎖的循環中啓動線程呢?這有兩個緣由:其一,咱們想要同步線程,以便「全部的馬同時衝出圍欄」;其二,獲取鎖須要花費一點時間。若是線程執行得太快,有可能出現獲取鎖以前線程就執行結束的狀況。
在每一個線程執行完成時,它會釋放本身的鎖對象。最後一個循環只是坐在那裏等待(暫停主線程),直到全部鎖都被釋放以後纔會繼續執行。由於咱們按照順序檢查每一個鎖,全部可能會被排在循環列表前面可是執行較慢的循環所拖累。這種狀況下,大部分時間是在等待最前面的循環。當這種線程的鎖被釋放時,剩下的鎖可能早已被釋放(也就是說,對應的線程已經執行完畢)。結果就是主線程會飛快地、沒有停頓地完成對剩下鎖的檢查。最後,你應該知道只有當咱們直接調用這個腳本時,最後幾行語句纔會執行 main()函數。
正如在前面的核心筆記中所提示的,這裏使用 thread 模塊只是爲了介紹多線程編程。多線程應用程序應當使用更高級別的模塊,好比下一節將要討論到的 threading 模塊。
如今介紹更高級別的 threading 模塊。除了 Thread 類之外,該模塊還包括許多很是好用的同步機制。表 4-2 給出了 threading 模塊中全部可用對象的列表。
表 4-2 threading 模塊的對象:
對 象 | 描 述 |
---|---|
Thread |
表示一個執行線程的對象 |
Lock |
鎖原語對象(和 thread 模塊中的鎖同樣) |
RLock |
可重入鎖對象,使單一線程能夠(再次)得到已持有的鎖(遞歸鎖) |
Condition |
條件變量對象,使得一個線程等待另外一個線程知足特定的「條件」,好比改變狀態或某個數據值 |
Event |
條件變量的通用版本,任意數量的線程等待某個事件的發生,在該事件發生後全部線程將被激活 |
Semaphore |
爲線程間共享的有限資源提供了一個「計數器」,若是沒有可用資源時會被阻塞 |
BoundedSemaphore |
與 Semaphore 類似,不過它不容許超過初始值 |
Timer |
與 Thread 類似,不過它要在運行前等待一段時間 |
Barrier 1 |
建立一個「障礙」,必須達到指定數量的線程後才能夠繼續 |
本節將研究如何使用 Thread 類來實現多線程。因爲以前已經介紹過鎖的基本概念,所以這裏不會再對鎖原語進行介紹。由於 Thread()類一樣包含某種同步機制,因此鎖原語的顯式使用再也不是必需的了。
核心提示:守護線程
避免使用 thread 模塊的另外一個緣由是該模塊不支持守護線程這個概念。當主線程退出時,全部子線程都將終止,無論它們是否仍在工做。若是你不但願發生這種行爲,就要引入守護線程的概念了。
threading 模塊支持守護線程,其工做方式是:守護線程通常是一個等待客戶端請求服務的服務器。若是沒有客戶端請求,守護線程就是空閒的。若是把一個線程設置爲守護線程,就表示這個線程是不重要的,進程退出時不須要等待這個線程執行完成。如同在第 2 章中看到的那樣,服務器線程遠行在一個無限循環裏,而且在正常狀況下不會退出。
若是主線程準備退出時,不須要等待某些子線程完成,就能夠爲這些子線程設置守護線程標記。該標記值爲真時,表示該線程是不重要的,或者說該線程只是用來等待客戶端請求而不作任何其餘事情。
要將一個線程設置爲守護線程,須要在啓動線程以前執行以下賦值語句:thread.daemon = True
(調用thread.setDaemon(True)
的舊方法已經棄用了)。一樣,要檢查線程的守護狀態,也只須要檢查這個值便可(對比過去調用 thread.isDaemon()的方法)。一個新的子線程會繼承父線程的守護標記。整個 Python 程序(能夠解讀爲:主線程)將在全部非守護線程退出以後才退出,換句話說,就是沒有剩下存活的非守護線程時。
threading 模塊的 Thread 類是主要的執行對象。它有 thread 模塊中沒有的不少函數。表 4-3 給出了它的屬性和方法列表。
表 4-3 Thread 對象的屬性和方法
屬 性 | 描 述 |
---|---|
Thread對象數據屬性: | |
name |
線程名 |
ident |
線程的標識符 |
daemon |
布爾標誌,表示這個線程是不是守護線程 |
Thread對象方法: | |
_init_(group=None, tatget=None, name=None, args=(),kwargs ={}, verbose=None, daemon=None) ③ |
實例化一個線程對象,須要有一個可調用的 target,以及其參數 args或 kwargs。還能夠傳遞 name 或 group 參數,不事後者還未實現。此外 , verbose 標 志 也 是 可 接 受 的。 而 daemon 的 值 將 會 設定thread.daemon 屬性/標誌 |
start() |
開始執行該線程 |
run() |
定義線程功能的方法(一般在子類中被應用開發者重寫) |
join(timeout=None) |
直至啓動的線程終止以前一直掛起;除非給出了 timeout(秒),不然會一直阻塞 |
getName() ① |
返回線程名 |
setName(name) ① |
設定線程名 |
isAlivel /is_alive () ② |
布爾標誌,表示這個線程是否還存活 |
isDaemon() ③ |
若是是守護線程,則返回 True;不然,返回 False |
setDaemon(daemonic) ③ |
把線程的守護標誌設定爲布爾值 daemonic(必須在線程 start()以前調用) |
注:
① 該方法已棄用,更好的方式是設置(或獲取)thread.name
屬性,或者在實例化過程當中傳遞該屬性。
② 駝峯式命名已經棄用,而且從 Python 2.6 版本起已經開始被取代。
③ is/setDaemon()已經棄用,應當設置 thread.daemon 屬性;從 Python 3.3 版本起,也能夠經過可選的 daemon 值在實例化過程當中設定 thread.daemon 屬性。
使用 Thread 類,能夠有不少方法來建立線程。咱們將介紹其中比較類似的三種方法。選擇你以爲最舒服的,或者是最適合你的應用和將來擴展的方法(咱們更傾向於最後一種方案)。
你會發現你將選擇第一個或第三個方案。當你須要一個更加符合面向對象的接口時,會選擇後者;不然會選擇前者。老實說,你會發現第二種方案顯得有些尷尬而且稍微難以閱讀。
在第一個例子中,咱們只是把 Thread 類實例化,而後將函數(及其參數)傳遞進去,和以前例子中採用的方式同樣。當線程開始執行時,這個函數也會開始執行。把示例 4-3 的mtsleepB.py
腳本進行修改,添加使用 Thread 類,獲得示例 4-4 中的 mtsleepC.py
文件。
示例 4 使用 threading 模塊(
mtsleepC.py
)
threading 模塊的 Thread 類有一個 join()方法,可讓主線程等待全部線程執行完畢。
import threading from time import sleep, ctime loops = [4, 2] def loop(nloop, nsec): print('start loop', nloop, 'at:', ctime()) sleep(nsec) print('loop', nloop, 'done at:', ctime()) def main(): print('starting at', ctime()) threads = [] nloops = range(len(loops)) # 返回[0, len(loops))範圍內的一個可迭代對象(類型是對象),而不是列表類型,因此打印的時候不會打印列表。 for i in nloops: t = threading.Thread(target=loop, args=(i, loops[i])) # args是調用target的參數 threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print('all DONE at:', ctime()) if __name__ == '__main__': main()
當運行示例 4 中的腳本時,能夠獲得和以前類似的輸出。
$ mtsleepC.py starting at Thu Feb 28 16:05:05 2019 start loop 0 at: Thu Feb 28 16:05:05 2019 start loop 1 at: Thu Feb 28 16:05:05 2019 loop 1 done at: Thu Feb 28 16:05:07 2019 loop 0 done at: Thu Feb 28 16:05:09 2019 all DONE at: Thu Feb 28 16:05:09 2019
那麼,這裏到底作了哪些修改呢?使用 thread 模塊時實現的鎖沒有了,取而代之的是一組 Thread 對象。當實例化每一個 Thread 對象時,把函數(target)和參數(args)傳進去,而後獲得返回的 Thread 實例。==實例化 Thread(調用 Thread())和調用 thread.start_new_thread()的最大區別是新線程不會當即開始執行。==這是一個很是有用的同步功能,尤爲是當你並不但願線程當即開始執行時。
當全部線程都分配完成以後,經過調用每一個線程的 start()方法讓它們開始執行,而不是在這以前就會執行。相比於管理一組鎖(分配、獲取、釋放、檢查鎖狀態等)而言,這裏只須要爲每一個線程調用 join()方法便可。 join()方法將等待線程結束,或者在提供了超時時間的狀況下,達到超時時間。使用 join()方法要比等待鎖釋放的無限循環更加清晰(這也是這種鎖又稱爲自旋鎖的緣由)。
對於 join()方法而言,其另外一個重要方面是其實它根本不須要調用。一旦線程啓動,它們就會一直執行,直到給定的函數完成後退出。若是主線程還有其餘事情要去作,而不是等待這些線程完成(例如其餘處理或者等待新的客戶端請求),就能夠不調用 join()。 join()方法只有在你須要等待線程完成的時候纔是有用的。
在建立線程時,與傳入函數類似的一個方法是傳入一個可調用的類的實例,用於線程執行——這種方法更加接近面向對象的多線程編程。這種可調用的類包含一個執行環境,比起一個函數或者從一組函數中選擇而言,有更好的靈活性。如今你有了一個類對象,而不只僅是單個函數或者一個函數列表/元組。
在 mtsleepC.py
的代碼中添加一個新類 ThreadFunc,並進行一些其餘的輕微改動,獲得mtsleepD.py
,如示例 5 所示。
示例 5 使用可調用的類(
mtsleepD.py
)
本例中,將傳遞進去一個可調用類(實例)而不只僅是一個函數。相比於 mtsleepC.py,這個實現中提供了更加面向對象的方法。
import threading from time import sleep, ctime loops = [4, 2] class ThreadFunc(object): def __init__(self, func, args, # name='' ): # self.name = name self.func = func self.args = args # __call__(self,\*args) 把實例對象做爲函數調用 # 一個類實例也能夠變成一個可調用對象,只須要實現一個特殊方法__call__()。 def __call__(self): self.func(*self.args) # 拆包 def loop(nloop, nsec): print('start loop', nloop, 'at:', ctime()) sleep(nsec) print('loop', nloop, 'done at:', ctime()) def main(): print('starting at', ctime()) threads = [] nloops = range(len(loops)) # 返回[0, len(loops))範圍內的一個可迭代對象(類型是對象),而不是列表類型,因此打印的時候不會打印列表。 for i in nloops: # create all threads t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), # loop.__name__ )) # print("loop.__name__", loop.__name__, type(loop.__name__)) # loop.__name__類名 # out: loop.__name__ loop <class 'str'> threads.append(t) for i in nloops: # start all threads threads[i].start() for i in nloops: # wait for completion threads[i].join() print('all DONE at:', ctime()) if __name__ == '__main__': main()
當運行 mtsleepD.py
時,獲得了下面的輸出:
$ mtsleepD.py starting at Thu Feb 28 16:39:03 2019 start loop 0 at: Thu Feb 28 16:39:03 2019 start loop 1 at: Thu Feb 28 16:39:03 2019 loop 1 done at: Thu Feb 28 16:39:05 2019 loop 0 done at: Thu Feb 28 16:39:07 2019 all DONE at: Thu Feb 28 16:39:07 2019
那麼,此次又修改了什麼呢?主要是添加了 ThreadFunc 類,並在實例化 Thread 對象時作了一點小改動,同時實例化了可調用類 ThreadFunc。實際上,這裏完成了兩個實例化。讓咱們先仔細看看 ThreadFunc 類吧。
咱們但願這個類更加通用,而不是侷限於 loop()函數,所以添加了一些新的東西,好比讓這個類保存了函數的參數、函數自身以及函數名的字符串。而構造函數__init__()
用於設定上述這些值。
當建立新線程時, Thread 類的代碼將調用 ThreadFunc 對象,此時會調用__call__()
這個特殊方法。因爲咱們已經有了要用到的參數,這裏就不須要再將其傳遞給 Thread()的構造函數了,直接調用便可。
最後要介紹的這個例子要調用 Thread()的子類,和上一個建立可調用類的例子有些類似。當建立線程時使用子類要相對更容易閱讀(第 29~30 行)。示例 4~6 中給出mtsleepE.py
的代碼,並給出它執行的輸出結果,最後會留給讀者一個比較mtsleepE.py
和mtsleepD.py
的練習。
下面是 mtsleepE.py
的輸出,和預期的同樣。
$ mtsleepE.py starting at Thu Feb 28 17:46:32 2019 start loop 0 at: Thu Feb 28 17:46:32 2019 start loop 1 at: Thu Feb 28 17:46:32 2019 loop 1 done at: Thu Feb 28 17:46:34 2019 loop 0 done at: Thu Feb 28 17:46:36 2019 all DONE at: Thu Feb 28 17:46:36 2019
示例 6 子類化的 Thread(
mtsleepE.py
)
本例中將對 Thread 子類化,而不是直接對其實例化。這將使咱們在定製線程對象時擁有更多的靈活性,也可以簡
化線程建立的調用過程。
import threading from time import sleep, ctime loops = [4, 2] class MyThread(threading.Thread): def __init__(self, func, args, # name='' ): threading.Thread.__init__(self) # self.name = name self.func = func self.args = args def run(self): self.func(*self.args) def loop(nloop, nsec): print('start loop', nloop, 'at:', ctime()) sleep(nsec) print('loop', nloop, 'done at:', ctime()) def main(): print('starting at', ctime()) threads = [] nloops = range(len(loops)) for i in nloops: # create all threads t = MyThread(loop, (i, loops[i]) # , loop.__name__ ) threads.append(t) for i in nloops: # start all threads threads[i].start() for i in nloops: # wait for completion threads[i].join() print('all DONE at:', ctime()) if __name__ == '__main__': main()
當比較mtsleepD和mstsleepE這兩個模塊的代碼時,注意其中的幾個重要變化:
1)MyThread子類的構造函數必須先調用其基類的構造函數(第 9 行);
2)以前的特殊方法__call__()
在這個子類中必需要寫爲 run()。
如今,對 MyThread 類進行修改,增長一些調試信息的輸出,並將其存儲爲一個名爲myThread 的獨立模塊(見示例 4-7),以便在接下來的例子中導入這個類。除了簡單地調用函數外,還將把結果保存在實例屬性 self.res 中,並建立一個新的方法 getResult()
來獲取這個值。
示例 7 Thread 子類 MyThread(
myThread.py
)
爲了讓 mtsleepE.py
中實現的 Thread 的子類更加通用,將這個子類移到一個專門的模塊中,並添加了可調用的getResult()
方法來取得返回值。
import threading from time import sleep, ctime loops = [4, 2] class MyThread(threading.Thread): def __init__(self, func, args, # name='' ): threading.Thread.__init__(self) # self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): print('starting', self.name, 'at:', ctime()) self.res = self.func(*self.args) print(self.name, 'finished at:', ctime()) def loop(nloop, nsec): # print('start loop', nloop, 'at:', ctime()) sleep(nsec) # print('loop', nloop, 'done at:', ctime()) def main(): print('starting at', ctime()) threads = [] nloops = range(len(loops)) for i in nloops: # create all threads t = MyThread(loop, (i, loops[i]) # , loop.__name__ ) threads.append(t) for i in nloops: # start all threads threads[i].start() for i in nloops: # wait for completion threads[i].join() print('all DONE at:', ctime()) if __name__ == '__main__': main()
執行結果:
$ python mtsleepE.py starting at Thu Feb 28 21:53:19 2019 starting Thread-1 at: Thu Feb 28 21:53:19 2019 starting Thread-2 at: Thu Feb 28 21:53:19 2019 Thread-2 finished at: Thu Feb 28 21:53:21 2019 Thread-1 finished at: Thu Feb 28 21:53:23 2019 all DONE at: Thu Feb 28 21:53:23 2019
除了各類同步和線程對象外, threading 模塊還提供了一些函數,如表 4-4 所示。
表 4-4 threading 模塊的函數
函 數 | 描 述 |
---|---|
activeCount/ active_count() ① |
當前活動的 Thread 對象個數 |
current Thread() / current_thread ① |
返回當前的 Thread 對象 |
enumerate() |
返回當前活動的 Thread 對象列表 |
settrace(func) ② |
爲全部線程設置一個 trace 函數 |
setprofile(func) ② |
爲全部線程設置一個 profile 函數 |
stack_size(size=0) ③ |
返回新建立線程的棧大小;或爲後續建立的線程設定棧的大小爲 size |
① 駝峯式命名已經棄用,而且從 Python 2.6 版本起已經開始被取代。 ② 自 Python 2.3 版本開始引入。 ③ thread.stack_size()的一個別名,(都是)從 Python 2.5 版本開始引入的。
示例 8 的mtfacfib.py
腳本比較了遞歸求斐波那契、階乘與累加函數的執行。該腳本按照單線程的方式運行這三個函數。以後使用多線程的方式執行一樣的任務,用來講明多線程環境的優勢。
示例 8 斐波那契、階乘與累加(
mtfacfib.py
)
在這個多線程應用中,將前後使用單線程和多線程的方式分別執行三個獨立的遞歸函數。
import threading from time import sleep, ctime class MyThread(threading.Thread): def __init__(self, func, args, name=''): threading.Thread.__init__(self) self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): print('starting', self.name, 'at:', ctime()) self.res = self.func(*self.args) print(self.name, 'finished at:', ctime()) def fib(x): sleep(0.005) if x < 2: return 1 return (fib(x - 2) + fib(x - 1)) def fac(x): sleep(0.1) if x < 2: return 1 return (x * fac(x - 1)) def sum(x): sleep(0.1) if x < 2: return 1 return (x + sum(x - 1)) funcs = [fib, fac, sum] n = 12 def main(): nfuncs = range(len(funcs)) print('*** SINGLE THREAD') for i in nfuncs: print('starting', funcs[i].__name__, 'at:', ctime()) print(funcs[i](n)) print(funcs[i].__name__, 'finished at:', ctime()) print('\n*** MULTIPLE THREADS') threads = [] for i in nfuncs: t = MyThread(funcs[i], (n, ), funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print(threads[i].getResult()) print('all DONE') if __name__ == '__main__': main()
以單線程模式運行時,只是簡單地依次調用每一個函數,並在函數執行結束後當即顯示相應的結果。
而以多線程模式運行時,並不會當即顯示結果。 由於咱們但願讓 MyThread 類越通用越好(有輸出和沒有輸出的調用都可以執行),咱們要一直等到全部線程都執行結束,而後調用getResult()方法來最終顯示每一個函數的返回值。
由於這些函數執行起來都很是快(也許斐波那契函數除外),因此你會發如今每一個函數中都加入了 sleep()調用,用於減慢執行速度,以便讓咱們看到多線程是如何改善性能的。在實際工做中,若是確實有不一樣的執行時間,你確定不會在其中調用 sleep()函數。不管如何,下面是程序的輸出結果。
$ python mtfacfib.py *** SINGLE THREAD starting fib at: Fri Mar 1 15:55:40 2019 233 fib finished at: Fri Mar 1 15:55:43 2019 starting fac at: Fri Mar 1 15:55:43 2019 479001600 fac finished at: Fri Mar 1 15:55:44 2019 starting sum at: Fri Mar 1 15:55:44 2019 78 sum finished at: Fri Mar 1 15:55:45 2019 *** MULTIPLE THREADS starting fib at: Fri Mar 1 15:55:45 2019 starting fac at: Fri Mar 1 15:55:45 2019 starting sum at: Fri Mar 1 15:55:45 2019 fac finished at: Fri Mar 1 15:55:47 2019 sum finished at: Fri Mar 1 15:55:47 2019 fib finished at: Fri Mar 1 15:55:48 2019 233 479001600 78 all DONE
到目前爲止,咱們已經見到的這些簡單的示例片斷都沒法表明你要在實踐中寫出的代碼。除了演示多線程和建立線程的不一樣方式外,以前的代碼實際上什麼有用的事情都沒有作。咱們啓動這些線程以及等待它們結束的方式都是同樣的,它們也全都睡眠。
3.1 節曾提到,因爲 Python 虛擬機是單線程(GIL)的緣由,只有線程在執行 I/O 密集型的應用時才能更好地發揮 Python 的併發性(對比計算密集型應用,它只須要作輪詢),所以讓咱們看一個 I/O 密集型的例子,而後做爲進一步的練習,嘗試將其移植到 Python 3 中,以讓你對向 Python 3 移植的處理有必定認識。
示例 9 的 bookrank.py
腳本很是直接。它將前往我最喜歡的在線零售商之一 Amazon,而後請求你但願查詢的圖書的當前排名。在這個示例代碼中,你能夠看到函數 getRanking()使用正則表達式來拉取和返回當前的排名,而函數_showRanking()用於向用戶顯示結果。
請記住,根據 Amazon 的使用條件,「Amazon 對您在本網站的訪問和我的使用授予有限許可,未經 Amazon 明確的書面贊成,不容許對所有或部份內容進行下載(頁面緩存除外)或修改。」在該程序中,咱們所作的只是查詢指定書籍的當前排名,沒有任何其餘操做,甚至都不會對頁面進行緩存。
示例 4-9 是咱們對於 bookrank.py
的第一次(不過與最終版本也很接近了)嘗試,這是一個沒有使用線程的版本。
示例 9 圖書排名「Screenscraper」(
bookrank.py
)
該腳本經過單線程進行下載圖書排名信息的調用。
逐行解釋
第 1~7 行
這些行用於啓動和導入模塊。這裏將使用 atexit.register()函數來告知腳本什麼時候結束(你將在後面看到緣由)。這裏還將使用正則表達式的 re.compile()函數,用於匹配 Amazon 商品頁中圖書排名的模式。而後,爲將來的改進(很快就會出現)導入了 threading.Thread 模塊,爲顯示時間戳字符串導入了 time.ctime(),爲訪問每一個連接導入了 urllib2.urlopen()。
第 9~15 行
在腳本里使用了 3 個常量:正則表達式對象 REGEX(對匹配圖書排名的正則模式進行了編譯); Amazon 商品頁基本連接 AMZN,爲了使這個連接完整,咱們只須要在最後填充上這本書的國際標準書號(ISBN),即用於區分不一樣做品的圖書 ID。 ISBN 有兩種標準: 10 字符長的 ISBN-10,以及它的新版——13 字符長的 ISBN-13。目前, Amazon 的系統對於兩種
ISBN 格式均可以識別,這裏使用了更短的 ISBN-10。在 ISBNs 字典中存儲了這個值及其對應的書名。
第 17~21 行
getRanking()函數的用途是根據 ISBN,建立與 Amazon 服務器通訊的最終 URL,而後調用 urllib2.urlopen()來打開這個地址。這裏使用字符串格式化操做符來拼接 URL(第 18 行),若是你使用的是 2.6 或以上版本,也能夠嘗試 str.format()方法,好比'{0}{1}'.format(AMZN,isbn)
。
獲得完整的 URL 以後,調用 urllib2.urlopen()函數——這裏簡寫爲 uopen(),一旦 Web 服務器鏈接成功,就能夠獲得服務器返回的相似文件的對象。而後調用 read()函數下載整個網頁,以及關閉這個「文件」。若是正則表達式與預期同樣精確,應當有且只有一個匹配,所以從生成的列表中抓取這個值(任何額外的結果都將丟棄),並將其返回。
第 23~25 行
_showRanking()函數只有一小段代碼,經過 ISBN,查詢其對應的書名,調用 getRanking()函數從 Amazon 網站上得到這本書的當前排名,而後把這些值輸出給用戶。函數名最前面的單下劃線表示這是一個特殊函數,只能被本模塊的代碼使用,不能被其餘使用本文件做爲庫或者工具模塊的應用導入。
第 27~30 行
_main()函數一樣是一個特殊函數,只有這個模塊從命令行直接運行時纔會執行該函數(而且不能被其餘模塊導入)。該函數會顯示起止時間(讓用戶瞭解整個腳本運行了多久),爲每一個 ISBN 調用_showRanking()函數以查詢和顯示其在 Amazon 上的當前排名。
第 32~37 行
這些行展示了一些徹底不一樣的東西。 atexit.register()是什麼呢?這個函數(這裏使用了裝飾器的方式)會在 Python 解釋器中註冊一個退出函數,也就是說,它會在腳本退出之 前 請 求 調 用 這 個 特 殊 函 數 。( 如 果 不 使 用 裝 飾 器 的 方 式 , 也 可 以 直 接 使 用register(_atexit()))。
爲何要在這裏使用這個函數呢?固然,目前而言,它並非必需的。輸出語句也能夠放在第 27~31 行的_main()函數結尾,不過那裏並非一個真的好位置。另外,這也是一個可能會在某種狀況下用於實際生產應用的功能。假設你知道第 36~37 行的含義,能夠獲得以下輸出結果:
$ python bookrank.py At Wed Mar 30 22:11:19 2011 PDT on Amazon... - 'Core Python Programming' ranked 87,118 - 'Python Fundamentals' ranked 851,816 - 'Python Web Development with Django' ranked 184,735 all DONE at: Wed Mar 30 22:11:25 2011
你可能會感到疑惑,爲何咱們會把數據的獲取(getRanking())和顯示(_showRanking()和_main())過程分開呢?這樣作是爲了防止你產生除了經過終端向用戶顯示結果之外的想法。在實踐中,你可能會有將數據經過 Web 模板返回、存儲在數據庫中或者發送結果文本到手機上等需求。若是把全部代碼都放在一個函數裏,會難以複用和/或從新調整。
此外,若是 Amazon 修改了商品頁的佈局,你可能須要修改正則表達式「screenscraper」以繼續從商品頁提取數據。還須要說明的是,在這個簡單的例子中使用正則表達式(或者只是簡學的舊式字符串處理)是沒有問題的,不過你可能須要一個更強大的標記解析器,好比標準庫中的 HTMLParser,第三方工具 BeautifulSoup、 html5lib 或者 lxml(第 9 章會演示其中部分工具)。
不須要你告訴我這仍然是一個愚蠢的單線程程序,咱們接下來就要使用多線程來修改這個應用。因爲這是一個 I/O 密集型應用,所以這個程序使用多線程是一個好的選擇。簡單起見,咱們不會在這裏使用任何類和麪向對象編程,而是使用 threading 模塊。咱們將直接使用Thread 類,因此你能夠認爲這更像是 mtsleepC.py 的衍生品,而不是它以後的例子。咱們將只是派生線程,而後當即啓動這些線程。
將應用中的_showRanking(isbn)進行以下修改。
Thread(target=_showRanking, args=(isbn,)).start().
就是這樣!如今,你獲得了 bookrank.py 的最終版本,能夠看出因爲增長了併發,這個應用(通常)會運行得更快。不過,程序可以運行多快還取決於最慢的那個響應。
$ python bookrank.py At Thu Mar 31 10:11:32 2011 on Amazon... - 'Python Fundamentals' ranked 869,010 - 'Core Python Programming' ranked 36,481 - 'Python Web Development with Django' ranked 219,228 all DONE at: Thu Mar 31 10:11:35 2011
正如你在輸出中所看到的,相比於單線程版本的 6 秒,多線程版本只須要運行 3 秒。而另一個須要注意的是,多線程版本按照完成的順序輸出,而單線程版本按照變量的順序。
在單線程版本中,順序是由字典的鍵決定的,而如今查詢是併發產生的,輸出的前後則會由每一個線程完成任務的順序來決定。
在以前 mtsleepX.py 的例子中,對全部線程使用了 Thread.join()用於阻塞執行,直到每一個線程都已退出。這能夠有效阻止主線程在全部子線程都完成以前繼續執行,因此輸出語句「all DONE at」能夠在正確的時間調用。
在這些例子中,對全部線程調用 join()並非必需的,由於它們不是守護線程。不管如何主線程都不會在派生線程完成以前退出腳本。因爲這個緣由,咱們將在 mtsleepF.py中刪除全部的 join()操做。不過,要意識到若是咱們在同一個地方顯示「all done」這是不正確的。
主線程會在其餘線程完成以前顯示「 all done」,因此咱們不能再把 print 調用放在_main()裏了。有兩個地方能夠放置 print 語句:第 37 行_main()返回以後(腳本最後一行),或者使用 atexit.register()來註冊一個退出函數。由於以前沒有討論事後面這種方法,並且它可能對你之後更有幫助,因此咱們認爲這是一個介紹它的好位置。此外,這仍是一個在 Python 2 和 3 中保持一致的接口,接下來咱們就要挑戰如何將這個程序移植到 Python 3中了。
下面咱們但願這個腳本可以在 Python 3 中運行。對於項目和應用而言,都須要繼續進行遷移,這是你必需要熟悉的事情。幸運的是,有一些工具能夠幫助咱們,其中之一是 2to3 這個工具。它的通常用法以下。
$ 2to3 foo.py # only output diff $ 2to3 -w foo.py # overwrites w/3.x code
在第一條命令中, 2to3 工具只是顯示原始腳本的 2.x 版本與其生成的等價的 3.x 版本的區別。而-w 標誌則讓 2to3 工具使用新生成的 3.x 版本的代碼重寫原始腳本,並將 2.x 版本重命名爲 foo.py.bak。
讓咱們對 bookrank.py 運行 2to3 工具,在已有的文件上進行改寫。除了給出區別外,它還會像以前描述的那樣保存新版本的腳本。
$ 2to3 -w bookrank.py RefactoringTool: Skipping implicit fixer: buffer RefactoringTool: Skipping implicit fixer: idioms RefactoringTool: Skipping implicit fixer: set_literal RefactoringTool: Skipping implicit fixer: ws_comma --- bookrank.py (original) +++ bookrank.py (refactored) @@ -4,7 +4,7 @@ from re import compile from threading import Thread from time import ctime -from urllib2 import urlopen as uopen +from urllib.request import urlopen as uopen REGEX = compile('#([\d,]+) in Books ') AMZN = 'http://amazon.com/dp/' @@ -21,17 +21,17 @@ return REGEX.findall(data)[0] def _showRanking(isbn): - print '- %r ranked %s' % ( - ISBNs[isbn], getRanking(isbn)) + print('- %r ranked %s' % ( + ISBNs[isbn], getRanking(isbn))) def _main(): - print 'At', ctime(), 'on Amazon...' + print('At', ctime(), 'on Amazon...') for isbn in ISBNs: Thread(target=_showRanking, args=(isbn,)).start()#_showRanking(isbn) @register def _atexit(): - print 'all DONE at:', ctime() + print('all DONE at:', ctime()) if __name__ == '__main__': _main() RefactoringTool: Files that were modified: RefactoringTool: bookrank.py
接下來的步驟對於讀者而言是可選的,咱們使用 POSIX 命令行將文件重命名爲bookrank.py 和 bookrank3.py(Windows 用戶應當使用 ren 命令)。
$ mv bookrank.py bookrank3.py $ mv bookrank.py.bak bookrank.py
若是你嘗試運行新生成的代碼,就會發現假定它是一個完美翻譯,不須要你再作任何操做的想法只是你的一廂情願。糟糕的事情發生了,你會在每一個線程執行時獲得以下異常信息(下面的輸出只針對一個線程,由於每一個線程的輸出都同樣)。
$ python3 bookrank3.py Exception in thread Thread-1: Traceback (most recent call last): File "/Library/Frameworks/Python.framework/Versions/ 3.2/lib/python3.2/threading.py", line 736, in _bootstrap_inner self.run() File "/Library/Frameworks/Python.framework/Versions/ 3.2/lib/python3.2/threading.py", line 689, in run self._target(*self._args, **self._kwargs) File "bookrank3.py", line 25, in _showRanking ISBNs[isbn], getRanking(isbn))) File "bookrank3.py", line 21, in getRanking return REGEX.findall(data)[0] TypeError: can't use a string pattern on a bytes-like object :
問題看起來是:正則表達式是一個 Unicode 字符串,而 urlopen()返回來的相似文件對象的結果通過 read()方法獲得的是一個 ASCII/bytes
字符串。這裏的修復方案是將其編譯爲一個bytes 對象,而不是文本字符串。所以,修改第 9 行,讓 re.compile()編譯一個 bytes 字符串(經過添加 bytes 字符串)。爲了作到這個,能夠在左側的引號前添加一個 bytes 字符串的標記 b,以下所示。
REGEX = compile(b'#([\d,]+) in Books ')
如今,讓咱們再試一次。
$ python3 bookrank3.py At Sun Apr 3 00:45:46 2011 on Amazon... - 'Core Python Programming' ranked b'108,796' - 'Python Web Development with Django' ranked b'268,660' - 'Python Fundamentals' ranked b'969,149' all DONE at: Sun Apr 3 00:45:49 2011
如今又是什麼問題呢?雖然這個輸出結果比以前要好一些(沒有錯誤),可是它看起來仍是有些奇怪。當傳給 str()時,正則表達式抓取的排名值顯示了 b 和引號。你的第一直覺多是嘗試醜陋的字符串切片。
>>> x = b'xxx' >>> repr(x) "b'xxx'" >>> str(x) "b'xxx'" >>> str(x)[2:-1] 'xxx'
不過,更合適的方法是將其轉換爲一個真正的(Unicode)字符串,可能會用到 UTF-8。
>>> str(x, 'utf-8') 'xxx'
爲了實現這一點,在腳本里,對第 53 行進行一個相似的修改,以下所示。
return str(REGEX.findall(data)[0], 'utf-8')
如今, Python 3 版本的腳本輸出就和 Python 2 的腳本一致了。
$ python3 bookrank3.py At Sun Apr 3 00:47:31 2011 on Amazon... - 'Python Fundamentals' ranked 969,149 - 'Python Web Development with Django' ranked 268,660 - 'Core Python Programming' ranked 108,796 all DONE at: Sun Apr 3 00:47:34 2011
通常來講,你會發現從 2.x 版本移植到 3.x 版本會遵循相似下面的模式:你須要確保全部的單元測試和集成測試都已經經過,使用 2to3(或其餘工具)進行全部的基礎修改,而後進行一些善後工做,讓代碼運行起來並經過相同的測試。咱們將在下一個例子中再次嘗試這個練習,這個例子將演示線程同步的使用。
在本章的主要部分,咱們瞭解了線程的基本概念,以及如何在 Python 應用中利用線程。然而,咱們遺漏了多線程編程中一個很是重要的方面:同步。通常在多線程代碼中,總會有一些特定的函數或代碼塊不但願(或不該該)被多個線程同時執行,一般包括修改數據庫、更新文件或其餘會產生競態條件的相似狀況。回顧本章前面的部分,若是兩個線程運行的順序發生變化,就有可能形成代碼的執行軌跡或行爲不相同,或者產生不一致的數據(能夠在 Wikipedia 頁面上閱讀有關競態條件的更多信息: http://en.wikipedia.org/wiki/Race_condition )。
這 就 是 需 要 使 用 同 步 的 情 況 。 當 任 意 數 量 的 線 程 可 以 訪 問 臨 界 區 的 代 碼
( http://en.wikipedia.org/wiki/Critical_section )但在給定的時刻只有一個線程能夠經過時,就是使用同步的時候了。程序員選擇適合的同步原語,或者線程控制機制來執行同步。進程同步有不一樣的類型(參見 http://en.wikipedia.org/wiki/Synchronization_(computer_science) ), Python 支持多種同步類型,能夠給你足夠多的選擇,以便選出最適合完成任務的那種類型。
本章前面對同步進行過一些介紹,因此這裏就使用其中兩種類型的同步原語演示幾個示例程序:鎖/互斥,以及信號量。鎖是全部機制中最簡單、最低級的機制,而信號量用於多線程競爭有限資源的狀況。鎖比較容易理解,所以先從鎖開始,而後再討論信號量。
鎖有兩種狀態:鎖定和未鎖定。並且它也只支持兩個函數:得到鎖和釋放鎖。它的行爲和你想象的徹底同樣。
當多線程爭奪鎖時,容許第一個得到鎖的線程進入臨界區,並執行代碼。全部以後到達的線程將被阻塞,直到第一個線程執行結束,退出臨界區,並釋放鎖。此時,其餘等待的線程能夠得到鎖並進入臨界區。不過請記住,那些被阻塞的線程是沒有順序的(即不是先到先執行),勝出線程的選擇是不肯定的,並且還會根據 Python 實現的不一樣而有所區別。
讓咱們來看看爲何鎖是必需的。 mtsleepF.py 應用派生了隨機數量的線程,當每一個線程執行結束時它會進行輸出。下面是其核心部分的源碼(Python 2)。
from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime
class CleanOutputSet(set):
def str(self):
return ', ‘.join(x for x in self)
loops = (randrange(2,5) for x in xrange(randrange(3,7)))
remaining = CleanOutputSet()
def loop(nsec):
myname = currentThread().name
remaining.add(myname)
print ‘[%s] Started %s’ % (ctime(), myname)
sleep(nsec)
remaining.remove(myname)
print ‘[%s] Completed %s (%d secs)’ % (
ctime(), myname, nsec)
print ’ (remaining: %s)’ % (remaining or ‘NONE’)
def _main():
for pause in loops:
Thread(target=loop, args=(pause,)).start()
@register
def _atexit():
print ‘all DONE at:’, ctime()
當咱們完成這個使用鎖的代碼後,會有一個比較詳細的逐行解釋,不過mtsleepF.py
所作的基本上就是以前例子的擴展。和 bookrank.py 同樣,爲了簡化代碼,沒有使用面向對象編程,刪除了線程對象列表和線程的 join(),重用了 atexit.register()(和 bookrank.py相同的緣由)。
另外一個和以前的那些 mtsleepX.py 例子不一樣的地方是,這裏再也不把循環/線程對硬編碼成睡眠 4 秒和 2 秒,而是將它們隨機地混合在一塊兒,建立 3~6 個線程,每一個線程睡眠2~4 秒。
這裏還有一個新功能,使用集合來記錄剩下的還在運行的線程。咱們對集合進行了子類化而不是直接使用,這是由於咱們想要演示另外一個用例:變動集合的默承認印字符串。當顯示一個集合時,你會獲得相似 set([X, Y, Z,…])這樣的輸出。而應用的用戶並不須要(也不該該)知道關於集合的信息,或者咱們使用了這些集合。咱們只須要顯示成相似 X, Y, Z, …這樣便可。這也就是派生了 set 類並實現了它的__str__()方法的緣由。
若是幸運,進行了這些改變以後,輸出將會按照適當的順序給出。
$ python mtsleepF.py [Sat Apr 2 11:37:26 2011] Started Thread-1 [Sat Apr 2 11:37:26 2011] Started Thread-2 [Sat Apr 2 11:37:26 2011] Started Thread-3 [Sat Apr 2 11:37:29 2011] Completed Thread-2 (3 secs) (remaining: Thread-3, Thread-1) [Sat Apr 2 11:37:30 2011] Completed Thread-1 (4 secs) (remaining: Thread-3) [Sat Apr 2 11:37:30 2011] Completed Thread-3 (4 secs) (remaining: NONE) all DONE at: Sat Apr 2 11:37:30 2011
不過,若是不幸,你將會獲得像下面幾對執行示例這樣奇怪的輸出結果。
$ python mtsleepF.py [Sat Apr 2 11:37:09 2011] Started Thread-1 [Sat Apr 2 11:37:09 2011] Started Thread-2 [Sat Apr 2 11:37:09 2011] Started Thread-3 [Sat Apr 2 11:37:12 2011] Completed Thread-1 (3 secs) [Sat Apr 2 11:37:12 2011] Completed Thread-2 (3 secs) (remaining: Thread-3) (remaining: Thread-3) [Sat Apr 2 11:37:12 2011] Completed Thread-3 (3 secs) (remaining: NONE) all DONE at: Sat Apr 2 11:37:12 2011 $ python mtsleepF.py [Sat Apr 2 11:37:56 2011] Started Thread-1 [Sat Apr 2 11:37:56 2011] Started Thread-2 [Sat Apr 2 11:37:56 2011] Started Thread-3 [Sat Apr 2 11:37:56 2011] Started Thread-4 [Sat Apr 2 11:37:58 2011] Completed Thread-2 (2 secs) [Sat Apr 2 11:37:58 2011] Completed Thread-4 (2 secs) (remaining: Thread-3, Thread-1) (remaining: Thread-3, Thread-1) [Sat Apr 2 11:38:00 2011] Completed Thread-1 (4 secs) (remaining: Thread-3) [Sat Apr 2 11:38:00 2011] Completed Thread-3 (4 secs) (remaining: NONE) all DONE at: Sat Apr 2 11:38:00 2011
那麼出現什麼問題了呢?一個問題是,輸出可能部分混亂(由於多個線程可能並行執行I/O)。一樣地,以前的幾個示例代碼也都有交錯輸出的問題存在。而另外一問題則出如今兩個線程修改同一個變量(剩餘線程名集合)時。
I/O 和訪問相同的數據結構都屬於臨界區,所以須要用鎖來防止多個線程同時進入臨界區。爲了加鎖,須要添加一行代碼來引入 Lock(或 RLock),而後建立一個鎖對象,所以須要添加/修改代碼以便在合適的位置上包含這些行。
from threading import Thread, Lock, currentThread
lock = Lock()
如今應該使用剛建立的這個鎖了。下面代碼中突出顯示的 acquire()和 release()調用就是應當在 loop()函數中添加的語句。
def loop(nsec):
myname = currentThread().name
lock.acquire()
remaining.add(myname)
print ‘[%s] Started %s’ % (ctime(), myname)
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(myname)
print ‘[%s] Completed %s (%d secs)’ % (
ctime(), myname, nsec)
print ’ (remaining: %s)’ % (remaining or ‘NONE’)
lock.release()
一旦作了這個改變,就不會再產生那種奇怪的輸出了。
$ python mtsleepF.py
[Sun Apr 3 23:16:59 2011] Started Thread-1
[Sun Apr 3 23:16:59 2011] Started Thread-2
[Sun Apr 3 23:16:59 2011] Started Thread-3
[Sun Apr 3 23:16:59 2011] Started Thread-4
[Sun Apr 3 23:17:01 2011] Completed Thread-3 (2 secs)
(remaining: Thread-4, Thread-2, Thread-1)
[Sun Apr 3 23:17:01 2011] Completed Thread-4 (2 secs)
(remaining: Thread-2, Thread-1)
[Sun Apr 3 23:17:02 2011] Completed Thread-1 (3 secs)
(remaining: Thread-2)
[Sun Apr 3 23:17:03 2011] Completed Thread-2 (4 secs)
(remaining: NONE)
all DONE at: Sun Apr 3 23:17:03 2011
修改後的最終版 mtsleepF.py
如示例 10 所示。
示例 10 鎖和更多的隨機性(
mtsleepF.py
)
在本示例中,演示了鎖和一些其餘線程工具的使用。
逐行解釋
第 1~6 行
這部分按照慣例是啓動行和導入模塊的行。請注意, threading.currentThread()從 2.6 版本開始重命名爲 threading.current_thread(),不過爲了保持後向兼容性,舊的寫法仍舊保留了下來。
第 8~10 行
這是以前提到過的集合的子類。它包括一個對__str__()的實現,能夠將默認輸出改變爲將其全部元素按照逗號分隔的字符串。
第 12~14 行
該部分包含 3 個全局變量:鎖;上面提到的修改後的集合類的實例;隨機數量的線程(3~6 個線程),每一個線程暫停或睡眠 2~4 秒。
第 16~28 行
loop()函數首先保存當前執行它的線程名,而後獲取鎖,以便使添加該線程名到 remaining集合以及指明啓動線程的輸出操做是原子的(沒有其餘線程能夠進入臨界區)。釋放鎖以後,這個線程按照預先指定的隨機秒數執行睡眠操做,而後從新得到鎖,進行最終輸出,最後釋放鎖。
第 30~39 行
只有不是爲了在其餘地方使用而導入的狀況下, _main()函數纔會執行。它的任務是派生和執行每一個線程。和以前提到的同樣,使用 atexit.register()來註冊_atexit()函數,以便讓解釋器在腳本退出前執行該函數。
做爲維護你本身的當前運行線程集合的一種替代方案,能夠考慮使用 threading.enumerate(),該方法會返回仍在運行的線程列表(包括守護線程,但不包括沒有啓動的線程)。在本例中並無使用這個方案,由於它會顯示兩個額外的線程,因此咱們須要刪除這兩個線程以保持輸出的簡潔。這兩個線程是當前線程(由於它還沒結束),以及主線程(沒有必要去顯示)。
此外,若是你使用的是 Python 2.6 或更新的版本(包括 3.x 版本),別忘了還可使用str.format()方法來代替字符串格式化操做符。換句話說, print 語句
print ‘[%s] Started %s’ % (ctime(), myname)
能夠在 2.6+版本中被替換成
print ‘[{0}] Started {1}’.format(ctime(), myname)
或者在 3.x 版本中調用 print()函數:
print(’[{0}] Started {1}’.format(ctime(), myname))
若是隻須要對當前運行的線程進行計數,那麼可使用 threading.activeCount()(2.6 版本開始重命名爲 active_count())來代替。
若是你使用 Python 2.5 或更新版本,還有一種方案能夠再也不調用鎖的 acquire()和 release()方法,從而更進一步簡化代碼。這就是使用 with 語句,此時每一個對象的上下文管理器負責在進入該套件以前調用 acquire()並在完成執行以後調用 release()。
threading 模塊的對象 Lock、 RLock、 Condition、 Semaphore 和 BoundedSemaphore 都包含上下文管理器,也就是說,它們均可以使用 with 語句。當使用 with 時,能夠進一步簡化 loop()循環,以下面的代碼所示。
from __future__ import with_statement # 2.5 only def loop(nsec): myname = currentThread().name with lock: remaining.add(myname) print '[%s] Started %s' % (ctime(), myname) sleep(nsec) with lock: remaining.remove(myname) print '[%s] Completed %s (%d secs)' % ( ctime(), myname, nsec) print ' (remaining: %s)' % ( remaining or 'NONE',)
如今經過對以前的腳本運行 2to3 工具,進行向 Python 3.x 版本的移植(下面的輸出進行了截斷,由於以前已經看到過完整的 diff 轉儲)。
$ 2to3 -w mtsleepF.py
RefactoringTool: Skipping implicit fixer: buffer
RefactoringTool: Skipping implicit fixer: idioms
RefactoringTool: Skipping implicit fixer: set_literal
RefactoringTool: Skipping implicit fixer: ws_comma
:
RefactoringTool: Files that were modified:
RefactoringTool: mtsleepF.py
當把 mtsleepF.py 重命名爲 mtsleepF3.py 並把 mtsleep.py.bak 重命名爲 mtsleepF.py後,咱們會發現,這一次出乎咱們的意料,這個腳本移植得很是完美,沒有出現任何問題。
$ python3 mtsleepF3.py
[Sun Apr 3 23:29:39 2011] Started Thread-1
[Sun Apr 3 23:29:39 2011] Started Thread-2
[Sun Apr 3 23:29:39 2011] Started Thread-3
[Sun Apr 3 23:29:41 2011] Completed Thread-3 (2 secs)
(remaining: Thread-2, Thread-1)
[Sun Apr 3 23:29:42 2011] Completed Thread-2 (3 secs)
(remaining: Thread-1)
[Sun Apr 3 23:29:43 2011] Completed Thread-1 (4 secs)
(remaining: NONE)
all DONE at: Sun Apr 3 23:29:43 2011
如今讓咱們帶着關於鎖的知識,開始介紹信號量,而後看一個既使用了鎖又使用了信號量的例子。
如前所述,鎖很是易於理解和實現,也很容易決定什麼時候須要它們。然而,若是狀況更加複雜,你可能須要一個更強大的同步原語來代替鎖。對於擁有有限資源的應用來講,使用信號量多是個不錯的決定。
信號量是最古老的同步原語之一。它是一個計數器,當資源消耗時遞減,當資源釋放時遞增。你能夠認爲信號量表明它們的資源可用或不可用。消耗資源使計數器遞減的操做習慣上稱爲 P() (來源於荷蘭單詞 probeer/proberen),也稱爲 wait、try、acquire、pend 或 procure。
相對地,當一個線程對一個資源完成操做時,該資源須要返回資源池中。這個操做通常稱爲 V()(來源於荷蘭單詞 verhogen/verhoog),也稱爲 signal、 increment、 release、 post、 vacate。
Python 簡化了全部的命名,使用和鎖的函數/方法同樣的名字: acquire 和 release。信號量比鎖更加靈活,由於能夠有多個線程,每一個線程擁有有限資源的一個實例。
在下面的例子中,咱們將模擬一個簡化的糖果機。這個特製的機器只有 5 個可用的槽來保持庫存(糖果)。若是全部的槽都滿了,糖果就不能再加到這個機器中了;類似地,若是每一個槽都空了,想要購買的消費者就沒法買到糖果了。咱們可使用信號量來跟蹤這些有限的資源(糖果槽)。示例 11 爲其源代碼(candy.py
)。
示例 4-11 糖果機和信號量(
candy.py
)
該腳本使用了鎖和信號量來模擬一個糖果機。
第 1~6 行
啓動行和導入模塊的行與本章中以前的例子很是類似。惟一新增的東西是信號量。
threading 模塊包括兩種信號量類: Semaphore 和 BoundedSemaphore。如你所知,信號量實際上就是計數器,它們從固定數量的有限資源起始。
當分配一個單位的資源時,計數器值減 1,而當一個單位的資源返回資源池時,計數器值加 1。 BoundedSemaphore 的一個額外功能是這個計數器的值永遠不會超過它的初始值,換句話說,它能夠防範其中信號量釋放次數多於得到次數的異經常使用例。
第 8~10 行
這個腳本的全局變量包括:一個鎖,一個表示庫存商品最大值的常量,以及糖果托盤。
第 12~21 行
當虛構的糖果機全部者向庫存中添加糖果時,會執行 refill()函數。這段代碼是一個臨界區,這就是爲何獲取鎖是執行全部行的僅有方法。代碼會輸出用戶的行動,並在某人添加的糖果超過最大庫存時給予警告(第 17~18 行)。
第 23~30 行
buy()是和 refill()相反的函數,它容許消費者獲取一個單位的庫存。條件語句(第 26 行)檢測是否全部資源都已經消費完。計數器的值不能小於 0,所以這個調用通常會在計數器再次增長以前被阻塞。經過傳入非阻塞的標誌 False,讓調用再也不阻塞,而在應當阻塞的時候返回一個 False,指明沒有更多的資源了。
第 32~40 行
producer()和 consumer()函數都只包含一個循環,進行對應的 refill()和 buy()調用,並在調用間暫停。
第 42~55 行
代碼的剩餘部分包括:對_main()的調用(若是腳本從命令行執行),退出函數的註冊,以及最後的_main()函數提供表示糖果庫存生產者和消費者的新建立線程對。
建立消費者/買家的線程時進行了額外的數學操做,用於隨機給出正誤差,使得消費者真正消費的糖果數可能會比供應商/生產者放入機器的更多(不然,代碼將永遠不會進入消費者嘗試從空機器購買糖果的狀況)。運行腳本,會產生相似下面的輸出結果。
$ python candy.py
starting at: Mon Apr 4 00:56:02 2011
THE CANDY MACHINE (full with 5 bars)!
Buying candy… OK
Refilling candy… OK
Refilling candy… full, skipping
Buying candy… OK
Buying candy… OK
Refilling candy… OK
Buying candy… OK
Buying candy… OK
Buying candy… OK
all DONE at: Mon Apr 4 00:56:08 2011
與 mtsleepF.py 相似 candy.py,又是一個使用 2to3 工具生成可運行的 Python 3 版本的例子,這裏將其重命名爲 candy3.py。將把此次移植做爲一個練習留給讀者來完成。
這裏只演示了 threading 模塊的兩個同步原語,還有不少同步原語須要你去探索。不過,請記住它們只是原語。雖然使用它們來構建你本身的線程安全的類和數據結構沒有問題,可是要了解 Python 標準庫中也包含了一個實現: Queue 對象。
核心提示:進行調試
在某種狀況下,你可能須要調試一個使用了信號量的腳本,此時你可能須要知道在任意給定時刻信號量計數器的精確值。在本章結尾的一個練習中,你將爲 candy.py 實現一個顯示計數器值的解決方案,或許能夠將其稱爲 candydebug.py。爲了作到這一點,須要查閱threading.py 的源碼(可能須要查閱 Python 2 和 Python 3 兩個版本)。
你會發現 threading 模塊的同步原語並非類名,即使它們使用了駝峯式拼寫方法,看起來像是類名。實際上,它們是僅有一行的函數,用來實例化你認爲的那個類的對象。
這裏有兩個問題須要考慮:其一,你不能對它們子類化(由於它們是函數);其二,變量名在 2.x 和 3.x 版本間發生了改變。
若是這個對象能夠給你整潔/簡單地訪問計數器的方法,整個問題就能夠避免了,但實際上並無。如前所述,計數器的值只是類的一個屬性,因此能夠直接訪問它,這個變量名從 Python2 版本的 self.__value,即 self._Semaphore__value,變成了 Python 3 版本的 self.value。
對於開發者而言,最簡潔的 API(至少咱們的意見)是繼承 threading.BoundedSemaphore類,並實現一個__len()方法,不過要注意,若是你計劃對 2.x 和 3.x 版本都支持,仍是須要使用剛纔討論過的那個正確的計數器值。
最後一個例子演示了生產者-消費者模型這個場景。在這個場景下,商品或服務的生產者生產商品,而後將其放到相似隊列的數據結構中。生產商品的時間是不肯定的,一樣消費者消費生產者生產的商品的時間也是不肯定的。
咱們使用 Queue 模塊(Python 2.x 版本,在 Python 3.x 版本中重命名爲 queue)來提供線程間通訊的機制,從而讓線程之間能夠互相分享數據。具體而言,就是建立一個隊列,讓生產者(線程)在其中放入新的商品,而消費者(線程)消費這些商品。表 4-5 列舉了這個模塊中的一些屬性。
表 4-5 Queue/queue 模塊經常使用屬性
屬 性 描 述
Queue/queue 模塊的類
Queue(maxsize=0) 建立一個先入先出隊列。若是給定最大值,則在隊列沒有空間時阻塞;不然(沒
有指定最大值),爲無限隊列
LifoQueue(maxsize=0) 建立一個後入先出隊列。若是給定最大值,則在隊列沒有空間時阻塞;不然(沒
有指定最大值),爲無限隊列
屬 性 描 述
PriorityQueue(maxsize=0) 建立一個優先級隊列。若是給定最大值,則在隊列沒有空間時阻塞,不然(沒
有指定最大值) ,爲無限隊列
Queue/queue 異常
Empty 當對空隊列調用 get*()方法時拋出異常
Full 當對已滿的隊列調用 put*()方法時拋出異常
Queue/queue 對象方法
qsize () 返回隊列大小(因爲返回時隊列大小可能被其餘線程修改,因此該值爲近似值)
empty() 若是隊列爲空,則返回 True;不然,返回 False
full() 若是隊列已滿,則返回 True;不然,返回 False
put (item, block=Ture, timeout=None) 將 item 放入隊列。若是 block 爲 True(默認)且 timeout 爲 None,則在有可用
空間以前阻塞;若是 timeout 爲正值,則最多阻塞 timeout 秒;若是 block 爲 False,
則拋出 Empty 異常
put_nowait(item) 和 put(item, False)相同
get (block=True, timeout=None) 從隊列中取得元素。若是給定了 block(非 0),則一直阻塞到有可用的元素
爲止
get_nowait() 和 get(False)相同
task_done() 用於表示隊列中的某個元素已執行完成,該方法會被下面的 join()使用
join() 在隊列中全部元素執行完畢並調用上面的 task_done()信號以前,保持阻塞
咱們將使用示例 4-12( prodcons.py)來演示生產者-消費者 Queue/queue。下面是這個腳本某次執行的輸出。
$ prodcons.py
starting writer at: Sun Jun 18 20:27:07 2006
producing object for Q… size now 1
starting reader at: Sun Jun 18 20:27:07 2006
consumed object from Q… size now 0
producing object for Q… size now 1
consumed object from Q… size now 0
producing object for Q… size now 1
producing object for Q… size now 2
producing object for Q… size now 3
consumed object from Q… size now 2
consumed object from Q… size now 1
writer finished at: Sun Jun 18 20:27:17 2006
consumed object from Q… size now 0
reader finished at: Sun Jun 18 20:27:25 2006
all DONE
示例 12 生產者-消費者問題(
prodcons.py
)
該生產者-消費者問題的實現使用了 Queue 對象,以及隨機生產(消費)的商品的數量。生產者和消費者獨立且併發地執行線程。
如你所見,生產者和消費者並不須要輪流執行。(感謝隨機數!)嚴格來講,現實生活一般都是隨機和不肯定的。
第 1~6 行
在本模塊中,使用了 Queue.Queue 對象,以及以前給出的 myThread.MyThread 線程類。另外還使用了 random.randint()以使生產和消費的數量有所不一樣(注意, random.randint()與random.randrange()相似,不過它會包括其上限值)。
第 8~16 行
writeQ()和 readQ()函數分別用於將一個對象(例如,咱們這裏使用的字符串’xxx’)放入隊列中和消費隊列中的一個對象。注意,咱們每次只會生產或讀取一個對象。
第 18~26 行
writer()將做爲單個線程運行,其目的只有一個:向隊列中放入一個對象,等待片刻,而後重複上述步驟,直至達到每次腳本執行時隨機生成的次數爲止。 reader()與之相似,只不過變成了消耗對象。
你會注意到, writer 睡眠的隨機秒數一般比 reader 的要短。這是爲了阻礙 reader 從空隊列中獲取對象。經過給 writer 一個更短的等候時間,使得輪到 reader 時,已存在可消費對象的可能性更大。
第 28~29 行
這兩行用於設置派生和執行的線程總數。
第 31~47 行
最後是 main()函數,該函數和本章中其餘腳本的 main()函數都很是類似。這裏建立合適的線程並讓它們執行,當兩個線程都執行完畢後結束。
從本例中能夠得出,對於一個要執行多個任務的程序,可讓每一個任務使用單獨的線程。相比於使用單線程程序完成全部任務,這種程序設計方式更加整潔。
本章闡述了單線程進程是如何限制應用的性能的。尤爲是對於那些任務執行順序存在着獨立性、不肯定性以及非因果性的程序而言,把多個任務分配到不一樣線程執行對性能的改善會很是大。因爲線程的開銷以及 Python 解釋器是單線程應用這個事實,並非全部應用均可以從多線程中獲益,不過如今你已經瞭解到了 Python 多線程的功能,你能夠在適當的時候使用該工具來發揮它的優點。
在開始編寫多線程應用以前,先作一個快速回顧:一般來講,多線程是一個好東西。不過,因爲 Python 的 GIL 的限制,多線程更適合於 I/O 密集型應用(I/O 釋放了 GIL,能夠容許更多的併發),而不是計算密集型應用。對於後一種狀況而言,爲了實現更好的並行性,你須要使用多進程,以便讓 CPU 的其餘內核來執行。
這裏將再也不進行詳細介紹(這個主題內已經在 Core Python Programming 或 Core Python Language Fundamentals 的「執行環境」章節中有所涵蓋),對於多線程或多進程而言, threading模塊的主要替代品包括如下幾個。
這是派生進程的主要替代方案,能夠單純地執行任務,或者經過標準文件(stdin、 stdout、stderr)進行進程間通訊。該模塊自 Python 2.4 版本起引入。
該模塊自 Python 2.6 版本起引入,容許爲多核或多 CPU 派生進程,其接口與 threading模塊很是類似。該模塊一樣也包括在共享任務的進程間傳輸數據的多種方式。
這是一個新的高級庫,它只在「任務」級別進行操做,也就是說,你再也不須要過度關注同步和線程/進程的管理了。你只須要指定一個給定了「worker」數量的線程/進程池,提交任務,而後整理結果。該模塊自 Python 3.2 版本起引入,不過有一個 Python 2.6+可以使用的移植版本,其網址爲 http://code.google.com/p/pythonfutures。
使用該模塊重寫後 bookrank3.py 會是什麼樣子呢?假定代碼的其餘部分保持不變,下面的代碼是新模塊的導入以及對_main()函數的修改。
from concurrent.futures import ThreadPoolExecutor
. . .
def _main():
print(‘At’, ctime(), ‘on Amazon…’)
with ThreadPoolExecutor(3) as executor:
for isbn in ISBNs:
executor.submit(_showRanking, isbn)
print(‘all DONE at:’, ctime())
傳遞給 concurrent.futures.ThreadPoolExecutor 的參數是線程池的大小,在這個應用裏就是指要查閱排名的 3 本書。固然,這是個 I/O 密集型應用,所以多線程更有用。而對於計算密集型應用而言,可使用 concurrent.futures.ProcessPoolExecutor 來代替。
當咱們獲得執行器(不管線程仍是進程)以後,它負責調度任務和整理結果,就能夠調用它的 submit()方法,來執行以前須要派生線程才能運行的那些操做了。
若是咱們作一個到Python 3的徹底移植,方法是將字符串格式化操做符替換爲str.format()方法,自由利用 with 語句,並使用執行器的 map()方法,那麼咱們徹底能夠刪除_showRanking()函數並將其功能混入_main()函數中。示例 4-13 的 bookrank3CF.py 是該腳本的最終版本。
示例 13 高級任務管理(
bookrank3CF.py
)
使用了 concurrent.futures 模塊的圖書排名 screenscraper。
逐行解釋
第 1~14 行
除了新的 import 語句之外,該腳本的前半部分都和本章以前的 bookrank3.py 相同。
第 16~18 行
新的 getRanking()函數使用了 with 語句以及 str.format()。也能夠對 bookrank.py 進行相同的修改,由於這些功能在 Python 2.6+版本上都是可用的(它們不僅用於 3.x 版本)。
第 20~26 行
在前面的例子中,使用了 executor.submit()來派生做業。這裏使用 executor.map()進行輕微的調整,從而將_showRanking()函數的功能合併進來,而後將該函數從代碼中徹底刪除。
輸出結果與以前看到的基本一致。
$ python3 bookrank3CF.py
At Wed Apr 6 00:21:50 2011 on Amazon…
能夠在如下連接中獲取到更多關於 concurrent.futures 模塊的信息。
• http://docs.python.org/dev/py3k/library/concurrent.futures.html
• http://code.google.com/p/pythonfutures/
• http://www.python.org/dev/peps/pep-3148/
下一節將對上述這些選擇以及其餘與線程相關的模塊和包進行總結。
表 4-6 列出了多線程應用編程中可能會使用到的一些模塊。
表 4-6 與線程相關的標準庫模塊
模 塊 描 述
thread① 基本的、低級別的線程模塊
threading 高級別的線程和同步對象
multiprocessing② 使用「threading」接口派生/使用子進程
subprocess③ 徹底跳過線程,使用進程來執行
Queue 供多線程使用的同步先入先出隊列
mutex④ 互斥對象
concurrent.futures⑤ 異步執行的高級別庫
SocketServer 建立/管理線程控制的 TCP/UDP 服務器
① 在 Python 3.0 中重命名爲_thread。 ② 自 Python 2.6 版本開始引入。 ③ 自 Python 2.4 版本開始引入。 ④ 自 Python 2.6 版本起不建議使用,並在 Python 3.0 版本移除。 ⑤ 自 Python 3.2 版本引入(可是能夠經過非標準庫的方式在 2.6+版本上使用)。
參考:
Python3入門之線程threading經常使用方法 https://www.cnblogs.com/chengd/articles/7770898.html
Python 3.2 版本中引入。 ↩︎