線程有5種狀態,狀態轉換的過程以下圖所示:html
多 線程的優點在於能夠同時運行多個任務(至少感受起來是這樣)。可是當線程須要共享數據時,可能存在數據不一樣步的問題。考慮這樣一種狀況:一個列表裏全部元 素都是0,線程"set"從後向前把全部元素改爲1,而線程"print"負責從前日後讀取列表並打印。那麼,可能線程"set"開始改的時候,線 程"print"便來打印列表了,輸出就成了一半0一半1,這就是數據的不一樣步。爲了不這種狀況,引入了鎖的概念。編程
鎖有兩種狀態—— 鎖定和未鎖定。每當一個線程好比"set"要訪問共享數據時,必須先得到鎖定;若是已經有別的線程好比"print"得到鎖定了,那麼就讓線程"set" 暫停,也就是同步阻塞;等到線程"print"訪問完畢,釋放鎖之後,再讓線程"set"繼續。通過這樣的處理,打印列表時要麼所有輸出0,要麼所有輸出 1,不會再出現一半0一半1的尷尬場面。編程語言
線程與鎖的交互以下圖所示:ide
然 而還有另一種尷尬的狀況:列表並非一開始就有的;而是經過線程"create"建立的。若是"set"或者"print" 在"create"尚未運行的時候就訪問列表,將會出現一個異常。使用鎖能夠解決這個問題,可是"set"和"print"將須要一個無限循環——他們 不知道"create"何時會運行,讓"create"在運行後通知"set"和"print"顯然是一個更好的解決方案。因而,引入了條件變量。函數
條件變量容許線程好比"set"和"print"在條件不知足的時候(列表爲None時)等待,等到條件知足的時候(列表已經建立)發出一個通知,告訴"set" 和"print"條件已經有了,大家該起牀幹活了;而後"set"和"print"才繼續運行。ui
線程與條件變量的交互以下圖所示:spa
最後看看線程運行和阻塞狀態的轉換。線程
阻塞有三種狀況:
同步阻塞是指處於競爭鎖定的狀態,線程請求鎖定時將進入這個狀態,一旦成功得到鎖定又恢復到運行狀態;
等待阻塞是指等待其餘線程通知的狀態,線程得到條件鎖定後,調用「等待」將進入這個狀態,一旦其餘線程發出通知,線程將進入同步阻塞狀態,再次競爭條件鎖定;
而其餘阻塞是指調用time.sleep()、anotherthread.join()或等待IO時的阻塞,這個狀態下線程不會釋放已得到的鎖定。 設計
tips: 若是能理解這些內容,接下來的主題將是很是輕鬆的;而且,這些內容在大部分流行的編程語言裏都是同樣的。(意思就是非看懂不可 >_< 嫌做者水平低找別人的教程也要看懂)code
Python經過兩個標準庫thread和threading提供對線程的支持。thread提供了低級別的、原始的線程以及一個簡單的鎖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# encoding: UTF-8
import
thread
import
time
# 一個用於在線程中執行的函數
def
func():
for
i
in
range
(
5
):
print
'func'
time.sleep(
1
)
# 結束當前線程
# 這個方法與thread.exit_thread()等價
thread.exit()
# 當func返回時,線程一樣會結束
# 啓動一個線程,線程當即開始運行
# 這個方法與thread.start_new_thread()等價
# 第一個參數是方法,第二個參數是方法的參數
thread.start_new(func, ())
# 方法沒有參數時須要傳入空tuple
# 建立一個鎖(LockType,不能直接實例化)
# 這個方法與thread.allocate_lock()等價
lock
=
thread.allocate()
# 判斷鎖是鎖定狀態仍是釋放狀態
print
lock.locked()
# 鎖一般用於控制對共享資源的訪問
count
=
0
# 得到鎖,成功得到鎖定後返回True
# 可選的timeout參數不填時將一直阻塞直到得到鎖定
# 不然超時後將返回False
if
lock.acquire():
count
+
=
1
# 釋放鎖
lock.release()
# thread模塊提供的線程都將在主線程結束後同時結束
time.sleep(
6
)
|
thread 模塊提供的其餘方法:
thread.interrupt_main(): 在其餘線程中終止主線程。
thread.get_ident(): 得到一個表明當前線程的魔法數字,經常使用於從一個字典中得到線程相關的數據。這個數字自己沒有任何含義,而且當線程結束後會被新線程複用。
thread還提供了一個ThreadLocal類用於管理線程相關的數據,名爲 thread._local,threading中引用了這個類。
因爲thread提供的線程功能很少,沒法在主線程結束後繼續運行,不提供條件變量等等緣由,通常不使用thread模塊,這裏就很少介紹了。
threading基於Java的線程模型設計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行爲(每個對象都自帶 了鎖和條件變量),而在Python中則是獨立的對象。Python Thread提供了Java Thread的行爲的子集;沒有優先級、線程組,線程也不能被中止、暫停、恢復、中斷。Java Thread中的部分被Python實現了的靜態方法在threading中以模塊方法的形式提供。
threading 模塊提供的經常使用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
threading模塊提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.
Thread是線程類,與Java相似,有兩種使用方法,直接傳入要運行的方法或從Thread繼承並覆蓋run():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# encoding: UTF-8
import
threading
# 方法1:將要執行的方法做爲參數傳給Thread的構造方法
def
func():
print
'func() passed to Thread'
t
=
threading.Thread(target
=
func)
t.start()
# 方法2:從Thread繼承,並重寫run()
class
MyThread(threading.Thread):
def
run(
self
):
print
'MyThread extended from Thread'
t
=
MyThread()
t.start()
|
構造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 線程名;
args/kwargs: 要傳入方法的參數。
實例方法:
isAlive(): 返回線程是否在運行。正在運行指啓動後、終止前。
get/setName(name): 獲取/設置線程名。
is/setDaemon(bool): 獲取/設置是否守護線程。初始值從建立該線程的線程繼承。當沒有非守護線程仍在運行時,程序將終止。
start(): 啓動線程。
join([timeout]): 阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。
一個使用join()的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# encoding: UTF-8
import
threading
import
time
def
context(tJoin):
print
'in threadContext.'
tJoin.start()
# 將阻塞tContext直到threadJoin終止。
tJoin.join()
# tJoin終止後繼續執行。
print
'out threadContext.'
def
join():
print
'in threadJoin.'
time.sleep(
1
)
print
'out threadJoin.'
tJoin
=
threading.Thread(target
=
join)
tContext
=
threading.Thread(target
=
context, args
=
(tJoin,))
tContext.start()
|
運行結果:
in threadContext.
in threadJoin.
out threadJoin.
out threadContext.
Lock(指令鎖)是可用的最低級的同步指令。Lock處於鎖定狀態時,不被特定的線程擁有。Lock包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。
能夠認爲Lock有一個鎖定池,當線程請求鎖定時,將線程至於池中,直到得到鎖定後出池。池中的線程處於狀態圖中的同步阻塞狀態。
構造方法:
Lock()
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試得到鎖定。
release(): 釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# encoding: UTF-8
import
threading
import
time
data
=
0
lock
=
threading.Lock()
def
func():
global
data
print
'%s acquire lock...'
%
threading.currentThread().getName()
# 調用acquire([timeout])時,線程將一直阻塞,
# 直到得到鎖定或者直到timeout秒後(timeout參數可選)。
# 返回是否得到鎖。
if
lock.acquire():
print
'%s get the lock.'
%
threading.currentThread().getName()
data
+
=
1
time.sleep(
2
)
print
'%s release lock...'
%
threading.currentThread().getName()
# 調用release()將釋放鎖。
lock.release()
t1
=
threading.Thread(target
=
func)
t2
=
threading.Thread(target
=
func)
t3
=
threading.Thread(target
=
func)
t1.start()
t2.start()
t3.start()
|
RLock(可重入鎖)是一個能夠被同一個線程請求屢次的同步指令。RLock使用了「擁有的線程」和「遞歸等級」的概念,處於鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程能夠再次調用acquire(),釋放鎖時須要調用release()相同次數。
能夠認爲RLock包含一個鎖定池和一個初始值爲0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,爲0時鎖處於未鎖定狀態。
構造方法:
RLock()
實例方法:
acquire([timeout])/release(): 跟Lock差很少。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# encoding: UTF-8
import
threading
import
time
rlock
=
threading.RLock()
def
func():
# 第一次請求鎖定
print
'%s acquire lock...'
%
threading.currentThread().getName()
if
rlock.acquire():
print
'%s get the lock.'
%
threading.currentThread().getName()
time.sleep(
2
)
# 第二次請求鎖定
print
'%s acquire lock again...'
%
threading.currentThread().getName()
if
rlock.acquire():
print
'%s get the lock.'
%
threading.currentThread().getName()
time.sleep(
2
)
# 第一次釋放鎖
print
'%s release lock...'
%
threading.currentThread().getName()
rlock.release()
time.sleep(
2
)
# 第二次釋放鎖
print
'%s release lock...'
%
threading.currentThread().getName()
rlock.release()
t1
=
threading.Thread(target
=
func)
t2
=
threading.Thread(target
=
func)
t3
=
threading.Thread(target
=
func)
t1.start()
t2.start()
t3.start()
|
Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。
能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。
構造方法:
Condition([lock/rlock])
實例方法:
acquire([timeout])/release(): 調用關聯的鎖的相應方法。
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
例子是很常見的生產者/消費者模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# encoding: UTF-8
import
threading
import
time
# 商品
product
=
None
# 條件變量
con
=
threading.Condition()
# 生產者方法
def
produce():
global
product
if
con.acquire():
while
True
:
if
product
is
None
:
print
'produce...'
product
=
'anything'
# 通知消費者,商品已經生產
con.notify()
# 等待通知
con.wait()
time.sleep(
2
)
# 消費者方法
def
consume():
global
product
if
con.acquire():
while
True
:
if
product
is
not
None
:
print
'consume...'
product
=
None
# 通知生產者,商品已經沒了
con.notify()
# 等待通知
con.wait()
time.sleep(
2
)
t1
=
threading.Thread(target
=
produce)
t2
=
threading.Thread(target
=
consume)
t2.start()
t1.start()
|
Semaphore(信號量)是計算機科學史上最古老的同步指令之一。Semaphore管理一個內置的計數器,每當調用acquire()時 -1,調用release() 時+1。計數器不能小於0;當計數器爲0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。
基於這個特色,Semaphore常常用來同步一些有「訪客上限」的對象,好比鏈接池。
BoundedSemaphore 與Semaphore的惟一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
構造方法:
Semaphore(value=1): value是計數器的初始值。
實例方法:
acquire([timeout]): 請求Semaphore。若是計數器爲0,將阻塞線程至同步阻塞狀態;不然將計數器-1並當即返回。
release(): 釋放Semaphore,將計數器+1,若是使用BoundedSemaphore,還將進行釋放次數檢查。release()方法不檢查線程是否已得到 Semaphore。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# encoding: UTF-8
import
threading
import
time
# 計數器初值爲2
semaphore
=
threading.Semaphore(
2
)
def
func():
# 請求Semaphore,成功後計數器-1;計數器爲0時阻塞
print
'%s acquire semaphore...'
%
threading.currentThread().getName()
if
semaphore.acquire():
print
'%s get semaphore'
%
threading.currentThread().getName()
time.sleep(
4
)
# 釋放Semaphore,計數器+1
print
'%s release semaphore'
%
threading.currentThread().getName()
semaphore.release()
t1
=
threading.Thread(target
=
func)
t2
=
threading.Thread(target
=
func)
t3
=
threading.Thread(target
=
func)
t4
=
threading.Thread(target
=
func)
t1.start()
t2.start()
t3.start()
t4.start()
time.sleep(
2
)
# 沒有得到semaphore的主線程也能夠調用release
# 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常
print
'MainThread release semaphore without acquire'
semaphore.release()
|
Event(事件)是最簡單的線程通訊機制之一:一個線程通知事件,其餘線程等待事件。Event內置了一個初始爲False的標誌,當調用set()時設爲True,調用clear()時重置爲 False。wait()將阻塞線程至等待阻塞狀態。
Event其實就是一個簡化版的 Condition。Event沒有鎖,沒法使線程進入同步阻塞狀態。
構造方法:
Event()
實例方法:
isSet(): 當內置標誌爲True時返回True。
set(): 將標誌設爲True,並通知全部處於等待阻塞狀態的線程恢復運行狀態。
clear(): 將標誌設爲False。
wait([timeout]): 若是標誌爲True將當即返回,不然阻塞線程至等待阻塞狀態,等待其餘線程調用set()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# encoding: UTF-8
import
threading
import
time
event
=
threading.Event()
def
func():
# 等待事件,進入等待阻塞狀態
print
'%s wait for event...'
%
threading.currentThread().getName()
event.wait()
# 收到事件後進入運行狀態
print
'%s recv event.'
%
threading.currentThread().getName()
t1
=
threading.Thread(target
=
func)
t2
=
threading.Thread(target
=
func)
t1.start()
t2.start()
time.sleep(
2
)
# 發送事件通知
print
'MainThread set event.'
event.
set
()
|
Timer(定時器)是Thread的派生類,用於在指定時間後調用一個方法。
構造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的時間
function: 要執行的方法
args/kwargs: 方法的參數
實例方法:
Timer從Thread派生,沒有增長實例方法。
1
2
3
4
5
6
7
8
|
# encoding: UTF-8
import
threading
def
func():
print
'hello timer!'
timer
=
threading.Timer(
5
, func)
timer.start()
|
local是一個小寫字母開頭的類,用於管理 thread-local(線程局部的)數據。對於同一個local,線程沒法訪問其餘線程設置的屬性;線程設置的屬性不會被其餘線程設置的同名屬性替換。
能夠把local當作是一個「線程-屬性字典」的字典,local封裝了從自身使用線程做爲 key檢索對應的屬性字典、再使用屬性名做爲key檢索屬性值的細節。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# encoding: UTF-8
import
threading
local
=
threading.local()
local.tname
=
'main'
def
func():
local.tname
=
'notmain'
print
local.tname
t1
=
threading.Thread(target
=
func)
t1.start()
t1.join()
print
local.tname
|
熟練掌握Thread、Lock、Condition就能夠應對絕大多數須要使用線程的場合,某些狀況下local也是很是有用的東西。本文的最後使用這幾個類展現線程基礎中提到的場景:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# encoding: UTF-8
import
threading
alist
=
None
condition
=
threading.Condition()
def
doSet():
if
condition.acquire():
while
alist
is
None
:
condition.wait()
for
i
in
range
(
len
(alist))[::
-
1
]:
alist[i]
=
1
condition.release()
def
doPrint():
if
condition.acquire():
while
alist
is
None
:
condition.wait()
for
i
in
alist:
print
i,
print
condition.release()
def
doCreate():
global
alist
if
condition.acquire():
if
alist
is
None
:
alist
=
[
0
for
i
in
range
(
10
)]
condition.notifyAll()
condition.release()
tset
=
threading.Thread(target
=
doSet,name
=
'tset'
)
tprint
=
threading.Thread(target
=
doPrint,name
=
'tprint'
)
tcreate
=
threading.Thread(target
=
doCreate,name
=
'tcreate'
)
tset.start()
tprint.start()
tcreate.start()
|