python--線程知識詳解

Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。

1.一、threading模塊python

threading模塊創建在_thread模塊之上。thread模塊以低級=原始的方式來處理和控制線程,而threading模塊
經過對thread進行二次封裝,提供了更方便的api來處理線程。算法

簡單的線程實例:
建立了20個「前臺」線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令api

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 建立一個簡單的threading線程實例
 5 """
 6 import threading
 7 import time
 8 
 9 def to_worker(num):
10     """
11     線程方法
12     :param num:
13     :return:
14     """
15     time.sleep(1)
16     print("The num is %s" % num)
17     return
18 
19 for i in range(5):
20     t = threading.Thread(target=to_worker, args=(i, ))
21     t.start()  #激活線程

代碼執行結果:多線程

 

1.二、建立線程的構造方法併發

t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})app

註釋說明:
group --線程組
target --要執行的方法
name --線程名
args/kwargs -要傳入方法的參數

Thread類提供瞭如下方法:
一、t.start() --激活線程
二、t.getName() --獲取線程的名稱
三、t.setName() --設置線程的名稱
四、t.name() --獲取或設置線程的名稱
五、t.is_alive() --判斷線程是否爲激活狀態
六、t.isAlive() --判斷線程是否爲激活狀態
七、t.setDaemon() --設置爲後臺線程或前臺線程(默認:False)
八、t.isDaemon() --判斷是否爲守護線程
九、t.ident() --獲取線程的標識符。線程標識符是一個非零整數,只有在調用start()方法後,該屬性纔有效,不然它只返回None
十、t.join() --逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義。
十一、t.run() --線程被cpu調度後自動執行線程對象的run方法

1.三、python線程鎖ide

當有一個數據有多個線程對其進行修改的時候,任何一個線程改變他都會對其餘線程形成影響,若是咱們想某一個線程在使用完以前,其餘線程不能對其修改,就須要對這個線程加一個線程鎖。

簡單的線程鎖實例:函數

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 線程鎖小實例
 5 """
 6 import threading
 7 import time
 8 
 9 globals_num = 0
10 
11 lock = threading.RLock()
12 
13 def Func():
14     lock.acquire()  #獲取鎖
15     global globals_num
16     globals_num += 1
17     time.sleep(1)
18     print(globals_num)
19     lock.release()  #釋放鎖
20 
21 for i in range(10):
22     t = threading.Thread(target=Func)
23     t.start()

代碼執行結果:ui

 

threading.RLock和threading.Lock 的區別
RLock容許在同一線程中被屢次acquire。而Lock卻不容許這種狀況。 若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣。
spa

 1 import threading
 2 lock = threading.Lock()    #Lock對象
 3 lock.acquire()
 4 lock.acquire()  #產生了死瑣。
 5 lock.release()
 6 lock.release() 
 7 
 8 
 9 
10 import threading
11 rLock = threading.RLock()  #RLock對象
12 rLock.acquire()
13 rLock.acquire()    #在同一線程內,程序不會堵塞。
14 rLock.release()
15 rLock.release()

 

1.四、threading.Event

一、python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear。
二、事件處理的機制:全局定義了一個Flag,若是Flag值爲False,那麼當程序執行event.wait方法時就會阻塞,
若是Flag值爲True,那麼event.wait方法時便再也不阻塞。

方法說明:
clear --將Flag設置爲False
set -- 將Flag設置爲True
Event.isSet() --判斷標識符是否爲True


