Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。
1.一、threading模塊python
threading模塊創建在_thread模塊之上。thread模塊以低級=原始的方式來處理和控制線程,而threading模塊
經過對thread進行二次封裝,提供了更方便的api來處理線程。算法
簡單的線程實例:
建立了20個「前臺」線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令api
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 建立一個簡單的threading線程實例 5 """ 6 import threading 7 import time 8 9 def to_worker(num): 10 """ 11 線程方法 12 :param num: 13 :return: 14 """ 15 time.sleep(1) 16 print("The num is %s" % num) 17 return 18 19 for i in range(5): 20 t = threading.Thread(target=to_worker, args=(i, )) 21 t.start() #激活線程
代碼執行結果:多線程
1.二、建立線程的構造方法併發
t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})app
註釋說明:
group --線程組
target --要執行的方法
name --線程名
args/kwargs -要傳入方法的參數
Thread類提供瞭如下方法:
一、t.start() --激活線程
二、t.getName() --獲取線程的名稱
三、t.setName() --設置線程的名稱
四、t.name() --獲取或設置線程的名稱
五、t.is_alive() --判斷線程是否爲激活狀態
六、t.isAlive() --判斷線程是否爲激活狀態
七、t.setDaemon() --設置爲後臺線程或前臺線程(默認:False)
八、t.isDaemon() --判斷是否爲守護線程
九、t.ident() --獲取線程的標識符。線程標識符是一個非零整數,只有在調用start()方法後,該屬性纔有效,不然它只返回None
十、t.join() --逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義。
十一、t.run() --線程被cpu調度後自動執行線程對象的run方法
1.三、python線程鎖ide
當有一個數據有多個線程對其進行修改的時候,任何一個線程改變他都會對其餘線程形成影響,若是咱們想某一個線程在使用完以前,其餘線程不能對其修改,就須要對這個線程加一個線程鎖。
簡單的線程鎖實例:函數
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 線程鎖小實例 5 """ 6 import threading 7 import time 8 9 globals_num = 0 10 11 lock = threading.RLock() 12 13 def Func(): 14 lock.acquire() #獲取鎖 15 global globals_num 16 globals_num += 1 17 time.sleep(1) 18 print(globals_num) 19 lock.release() #釋放鎖 20 21 for i in range(10): 22 t = threading.Thread(target=Func) 23 t.start()
代碼執行結果:ui
threading.RLock和threading.Lock 的區別
RLock容許在同一線程中被屢次acquire。而Lock卻不容許這種狀況。 若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣。
spa
1 import threading 2 lock = threading.Lock() #Lock對象 3 lock.acquire() 4 lock.acquire() #產生了死瑣。 5 lock.release() 6 lock.release() 7 8 9 10 import threading 11 rLock = threading.RLock() #RLock對象 12 rLock.acquire() 13 rLock.acquire() #在同一線程內,程序不會堵塞。 14 rLock.release() 15 rLock.release()
1.四、threading.Event
一、python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear。
二、事件處理的機制:全局定義了一個Flag,若是Flag值爲False,那麼當程序執行event.wait方法時就會阻塞,
若是Flag值爲True,那麼event.wait方法時便再也不阻塞。
方法說明:
clear --將Flag設置爲False
set -- 將Flag設置爲True
Event.isSet() --判斷標識符是否爲True
threading.Event簡單實例:
當線程執行的時候,若是flag爲False,則線程會阻塞,當flag爲True的時候,線程不會阻塞。它提供了本地和遠程的併發性。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 threading.Event事件實例 5 """ 6 import threading 7 8 def do(event): 9 print("start.....") 10 event.wait() 11 print("execuse...") 12 13 event_obj = threading.Event() 14 15 for i in range(5): 16 t = threading.Thread(target=do, args=(event_obj,)) 17 t.start() 18 19 event_obj.clear() 20 inp = input('input:(true) ') 21 if inp == "true": 22 event_obj.set()
代碼執行結果:
1.五、threading.Condition(條件變量)
示例說明:當小夥伴a在往火鍋裏面添加魚丸,這個就是生產者行爲;另一個小夥伴b在吃掉魚丸就是消費者行爲。當火鍋裏面魚丸達到必定數量加滿後b才能吃,這就是一種條件判斷了。
Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。
能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。
Condition():
acquire(): 線程鎖
release(): 釋放鎖
wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)纔會被喚醒繼續運行。wait()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。
notify(n=1): 通知其餘線程,那些掛起的線程接到這個通知以後會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。notify()不會主動釋放Lock。
notifyAll(): 若是wait狀態線程比較多,notifyAll的做用就是通知全部線程。
生產者與消費者示例:
現實場景:當a同窗王火鍋裏面添加魚丸加滿後(最多3個,加滿後通知b去吃掉),通知b同窗去吃掉魚丸(吃到0的時候通知a同窗繼續添加)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 threading.Condition 5 """ 6 import threading 7 import time 8 9 con = threading.Condition() 10 11 num = 0 12 13 #生產者 14 class Producer(threading.Thread): 15 16 def __init__(self): 17 threading.Thread.__init__(self) 18 19 def run(self): 20 #鎖定線程 21 global num 22 con.acquire() #獲取鎖 23 while True: 24 print("a同窗開始添加......") 25 num += 1 26 print("鍋裏丸子個數爲:%s" % str(num)) 27 time.sleep(1) 28 if num >= 3: 29 print("丸子個數已經達到3個了,沒法添加。") 30 #喚醒等待的線程 31 con.notify() #喚醒同窗開吃 32 #等待通知 33 con.wait() 34 35 #釋放鎖 36 con.release() 37 38 #消費者 39 class Consumers(threading.Thread): 40 def __init__(self): 41 threading.Thread.__init__(self) 42 43 def run(self): 44 con.acquire() 45 global num 46 while True: 47 print("我準備開吃了...") 48 num -= 1 49 print("鍋裏丸子數量爲:%s" % str(num)) 50 time.sleep(2) 51 if num <= 0: 52 print("丸子吃完了,趕忙添加啦..") 53 con.notify() #喚醒等待的線程 54 #等待通知 55 con.wait() 56 con.release() #釋放鎖 57 58 p = Producer() 59 c = Consumers() 60 p.start() 61 c.start()
代碼執行結果:
1.六、Queue模塊
1.6.一、建立一個「隊列」對象
import queue
q = queue.queue(maxsize = 10)
queue.queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。
1.6.二、將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。
1.6.三、將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。
1.6.四、Python queue模塊有三種隊列及構造函數:
一、Python queue模塊的FIFO隊列先進先出。 class queue.queue(maxsize)
二、LIFO相似於堆,即先進後出。 class queue.Lifoqueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。 class queue.Priorityqueue(maxsize)
1.6.五、queue經常使用方法(q =queue.queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做
1.6.六、簡單的queue實例:生產者-消費者模型
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 Queue隊列 5 """ 6 import queue 7 import threading 8 9 10 message = queue.Queue(10) 11 12 def producer(i): 13 while True: 14 message.put(i) 15 16 def consumer(i): 17 while True: 18 msg = message.get(i) 19 20 for i in range(12): 21 t = threading.Thread(target=producer, args=(i, )) 22 t.start() 23 for i in range(10): 24 t = threading.Thread(target=consumer, args=(i, )) 25 t.start()
1.七、自定義線程池
1.7.一、方法一:簡單往隊列中傳輸線程數
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 自定義線程池 5 方法一:簡單往隊列中傳輸線程數 6 """ 7 import threading 8 import time 9 import queue 10 11 class ThreadingPool(): 12 def __init__(self, max_num = 10): 13 self.queue = queue.Queue(max_num) 14 for i in range(max_num): 15 self.queue.put(threading.Thread) 16 17 def getthreading(self): 18 return self.queue.get() 19 20 def addthreading(self): 21 self.queue.put(threading.Thread) 22 23 24 def func(p, i): 25 time.sleep(1) 26 print(i) 27 p.addthreading() 28 29 if __name__ == "__main__": 30 p = ThreadingPool() 31 for i in range(12): 32 thread = p.getthreading() 33 t = thread(target = func, args = (p, i)) 34 t.start()
代碼執行結果:
1.7.二、方法二:往隊列中無限添加任務
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 """ 4 自定義線程池 5 方法二:往隊列中無限添加任務 6 """ 7 import queue 8 import threading 9 import contextlib 10 import time 11 12 StopEvent = object() 13 14 class ThreadPool(object): 15 16 def __init__(self, max_num): 17 self.q = queue.Queue() 18 self.max_num = max_num 19 20 self.treminal = False 21 self.generate_list = [] 22 self.free_list = [] 23 24 def run(self, func, args, callback=None): 25 """ 26 線程池執行一個任務 27 :param func: 任務函數 28 :param args: 任務函數所需參數 29 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數:一、任務函數執行狀態;二、任務函數返回值(默認爲None,即不執行回調函數) 30 :return:若是線程池已經終止,則返回True,不然爲None 31 """ 32 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 33 self.generate_thread() 34 w = (func, args, callback,) 35 self.q.put(w) 36 37 def generate_thread(self): 38 """ 39 建立一個線程 40 :param self: 41 :return: 42 """ 43 t = threading.Thread(target=self.call) 44 t.start() 45 46 def call(self): 47 """ 48 循環去獲取任務函數並執行任務函數 49 :return: 50 """ 51 current_thread = threading.currentThread 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() #獲取線程 55 while event != StopEvent: #判斷獲取的線程數不等於全局變量 56 func, arguments, callback = event #拆分元組, 獲取執行函數,參數, 回調函數 57 try: 58 result = func(*arguments) #執行函數 59 status = True 60 61 except Exception as e: #函數執行失敗 62 status = False 63 result = e 64 65 if callback is not None: 66 try: 67 callback(status, result) 68 except Exception as e: 69 pass 70 71 with self.work_state(): 72 event = self.q.get() 73 74 else: 75 self.generate_list.remove(current_thread) 76 77 78 def close(self): 79 """ 80 關閉線程,給傳輸全局非元組的變量來進行關閉 81 :return: 82 """ 83 for i in range(len(self.generate_list)): 84 self.q.put(StopEvent) 85 86 87 def terminate(self): 88 """ 89 忽然關閉線程 90 :return: 91 """ 92 self.terminal = True 93 while self.generate_list: 94 self.q.put(StopEvent) 95 self.q.empty() 96 97 98 def work_state(self): 99 self.free_list.append(threading.current_thread) 100 try: 101 yield 102 finally: 103 self.free_list.remove(threading.currentThread) 104 105 def work(i): 106 print(i) 107 return i + 1 #返回給回調函數 108 109 def callback(ret): 110 print(ret) 111 112 pool = ThreadPool(10) 113 for item in range(50): 114 pool.run(func=work, args=(item, ), callback=callback) 115 116 pool.terminate()