(一)生產與消費者模式數據庫
⽣產者消費者模式是經過⼀個容器來解決⽣產者和消費者的強耦合問題。設計模式
⽣產者和消費者彼此之間不直接通信,⽽經過阻塞隊列來進⾏通信,因此⽣產 者⽣產完數據以後不⽤等待消費者處理,直接扔給阻塞隊列,消費者不找⽣產者要數據,⽽是直接從阻塞隊列⾥取,阻塞隊列就至關於⼀個緩衝區,平衡了⽣產者和消費者的處理能⼒。緩存
這個阻塞隊列就是⽤來給⽣產者和消費者解耦的,縱觀⼤多數設計模式,都會找⼀個第三者出來進⾏解耦。多線程
在線程世界⾥,⽣產者就是⽣產數據的線程,消費者就是消費數據的線程。app
在多線程開發當中,若是⽣產者處理速度很快,⽽消費者處理速度很慢,那 麼⽣產者就必須等待消費者處理完,才能繼續⽣產數據。一樣的道理,若是 消費者的處理能⼒⼤於⽣產者,那麼消費者就必須等待⽣產者。異步
爲了解決這 個問題因而引⼊了⽣產者和消費者模式。async
實例演示:函數
1 import threading 2 from queue import Queue 3 import time 4 5 6 class Producer(threading.Thread): 7 def run(self): 8 count = 0 9 while True: 10 #若是緩存隊列中小於1000個元素,就生產100個,不然就休息 11 if queue.qsize() <= 1000: 12 for i in range(100): 13 count = count + 1 14 msg = '生成產品' + str(count) 15 print(msg) 16 queue.put(msg) 17 time.sleep(0.5) 18 19 20 class Consumer(threading.Thread): 21 def run(self): 22 while True: 23 #若是緩存隊列中大於100個元素,就消耗50個,不然休息 24 if queue.qsize() >= 100: 25 for i in range(50): 26 msg = self.name + '消耗' + queue.get() 27 print(msg) 28 time.sleep(1) 29 30 31 32 if __name__ == "__main__": 33 #創建一個緩衝隊列 34 queue = Queue() 35 36 #初始化隊列 37 for i in range(100): 38 msg = '初始產品' + str(i) 39 queue.put(msg) 40 41 #定義兩個生產者 42 for i in range(2): 43 p = Producer() 44 p.start() 45 46 #定義五個消費者 47 for i in range(5): 48 c = Consumer() 49 c.start() 50 51 52 》》》輸出: 53 生成產品1 54 生成產品2 55 生成產品3 56 生成產品4 57 生成產品5生成產品1 58 生成產品6 59 60 生成產品2 61 生成產品3生成產品7 62 生成產品8 63 生成產品4 64 。。。。。
對於Queue的說明:spa
1. 對於Queue,在多線程通訊之間扮演重要的⻆⾊線程
2. 添加數據到隊列中,使⽤put()⽅法
3. 從隊列中取數據,使⽤get()⽅法
4. 判斷隊列中是否還有數據,使⽤qsize()⽅法
(二)ThreadLocal
在多線程環境下,每一個線程都有⾃⼰的數據。
⼀個線程使⽤⾃⼰的局部變量⽐使⽤全局變量好,由於局部變量只有線程⾃⼰能看⻅,不會影響其餘線程,⽽全局變量的修改必須加鎖。
若是要在一個函數中將該函數的局部變量傳給另一個函數,咱們能夠怎麼辦呢?能夠採用全局字典:
⽤⼀個全局dict存放全部的Student對象,而後以thread⾃身做爲key得到線程對應的Student對象
1 import threading 2 3 global_dic = {} 4 5 6 def work(name): 7 std = Student(name) 8 #把std放在全局字典裏面,這樣即便不一樣線程同時運行,也不會覆蓋掉全局變量的修改 9 global_dic[threading.current_thread()] = std 10 f1() 11 f2() 12 13 14 def f1(): 15 #不傳入std,直接從全局字典中訪問便可 16 std = global_dic[threading.current_thread()] 17 ... 18 19 20 def f2(): 21 #任何函數均可以直接查看到當前進程的std 22 std = global_dic[threading.current_thread()] 23 ...
這種⽅式理論上是可⾏的,它最⼤的優勢是消除了std對象在每層函數中的傳遞問題,可是每一個函數獲取std的代碼有點low。 有沒有更簡單的⽅式?
1 import threading 2 3 #建立全局的TL對象 4 local_school = threading.local() 5 6 7 def process_student(): 8 #獲取當前線程關聯的student 9 std = local_school.student 10 print('%s (in %s)'%(std, threading.current_thread().name)) 11 12 13 def process_thread(name): 14 #綁定ThreadLocal的student: 15 local_school.student = name 16 process_student() 17 18 19 if __name__ == "__main__": 20 t1 = threading.Thread(target=process_thread, args=('LaoWang',), name='T-1') 21 t2 = threading.Thread(target=process_thread, args=('XiaoWang',), name='T-2') 22 t1.start() 23 t2.start() 24 t1.join() 25 t2.join() 26 27 28 》》》輸出: 29 LaoWang (in T-1) 30 XiaoWang (in T-2)
全局變量local_school就是⼀個ThreadLocal對象,每一個Thread對它均可以讀寫student屬性,但互不影響。
你能夠把local_school當作全局變量,但每一個屬性如local_school.student都是線程的局部變量,能夠任意讀寫⽽互不⼲擾,也不⽤管理鎖的問題,ThreadLocal內部會處理。
能夠理解爲全局變量local_school是⼀個dict,不但能夠⽤ local_school.student,還能夠綁定其餘變量,如local_school.teacher等等。
ThreadLocal最常⽤的地⽅就是爲每一個線程綁定⼀個數據庫鏈接,HTTP請求,⽤戶身份信息等,這樣⼀個線程的全部調⽤到的處理函數均可以⾮常⽅便地訪問這些資源。
(三)異步
咱們再來看看同步和異步:
接下來,咱們用一段代碼來講明異步的思想:
1 from multiprocessing import Pool 2 import time 3 import os 4 5 6 def test(): 7 print('---進程池中的進程--pid=%d,ppid=%d'%(os.getpid(),os.getppid())) 8 for i in range(3): 9 print('--%d--'%i) 10 time.sleep(1) 11 return 'HAHA'#返回的值將被傳入回調函數 12 13 14 def test2(args): 15 print('---callback func --pid=%d'%os.getpid()) 16 print('---callback func --args=%s'%args) 17 18 19 if __name__ == '__main__': 20 pool = Pool(3) 21 pool.apply_async(func=test, callback=test2) 22 23 while True: 24 time.sleep(1) 25 print("---主進程-pid=%d---"%os.getpid()) 26 27 28 》》》輸出: 29 ---進程池中的進程--pid=13368,ppid=18320 30 --0-- 31 ---主進程-pid=18320--- 32 --1-- 33 ---主進程-pid=18320--- 34 --2-- 35 ---主進程-pid=18320--- 36 ---callback func --pid=18320 37 ---callback func --args=HAHA 38 ---主進程-pid=18320--- 39 ---主進程-pid=18320--- 40 ---主進程-pid=18320--- 41 ---主進程-pid=18320--- 42 ---主進程-pid=18320---