- 一、threading模塊詳解
- 二、queue模塊詳解
Python提供的與線程操做相關的模塊,網上有不少資料仍是用的thread模塊,在3.x版本中已經使用
threading來替代thread
,若是你在python 2.x版本想使用threading的話,可使用dummy_threading模塊
。css
- active_count():獲取當前活躍(alive)線程的個數。
- current_thread():獲取當前的線程對象。
- get_ident():返回當前線程的索引,一個非零的整數(3.3新增)。
- enumerate():獲取當前全部活躍線程的列表。
- main_thread():返回主線程對象(3.4新增)。
- settrace(func):設置一個回調函數,在run()執行以前被調用。
- setprofile(func):設置一個回調函數,在run()執行完畢以後調用。
- stack_size():返回建立新線程時使用的線程堆棧大小。
- threading.TIMEOUT_MAX:堵塞線程時間最大值,超過這個值會棧溢出。
問題引入:python
在一個進程內全部的線程共享進程的全局變量,線程間共享數據很方便可是每一個線程均可以隨意修改全局變量,可能會引發線程安全問題。sql
解決方法:apache
對於這種線程私有數據,最簡單的方法就是對變量加鎖或使用局部變量,只有線程自身能夠訪問,其餘線程沒法訪問。除此以外還可使用threading模塊爲咱們提供的
ThreadLocal變量
,它自己是一個全局變量,可是線程們卻可使用它來保存私有數據。安全
用法簡介:多線程
定義一個全局變量:data = thread.local(),而後就能夠往裏面存數據啦,好比data.num = xxx,可是有一點要注意:若是data裏沒有設置對應的屬性,直接取會報AttributeError異常,使用時能夠捕獲這個異常或先調用hasattr(對象,屬性)判斷對象中是否有該屬性!使用代碼示例以下:併發
import threading
import random
data = threading.local()
def show(d):
try:
num = d.num
except AttributeError:
print("線程 %s 還未設置該屬性!" % threading.current_thread().getName())
else:
print("線程 %s 中該屬性的值爲 = %s" % (threading.current_thread().getName(), num))
def thread_call(d):
show(d)
d.num = random.randint(1, 100)
show(d)
if __name__ == '__main__':
show(data)
data.num = 666
show(data)
for i in range(2):
t = threading.Thread(target=thread_call, args=(data,), name="Thread " + str(i))
t.start()
複製代碼
運行結果以下:app
線程 MainThread 還未設置該屬性!
線程 MainThread 中該屬性的值爲 = 666
線程 Thread 0 還未設置該屬性!
線程 Thread 0 中該屬性的值爲 = 80
線程 Thread 1 還未設置該屬性!
線程 Thread 1 中該屬性的值爲 = 17
複製代碼
不一樣線程訪問這個ThreadLocal變量,返回的都是不同的值,原理:dom
threading.local()實例化一個全局對象,這個全局對象裏有一個大字典,鍵值爲兩個弱引用對象{線程對象,字典對象},而後能夠經過
current_thread
()得到當前的線程對象,而後根據這個對象能夠拿到對應的字典對象,而後進行參數的讀或者寫。ide
建立新線程的兩種方式:
使用代碼示例(驗證單線程快仍是多線程快):
import threading
import time
def catch_fish():
pass
def one_thread():
start_time = time.time()
for i in range(1, 1001):
catch_fish()
end_time = time.time()
print("單線程測試 耗時 === %s" % str(end_time - start_time))
def muti_thread():
start_time = time.time()
for i in range(1, 1001):
threading.Thread(target=catch_fish()).start()
end_time = time.time()
print("多線程測試 耗時 === %s" % str(end_time - start_time))
if __name__ == '__main__':
# 單線程
threading.Thread(one_thread()).start()
# 多線程
muti_thread()
複製代碼
運行結果以下:
單線程測試 耗時 === 0.00011301040649414062
多線程測試 耗時 === 0.07665514945983887
複製代碼
從輸出結果能夠看到,多線程反而比單線程要慢,緣由是前面介紹過的Python中的全局解釋器鎖(GIL), 使得任什麼時候候僅有一個線程在執行。
Thread類構造函數
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
複製代碼
構造函數參數依次是:
- group:線程組
- target:要執行的函數
- name:線程名字
- args/kwargs:要傳入的函數的參數
- daemon:是否爲守護線程
相關屬性與函數:
- start():啓動線程,只能調用一次
- run():線程執行的操做,可繼承Thread重寫,參數可從args和kwargs獲取;
- join([timeout]):堵塞調用線程,直到被調用線程運行結束或超時;若是
沒設置超時時間會一直堵塞到被調用線程結束。- name/getName():得到線程名;
- setName():設置線程名;
- ident:線程是已經啓動,未啓動會返回一個非零整數;
- is_alive():判斷是否在運行,啓動後,終止前;
- daemon/isDaemon():線程是否爲守護線程;
- setDaemon():設置線程爲守護線程;
在概念那裏就講了,多個進程併發的訪問臨界資源可能會引發線程同步安全問題,寫個簡單的例子,而後再引入同步鎖。代碼示例以下:
import threading
file_name = "test.txt"
# 定義一個寫入文件的方法
def write_to_file(msg):
try:
with open(file_name, "a+", encoding="utf-8") as f:
f.write(msg + "\n")
except OSError as reason:
print(str(reason))
class MyThread(threading.Thread):
def __init__(self, msg):
super().__init__()
self.msg = msg
def run(self):
write_to_file(self.name + "~" + self.msg)
if __name__ == '__main__':
for i in range(1, 21):
t = MyThread(str(i)).start()
複製代碼
運行結果以下:
# test.txt文件內容
Thread-1~1
Thread-5~5
Thread-3~3
Thread-2~2
Thread-4~4
Thread-6~6
Thread-7~7
Thread-8~8
Thread-10~10
Thread-9~9
Thread-11~11
Thread-13~13
Thread-12~12
Thread-14~14
Thread-15~15
Thread-16~16
Thread-17~17
Thread-19~19
Thread-20~20
Thread-18~18
複製代碼
發現結果並無按照咱們預想的1-20那樣順序打印,而是亂的,threading模塊中提供了兩個類來確保多線程共享資源的訪問:「Lock」 和 「RLock」。
Lock:指令鎖,有兩種狀態(鎖定與非鎖定),以及兩個基本函數:
使用
acquire()
設置爲locked
狀態,使用release()
設置爲unlocked狀態
。acquire()函數有兩個可選參數:blocking=True:是否堵塞當前線程等待;timeout=None:堵塞等待時間。若是成功得到lock,acquire返回True,不然返回False,超時也是返回False。使用起來也很簡單,在訪問共享資源的地方acquire一下,用完release就好。使用代碼示例以下:
import threading
file_name = "test.txt"
lock = threading.Lock()
# 定義一個寫入文件的方法(加鎖)
def write_to_file(msg):
if lock.acquire():
try:
with open(file_name, "a+", encoding="utf-8") as f:
f.write(msg + "\n")
except OSError as reason:
print(str(reason))
finally:
lock.release()
class MyThread(threading.Thread):
def __init__(self, msg):
super().__init__()
self.msg = msg
def run(self):
write_to_file(self.name + "~" + self.msg)
if __name__ == '__main__':
for i in range(1, 101):
t = MyThread(str(i)).start()
複製代碼
這裏把循環次數改爲了101,反覆執行屢次,test.txt中寫入順序也是正確的,加鎖有效。另外有一點要注意:若是鎖的狀態是unlocked,此時調用release會拋出RuntimeError異常!
RLock:可重入鎖,和Lock相似,但RLock卻能夠被同一個線程請求屢次! 舉個例子:在一個線程裏調用Lock對象的acquire方法兩次。
lock.acquire()
lock.acquire()
lock.release()
lock.release()
複製代碼
你會發現程序卡住不動,由於已經發生了死鎖,可是方法調用是在同一個線程裏的,這很不合理吧。這個時候就能夠引入RLock了,使用RLock編寫同樣代碼,只需把threading.Lock()改爲threading.RLock(),便可解決這個問題。
雖然使用RLock能夠規避同一個線程引發的死鎖問題,可是acquire和release函數要成對出現,即有多少個acquire就要有多少個release,纔可以正真釋放鎖。
上面的互斥鎖Lock和RLock只是最簡單的同步機制,Python爲咱們提供了Condition(條件變量),以便於處理複雜線程同步問題,好比最經典的生產者與消費者問題。Condition除了提供與Lock相似的acquire
()與release
()函數外,還提供了wait
()與notify
()函數。
用法簡介:
- 1.調用`threading.Condition`得到一個條件變量對象;
- 2.線程調用acquire得到Condition對象;
- 3.進行條件判斷,不知足條件調用wait函數,知足條件,進行一些處理改變條件後,調用notify函數通知處於wait狀態的線程,從新進行條件判斷。
代碼示例以下(實現一個簡單的消費者和生產者):
import threading
import time
condition = threading.Condition()
products = 0 # 商品數量
# 定義生產者線程類
class Producer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products >= 99:
condition.wait()
else:
products += 2
print(self.name + "生產了2個產品,當前剩餘產品數爲:" + str(products))
condition.notify()
condition.release()
time.sleep(2)
# 定義消費者線程類
class Consumer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products < 4:
condition.wait()
else:
products -= 4
print(self.name + "消耗了4個產品,當前剩餘產品數爲:" + str(products))
condition.notify()
condition.release()
time.sleep(2)
if __name__ == '__main__':
# 建立五個生產者線程
for i in range(5):
p = Producer()
p.start()
# 建立兩個消費者線程
for j in range(2):
c = Consumer()
c.start()
複製代碼
部分運行結果以下:
Thread-1生產了2個產品,當前剩餘產品數爲:2
Thread-2生產了2個產品,當前剩餘產品數爲:4
Thread-3生產了2個產品,當前剩餘產品數爲:6
Thread-4生產了2個產品,當前剩餘產品數爲:8
Thread-5生產了2個產品,當前剩餘產品數爲:10
Thread-6消耗了4個產品,當前剩餘產品數爲:6
Thread-7消耗了4個產品,當前剩餘產品數爲:2
Thread-1生產了2個產品,當前剩餘產品數爲:4
Thread-5生產了2個產品,當前剩餘產品數爲:6
Thread-3生產了2個產品,當前剩餘產品數爲:8
Thread-7消耗了4個產品,當前剩餘產品數爲:4
Thread-6消耗了4個產品,當前剩餘產品數爲:0
Thread-4生產了2個產品,當前剩餘產品數爲:2
複製代碼
Condition維護着一個互斥鎖對象(默認是RLock),也能夠本身實例化一個在Condition實例化的時候經過構造函數傳入,因此,調用的Condition的acquire與release函數,其實調用就是這個鎖對象的acquire與release函數。
Condition提供的其餘函數:
- wait(timeout=None):釋放鎖,同時線程被掛起,直到收到通知被喚醒
或超時(若是設置了timeout),當線程被喚醒並從新佔有鎖時,程序才繼續執行;- wait_for(predicate, timeout=None):等待知道條件爲True,predicate應該是
一個回調函數,返回布爾值,timeout用於指定超時時間,返回值爲回調函數返回
的布爾值,或者超時,返回False(3.2新增);- notify(n=1):默認喚醒一個正在的等待線程,notify並不釋放鎖!!!
- notify_all():喚醒全部等待線程,進入就緒狀態,等待得到鎖,notify_all 一樣不釋放鎖!!!
注:上述函數只有在acquire以後才能調用,否則會報RuntimeError異常。
信號量,也是一個簡單易懂的東西,舉個形象的例子:
廁所裏有五個坑位,每有我的去廁所就會佔用一個坑位,所剩餘的坑位-1,當五個坑都被人佔滿時,新來的人就只能在外面等候,直到有人出來爲止。這裏的五個坑位就是信號量,蹲坑的人就是線程,初始值爲5,來人-1,走人+1,超過最大值,新來的處於堵塞狀態,咱們寫下代碼來還原這個過程。
信號量使用代碼示例以下:
import threading
import time
import random
s = threading.Semaphore(5) # 糞坑
class Human(threading.Thread):
def run(self):
s.acquire() # 佔坑
print("蹲坑 - " + self.name + " - " + str(time.ctime()))
time.sleep(random.randrange(1, 3))
print("走人 - " + self.name + " - " + str(time.ctime()))
s.release() # 走人
if __name__ == '__main__':
for i in range(10):
human = Human()
human.start()
複製代碼
運行結果以下:
蹲坑 - Thread-1 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-2 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-3 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-4 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-5 - Tue Jul 17 19:59:15 2018
走人 - Thread-1 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-6 - Tue Jul 17 19:59:16 2018
走人 - Thread-2 - Tue Jul 17 19:59:16 2018
走人 - Thread-3 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-8 - Tue Jul 17 19:59:16 2018
走人 - Thread-5 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-7 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-9 - Tue Jul 17 19:59:16 2018
走人 - Thread-4 - Tue Jul 17 19:59:17 2018
蹲坑 - Thread-10 - Tue Jul 17 19:59:17 2018
走人 - Thread-6 - Tue Jul 17 19:59:17 2018
走人 - Thread-8 - Tue Jul 17 19:59:17 2018
走人 - Thread-9 - Tue Jul 17 19:59:17 2018
走人 - Thread-7 - Tue Jul 17 19:59:18 2018
走人 - Thread-10 - Tue Jul 17 19:59:19 2018
複製代碼
Python提供的「用於線程間通訊的信號標誌」,一個線程標識了一個事件,其餘線程處於等待狀態,直到事件發生後,全部線程都會被激活。Event對象屬性實現了簡單的線程通訊機制,提供了設置信號,清除信號,等待等用於實現線程間的通訊。提供如下四個可供調用的方法:
- is_set():判斷內部標誌是否爲真
- set():設置信號標誌爲真
- clear():清除Event對象內部的信號標誌(設置爲false)
- wait(timeout=None):使線程一直處於堵塞,知道標識符變爲True
使用代碼示例(汽車等紅綠燈的例子):
import threading
import time
import random
class CarThread(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.threadEvent = event
def run(self):
# 休眠模擬汽車前後到達路口時間
time.sleep(random.randrange(1, 10))
print("汽車 - " + self.name + " - 到達路口...")
self.threadEvent.wait()
print("汽車 - " + self.name + " - 經過路口...")
if __name__ == '__main__':
light_event = threading.Event()
# 假設有20臺車子
for i in range(20):
car = CarThread(event=light_event)
car.start()
while threading.active_count() > 1:
light_event.clear()
print("紅燈等待...")
time.sleep(3)
print("綠燈通行...")
light_event.set()
time.sleep(2)
複製代碼
運行結果以下:
紅燈等待...
汽車 - Thread-10 - 到達路口...
汽車 - Thread-14 - 到達路口...
汽車 - Thread-9 - 到達路口...
汽車 - Thread-11 - 到達路口...
汽車 - Thread-12 - 到達路口...
綠燈通行...
汽車 - Thread-11 - 經過路口...
汽車 - Thread-10 - 經過路口...
汽車 - Thread-9 - 經過路口...
汽車 - Thread-14 - 經過路口...
汽車 - Thread-12 - 經過路口...
汽車 - Thread-6 - 到達路口...
汽車 - Thread-6 - 經過路口...
複製代碼
和Thread相似,只是要等待一段時間後纔會開始運行,單位秒,用法也很簡單,
代碼示例以下:
import threading
import time
def skill_ready():
print("菜餚製做完成!!!")
if __name__ == '__main__':
t = threading.Timer(5, skill_ready)
t.start()
while threading.active_count() > 1:
print("======菜餚製做中======")
time.sleep(1)
複製代碼
運行結果以下:
======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
菜餚製做完成!!!
複製代碼
Barrier直譯柵欄,感受不怎麼好理解,咱們能夠把它看作是賽馬用的柵欄,而後馬(線程)依次來到柵欄前等待(wait),直到全部的馬都停在柵欄面前了,而後全部馬開始同時出發(start)。簡單點說就是: 多個線程間的相互等待,調用了wait()方法的線程進入堵塞, 直到全部的線程都調用了wait()方法,而後全部線程同時進入就緒狀態, 等待調度運行。
構造函數: Barrier(parties,action=None,timeout=None)
參數解釋:
- parties:建立一個可容納parties條線程的柵欄;
- action:所有線程被釋放時可被其中一條線程調用的可調用對象;
- timeout:線程調用wait()方法時沒有顯式設定timeout,就用的這個做爲默認值;
相關屬性與函數:
- wait(timeout=None):表示線程就位,返回值是一個0到parties-1之間的
整數, 每條線程都不同,這個值能夠用做挑選一條線程作些清掃工做,另外若是
你在構造函數裏設置了action的話,其中一個線程在釋放以前將會調用它。若是調用
出錯的話,會讓柵欄進入broken狀態,超時一樣也會進入broken狀態,若是柵欄在
處於broke狀態的時候調用reset函數,會拋出一個BrokenBarrierError異常。- reset():本方法將柵欄置爲初始狀態,即empty狀態。全部已經在等待的線程都會
接收到BrokenBarrierError異常,注意當有其餘處於unknown狀態的線程時,調用
此方法將可能獲取到額外的訪問。所以若是一個柵欄進入了broken狀態, 最好是
放棄他並新建一個柵欄,而不是調用reset方法。- abort():將柵欄置爲broken狀態。本方法將使全部正在等待或將要調用
wait()方法的線程收到BrokenBarrierError異常。本方法的使用情景爲,好比:
有一條線程須要abort(),又不想給其餘線程形成死鎖的狀態,或許設定
timeout參數要比使用本方法更可靠。- parites:將要使用本 barrier 的線程的數量
- n_waiting:正在等待本 barrier 的線程的數量
- broken:柵欄是否爲broken狀態,返回一個布爾值
- BrokenBarrierError:RuntimeError的子類,當柵欄被reset()或broken時引起;
使用代碼示例以下(公司一塊兒去旅遊等人齊纔出發):
import threading
import time
import random
class Staff(threading.Thread):
def __init__(self, barriers):
threading.Thread.__init__(self)
self.barriers = barriers
def run(self):
print("員工 【" + self.name + "】" + "出門")
time.sleep(random.randrange(1, 10))
print("員工 【" + self.name + "】" + "已簽到")
self.barriers.wait()
def ready():
print(threading.current_thread().name + ":人齊,出發,出發~~~")
if __name__ == '__main__':
print("要出去旅遊啦,你們快集合~")
b = threading.Barrier(10, action=ready, timeout=20)
for i in range(10):
staff = Staff(b)
staff.start()
複製代碼
運行結果以下:
要出去旅遊啦,你們快集合~
員工 【Thread-1】出門
員工 【Thread-2】出門
員工 【Thread-3】出門
員工 【Thread-4】出門
員工 【Thread-5】出門
員工 【Thread-6】出門
員工 【Thread-7】出門
員工 【Thread-8】出門
員工 【Thread-9】出門
員工 【Thread-10】出門
員工 【Thread-8】已簽到
員工 【Thread-4】已簽到
員工 【Thread-5】已簽到
員工 【Thread-6】已簽到
員工 【Thread-9】已簽到
員工 【Thread-2】已簽到
員工 【Thread-3】已簽到
員工 【Thread-7】已簽到
員工 【Thread-1】已簽到
員工 【Thread-10】已簽到
Thread-10:人齊,出發,出發~~~
複製代碼
Python中的queue模塊中已經實現了一個線程安全的多生產者,多消費者隊列,自帶鎖,經常使用於多線程併發數據交換。內置三種類型的隊列:
- Queue:FIFO(先進先出);
- LifoQueue:LIFO(後進先出);
- PriorityQueue:優先級最小的先出;
三種類型的隊列的構造函數都是(maxsize=0),用於設置隊列容量,若是設置的maxsize小於1,則表示隊列的長度無限長。
兩個異常:
相關函數:
- size():返回隊列的近似大小,注意:qsize()> 0不保證隨後的get()不會 阻塞也不保證qsize() < maxsize後的put()不會堵塞;
- empty():判斷隊列是否爲空,返回布爾值,若是返回True,不保證後續 調用put()不會阻塞,同理,返回False也不保證get()調用不會被阻塞;
- full():判斷隊列是否滿,返回布爾值若是返回True,不保證後續 調用get()不會阻塞,同理,返回False也不保證put()調用不會被阻塞;
- put(item, block=True, timeout=None):往隊列中放入元素,若是block爲True且timeout參數爲None(默認),爲堵塞型put(),若是timeout是 正數,會堵塞timeout時間並引起Queue.Full異常,若是block爲False則 爲非堵塞put()。
- put_nowait(item):等價於put(item, False),非堵塞put()
- get(block=True, timeout=None):移除一個隊列元素,並返回該元素,若是block爲True表示堵塞函數,block = False爲非堵塞函數,若是設置了timeout,堵塞時最多堵塞超過多少秒,若是這段時間內沒有可用的項,會引起Queue.Empty異常,若是爲非堵塞狀態,有數據可用返回數據無數據當即拋出Queue.Empty異常;
- get_nowait():等價於get(False),非堵塞get()
- task_done():完成一項工做後,調用該方法向隊列發送一個完成信號,任務-1;
- join():等隊列爲空,再執行別的操做;
使用代碼示例以下:
import threading
import queue
import time
import random
work_queue = queue.Queue()
# 任務模擬
def working():
global work_queue
while not work_queue.empty():
data = work_queue.get()
time.sleep(random.randrange(1, 2))
print("執行" + data)
work_queue.task_done()
# 工做線程
class WorkThread(threading.Thread):
def __init__(self, t_name, func):
self.func = func
threading.Thread.__init__(self, name=t_name)
def run(self):
self.func()
if __name__ == '__main__':
work_list = []
for i in range(1, 21):
work_list.append("任務 %d" % i)
# 模擬把須要執行的任務放到隊列中
for i in work_list:
work_queue.put(i)
# 初始化一個線程列表
threads = []
for i in range(0, len(work_list)):
t = WorkThread(t_name="線程" + str(i), func=working)
t.daemon = True
t.start()
threads.append(t)
work_queue.join()
for t in threads:
t.join()
print("全部任務執行完畢")
複製代碼
運行結果以下:
執行任務 1
執行任務 3
執行任務 5
執行任務 2
執行任務 4
執行任務 6
執行任務 8
執行任務 10
執行任務 13
執行任務 11
執行任務 17
執行任務 18
執行任務 19
執行任務 7
執行任務 14
執行任務 16
執行任務 9
執行任務 15
執行任務 12
執行任務 20
全部任務執行完畢
複製代碼
若是本文對你有所幫助,歡迎
留言,點贊,轉發
素質三連,謝謝😘~