Python多線程

threading基於Java的線程模型設計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行爲(每個對象都自帶了鎖和條件變量),而在Python中則是獨立的對象。Python Thread提供了Java Thread的行爲的子集;沒有優先級、線程組,線程也不能被中止、暫停、恢復、中斷。Java Thread中的部分被Python實現了的靜態方法在threading中以模塊方法的形式提供。java

threading 模塊提供的經常使用方法: 
threading.currentThread(): 返回當前的線程變量。 
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。ui

threading模塊提供的類:  
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.
spa

3.1. Thread

Thread是線程類,與Java相似,有兩種使用方法,直接傳入要運行的方法或從Thread繼承並覆蓋run():.net

# encoding: UTF-8
import threading

# 方法1:將要執行的方法做爲參數傳給Thread的構造方法
def func():
    print 'func() passed to Thread'

t = threading.Thread(target=func)
t.start()

# 方法2:從Thread繼承,並重寫run()
class MyThread(threading.Thread):
    def run(self):
        print 'MyThread extended from Thread'

t = MyThread()
t.start()

構造方法: 
Thread(group=None, target=None, name=None, args=(), kwargs={}) 

group: 線程組,目前尚未實現,庫引用中提示必須是None; 
target: 要執行的方法; 
name: 線程名; 
args/kwargs: 要傳入方法的參數。線程

實例方法: 
isAlive(): 返回線程是否在運行。正在運行指啓動後、終止前。 

get/setName(name): 獲取/設置線程名。 
is/setDaemon(bool): 獲取/設置是否守護線程。初始值從建立該線程的線程繼承。當沒有非守護線程仍在運行時,程序將終止。 
start(): 啓動線程。 
join([timeout]): 阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。設計

一個使用join()的例子:code

# encoding: UTF-8
import threading
import time

def context(tJoin):
    print 'in threadContext.'
    tJoin.start()
   
    # 將阻塞tContext直到threadJoin終止。
    tJoin.join()
   
    # tJoin終止後繼續執行。
    print 'out threadContext.'

def join():
    print 'in threadJoin.'
    time.sleep(1)
    print 'out threadJoin.'

tJoin = threading.Thread(target=join)
tContext = threading.Thread(target=context, args=(tJoin,))

tContext.start()

運行結果:對象

in threadContext. 
in threadJoin. 
out threadJoin. 
out threadContext.繼承

3.2. Lock

Lock(指令鎖)是可用的最低級的同步指令。Lock處於鎖定狀態時,不被特定的線程擁有。Lock包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。遞歸

能夠認爲Lock有一個鎖定池,當線程請求鎖定時,將線程至於池中,直到得到鎖定後出池。池中的線程處於狀態圖中的同步阻塞狀態。

構造方法: 
Lock()

實例方法: 
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試得到鎖定。 

release(): 釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。

# encoding: UTF-8
import threading
import time

data = 0
lock = threading.Lock()

def func():
    global data
    print '%s acquire lock...' % threading.currentThread().getName()
   
    # 調用acquire([timeout])時,線程將一直阻塞,
    # 直到得到鎖定或者直到timeout秒後(timeout參數可選)。
    # 返回是否得到鎖。
    if lock.acquire():
        print '%s get the lock.' % threading.currentThread().getName()
        data += 1
        time.sleep(2)
        print '%s release lock...' % threading.currentThread().getName()
       
        # 調用release()將釋放鎖。
        lock.release()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start() 

3.3. RLock

RLock(可重入鎖)是一個能夠被同一個線程請求屢次的同步指令RLock使用了「擁有的線程」和「遞歸等級」的概念,處於鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程能夠再次調用acquire(),釋放鎖時須要調用release()相同次數。

能夠認爲RLock包含一個鎖定池和一個初始值爲0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,爲0時鎖處於未鎖定狀態。

構造方法: 
RLock()

實例方法: 
acquire([timeout])/release(): 跟Lock差很少。

# encoding: UTF-8
import threading
import time

rlock = threading.RLock()

def func():
    # 第一次請求鎖定
    print '%s acquire lock...' % threading.currentThread().getName()
    if rlock.acquire():
        print '%s get the lock.' % threading.currentThread().getName()
        time.sleep(2)
       
        # 第二次請求鎖定
        print '%s acquire lock again...' % threading.currentThread().getName()
        if rlock.acquire():
            print '%s get the lock.' % threading.currentThread().getName()
            time.sleep(2)
       
        # 第一次釋放鎖
        print '%s release lock...' % threading.currentThread().getName()
        rlock.release()
        time.sleep(2)
       
        # 第二次釋放鎖
        print '%s release lock...' % threading.currentThread().getName()
        rlock.release()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

3.4. Condition

Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。

能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。

構造方法: 
Condition([lock/rlock])

實例方法: 
acquire([timeout])/release(): 調用關聯的鎖的相應方法。 

wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。 
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。 
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。

例子是很常見的生產者/消費者模式:

# encoding: UTF-8
import threading
import time

# 商品
product = None
# 條件變量
con = threading.Condition()