threading.Event簡單實例:
當線程執行的時候,若是flag爲False,則線程會阻塞,當flag爲True的時候,線程不會阻塞。它提供了本地和遠程的併發性。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 threading.Event事件實例
 5 """
 6 import threading
 7 
 8 def do(event):
 9     print("start.....")
10     event.wait()
11     print("execuse...")
12 
13 event_obj = threading.Event()
14 
15 for i in range(5):
16     t = threading.Thread(target=do, args=(event_obj,))
17     t.start()
18 
19 event_obj.clear()
20 inp = input('input:(true) ')
21 if inp == "true":
22     event_obj.set()

代碼執行結果:

 

1.五、threading.Condition(條件變量)

示例說明:當小夥伴a在往火鍋裏面添加魚丸,這個就是生產者行爲;另一個小夥伴b在吃掉魚丸就是消費者行爲。當火鍋裏面魚丸達到必定數量加滿後b才能吃,這就是一種條件判斷了。

Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。

能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。

Condition():

acquire(): 線程鎖
release(): 釋放鎖
wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)纔會被喚醒繼續運行。wait()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。
notify(n=1): 通知其餘線程,那些掛起的線程接到這個通知以後會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。notify()不會主動釋放Lock。
notifyAll(): 若是wait狀態線程比較多,notifyAll的做用就是通知全部線程。


生產者與消費者示例:
現實場景:當a同窗王火鍋裏面添加魚丸加滿後(最多3個,加滿後通知b去吃掉),通知b同窗去吃掉魚丸(吃到0的時候通知a同窗繼續添加)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 threading.Condition
 5 """
 6 import threading
 7 import time
 8 
 9 con = threading.Condition()
10 
11 num = 0
12 
13 #生產者
14 class Producer(threading.Thread):
15 
16     def __init__(self):
17         threading.Thread.__init__(self)
18 
19     def run(self):
20         #鎖定線程
21         global  num
22         con.acquire()  #獲取鎖
23         while True:
24             print("a同窗開始添加......")
25             num += 1
26             print("鍋裏丸子個數爲:%s" % str(num))
27             time.sleep(1)
28             if num >= 3:
29                 print("丸子個數已經達到3個了,沒法添加。")
30                 #喚醒等待的線程
31                 con.notify()  #喚醒同窗開吃
32                 #等待通知
33                 con.wait()
34 
35         #釋放鎖
36         con.release()
37 
38 #消費者
39 class Consumers(threading.Thread):
40     def __init__(self):
41         threading.Thread.__init__(self)
42 
43     def run(self):
44         con.acquire()
45         global num
46         while True:
47             print("我準備開吃了...")
48             num -= 1
49             print("鍋裏丸子數量爲:%s" % str(num))
50             time.sleep(2)
51             if num <= 0:
52                 print("丸子吃完了,趕忙添加啦..")
53                 con.notify()  #喚醒等待的線程
54                 #等待通知
55                 con.wait()
56         con.release()  #釋放鎖
57 
58 p = Producer()
59 c = Consumers()
60 p.start()
61 c.start()

代碼執行結果:

 

1.六、Queue模塊

1.6.一、建立一個「隊列」對象
import queue
q = queue.queue(maxsize = 10)
queue.queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

1.6.二、將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。

1.6.三、將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

1.6.四、Python queue模塊有三種隊列及構造函數:
一、Python queue模塊的FIFO隊列先進先出。 class queue.queue(maxsize)
二、LIFO相似於堆,即先進後出。 class queue.Lifoqueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。 class queue.Priorityqueue(maxsize)