# 生產者方法
def produce():
    global product
   
    if con.acquire():
        while True:
            if product is None:
                print 'produce...'
                product = 'anything'
               
                # 通知消費者,商品已經生產
                con.notify()
           
            # 等待通知
            con.wait()
            time.sleep(2)

# 消費者方法
def consume():
    global product
   
    if con.acquire():
        while True:
            if product is not None:
                print 'consume...'
                product = None
               
                # 通知生產者,商品已經沒了
                con.notify()
           
            # 等待通知
            con.wait()
            time.sleep(2)

t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()

3.5. Semaphore/BoundedSemaphore

Semaphore(信號量)是計算機科學史上最古老的同步指令之一。Semaphore管理一個內置的計數器,每當調用acquire()時-1,調用release() 時+1。計數器不能小於0;當計數器爲0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。

基於這個特色,Semaphore常常用來同步一些有「訪客上限」的對象,好比鏈接池。

BoundedSemaphore 與Semaphore的惟一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

構造方法: 
Semaphore(value=1): value是計數器的初始值。

實例方法: 
acquire([timeout]): 請求Semaphore。若是計數器爲0,將阻塞線程至同步阻塞狀態;不然將計數器-1並當即返回。 

release(): 釋放Semaphore,將計數器+1,若是使用BoundedSemaphore,還將進行釋放次數檢查。release()方法不檢查線程是否已得到 Semaphore。

# encoding: UTF-8
import threading
import time

# 計數器初值爲2
semaphore = threading.Semaphore(2)

def func():
   
    # 請求Semaphore,成功後計數器-1;計數器爲0時阻塞
    print '%s acquire semaphore...' % threading.currentThread().getName()
    if semaphore.acquire():
       
        print '%s get semaphore' % threading.currentThread().getName()
        time.sleep(4)
       
        # 釋放Semaphore,計數器+1
        print '%s release semaphore' % threading.currentThread().getName()
        semaphore.release()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()

time.sleep(2)

# 沒有得到semaphore的主線程也能夠調用release
# 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常
print 'MainThread release semaphore without acquire'
semaphore.release()

3.6. Event

Event(事件)是最簡單的線程通訊機制之一:一個線程通知事件,其餘線程等待事件。Event內置了一個初始爲False的標誌,當調用set()時設爲True,調用clear()時重置爲 False。wait()將阻塞線程至等待阻塞狀態。

Event其實就是一個簡化版的 Condition。Event沒有鎖,沒法使線程進入同步阻塞狀態。

構造方法: 
Event()

實例方法: 
isSet(): 當內置標誌爲True時返回True。 

set(): 將標誌設爲True,並通知全部處於等待阻塞狀態的線程恢復運行狀態。 
clear(): 將標誌設爲False。 
wait([timeout]): 若是標誌爲True將當即返回,不然阻塞線程至等待阻塞狀態,等待其餘線程調用set()。

# encoding: UTF-8
import threading
import time

event = threading.Event()

def func():
    # 等待事件,進入等待阻塞狀態
    print '%s wait for event...' % threading.currentThread().getName()
    event.wait()
   
    # 收到事件後進入運行狀態
    print '%s recv event.' % threading.currentThread().getName()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()

time.sleep(2)

# 發送事件通知
print 'MainThread set event.'
event.set()

3.7. Timer

Timer(定時器)是Thread的派生類,用於在指定時間後調用一個方法。

構造方法: 
Timer(interval, function, args=[], kwargs={}) 

interval: 指定的時間 
function: 要執行的方法 
args/kwargs: 方法的參數

實例方法: 
Timer從Thread派生,沒有增長實例方法。

# encoding: UTF-8
import threading

def func():
    print 'hello timer!'

timer = threading.Timer(5, func)
timer.start()

3.8. local

local是一個小寫字母開頭的類,用於管理 thread-local(線程局部的)數據。對於同一個local,線程沒法訪問其餘線程設置的屬性;線程設置的屬性不會被其餘線程設置的同名屬性替換。

能夠把local當作是一個「線程-屬性字典」的字典,local封裝了從自身使用線程做爲 key檢索對應的屬性字典、再使用屬性名做爲key檢索屬性值的細節。

# encoding: UTF-8
import threading

local = threading.local()
local.tname = 'main'

def func():
    local.tname = 'notmain'
    print local.tname

t1 = threading.Thread(target=func)
t1.start()
t1.join()

print local.tname

熟練掌握Thread、Lock、Condition就能夠應對絕大多數須要使用線程的場合,某些狀況下local也是很是有用的東西。本文的最後使用這幾個類展現線程基礎中提到的場景:

# encoding: UTF-8
import threading

alist = None
condition = threading.Condition()

def doSet():
    if condition.acquire():
        while alist is None:
            condition.wait()
        for i in range(len(alist))[::-1]:
            alist[i] = 1
        condition.release()

def doPrint():
    if condition.acquire():
        while alist is None:
            condition.wait()
        for i in alist:
            print i,
        print
        condition.release()

def doCreate():
    global alist
    if condition.acquire():
        if alist is None:
            alist = [0 for i in range(10)]
            condition.notifyAll()
        condition.release()

tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start()
相關文章
相關標籤/搜索