1.6.五、queue經常使用方法(q =queue.queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做

1.6.六、簡單的queue實例:生產者-消費者模型

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 Queue隊列
 5 """
 6 import queue
 7 import threading
 8 
 9 
10 message = queue.Queue(10)
11 
12 def producer(i):
13     while True:
14         message.put(i)
15 
16 def consumer(i):
17     while True:
18         msg = message.get(i)
19 
20 for i in range(12):
21     t = threading.Thread(target=producer, args=(i, ))
22     t.start()
23 for i in range(10):
24     t = threading.Thread(target=consumer, args=(i, ))
25     t.start()

 

1.七、自定義線程池

1.7.一、方法一:簡單往隊列中傳輸線程數

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 """
 4 自定義線程池
 5 方法一:簡單往隊列中傳輸線程數
 6 """
 7 import threading
 8 import time
 9 import queue
10 
11 class ThreadingPool():
12     def __init__(self, max_num = 10):
13         self.queue = queue.Queue(max_num)
14         for i in range(max_num):
15             self.queue.put(threading.Thread)
16 
17     def getthreading(self):
18         return self.queue.get()
19 
20     def addthreading(self):
21         self.queue.put(threading.Thread)
22 
23 
24 def func(p, i):
25     time.sleep(1)
26     print(i)
27     p.addthreading()
28 
29 if __name__ == "__main__":
30     p = ThreadingPool()
31     for i in range(12):
32         thread = p.getthreading()
33         t = thread(target = func, args = (p, i))
34         t.start()

代碼執行結果:

 

1.7.二、方法二:往隊列中無限添加任務

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 """
  4 自定義線程池
  5 方法二:往隊列中無限添加任務
  6 """
  7 import queue
  8 import threading
  9 import contextlib
 10 import time
 11 
 12 StopEvent = object()
 13 
 14 class ThreadPool(object):
 15 
 16     def __init__(self, max_num):
 17         self.q = queue.Queue()
 18         self.max_num = max_num
 19 
 20         self.treminal = False
 21         self.generate_list = []
 22         self.free_list = []
 23 
 24     def run(self, func, args, callback=None):
 25         """
 26         線程池執行一個任務
 27         :param func: 任務函數
 28         :param args: 任務函數所需參數
 29         :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數:一、任務函數執行狀態;二、任務函數返回值(默認爲None,即不執行回調函數)
 30         :return:若是線程池已經終止,則返回True,不然爲None
 31         """
 32         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 33             self.generate_thread()
 34             w = (func, args, callback,)
 35             self.q.put(w)
 36 
 37     def generate_thread(self):
 38         """
 39         建立一個線程
 40         :param self:
 41         :return:
 42         """
 43         t = threading.Thread(target=self.call)
 44         t.start()
 45 
 46     def call(self):
 47         """
 48         循環去獲取任務函數並執行任務函數
 49         :return:
 50         """
 51         current_thread = threading.currentThread
 52         self.generate_list.append(current_thread)
 53 
 54         event = self.q.get()  #獲取線程
 55         while event != StopEvent: #判斷獲取的線程數不等於全局變量
 56             func, arguments, callback = event  #拆分元組, 獲取執行函數,參數, 回調函數
 57             try:
 58                 result = func(*arguments) #執行函數
 59                 status = True
 60 
 61             except Exception as e: #函數執行失敗
 62                 status = False
 63                 result = e
 64 
 65             if callback is not None:
 66                 try:
 67                     callback(status, result)
 68                 except Exception as e:
 69                     pass
 70 
 71             with self.work_state():
 72                 event = self.q.get()
 73 
 74         else:
 75             self.generate_list.remove(current_thread)
 76 
 77 
 78     def close(self):
 79         """
 80         關閉線程,給傳輸全局非元組的變量來進行關閉
 81         :return:
 82         """
 83         for i in range(len(self.generate_list)):
 84             self.q.put(StopEvent)
 85 
 86 
 87     def terminate(self):
 88         """
 89         忽然關閉線程
 90         :return:
 91         """
 92         self.terminal = True
 93         while self.generate_list:
 94             self.q.put(StopEvent)
 95         self.q.empty()
 96 
 97 
 98     def work_state(self):
 99         self.free_list.append(threading.current_thread)
100         try:
101             yield
102         finally:
103             self.free_list.remove(threading.currentThread)
104 
105 def work(i):
106     print(i)
107     return i + 1 #返回給回調函數
108 
109 def callback(ret):
110     print(ret)
111 
112 pool = ThreadPool(10)
113 for item in range(50):
114     pool.run(func=work, args=(item, ), callback=callback)
115 
116 pool.terminate()
相關文章
相關標籤/搜索