要想實現進程間通訊,能夠用管道或者隊列python
隊列比管道更好用(隊列自帶管道和鎖)編程
隊列特色:先進先出安全
堆棧特色:先進後出網絡
咱們採用隊列來實現進程間數據通訊,下面先介紹一下隊列多線程
基本方法:q.put(元素) q.get() q.get_nowait() q.full() q.empty()
併發
from multiprocessing import Process, Queue q = Queue(5) # 實例化出一個對象 # -------------------------------------- # q.put(元素) 往隊列裏放東西 # 若是隊列滿了還往裏面放,就會等在這裏 # -------------------------------------- # q.put(1) # q.put(2) # q.put(3) # -------------------------------------- # # q.full() 判斷隊列有沒有滿 # -------------------------------------- # print(q.full()) # q.full 判斷隊列有沒有滿 # # False # q.put(4) # q.put(5) # # q.put(6) # 若是隊列滿了還往裏面放,就會等在這裏 # print(q.full()) # # True for i in range(5): q.put(i) print(q.full()) # True # -------------------------------------- # q.get() 從隊列頭取一個值 # 若是隊列空了,就會等在這裏,等數據過來 # -------------------------------------- print(q.get()) print(q.full()) # 0 # False print(q.get()) print(q.get()) # print(q.get()) # -------------------------------------- # q.get_nowait() 從隊列頭取一個值 # 在隊列有數據的狀況下,與get取值同樣 # 當隊列沒有值的狀況下,取值直接報錯 # -------------------------------------- print(q.get_nowait()) # 在隊列有數據的狀況下,與get取值同樣,當隊列沒有值的狀況下,取值直接報錯 # -------------------------------------- # q.empty() 判斷隊列是否爲空 # 在併發的狀況下,這個方法不許確 # -------------------------------------- print(q.empty()) # 判斷隊列是否爲空,須要注意的是在併發的狀況下,這個方法不許確 print(q.get()) # 1 # 2 # 3 # False # 4 # print(q.get()) # 若是隊列空了,就會等在這裏,等數據過來 print(q.empty()) # True # print(q.get_nowait()) # 直接報錯 queue.Empty
數據的互通,可實現主進程與子進程之間的互通,子進程與子進程之間的互通
數據只有一份,取完就沒了,沒法重複獲取同一份數據app
from multiprocessing import Queue, Process def producer(q): q.put('hello baby.') def consumer(q): print(q.get()) if __name__ == '__main__': q = Queue() # 生成一個隊列對象 p1 = Process(target=producer, args=(q,)) c1 = Process(target=consumer, args=(q,)) p1.start() c1.start() # 子進程獲取到了另外一個子進程的數據 # hello baby. # print(q.get()) # 主進程獲取到了子進程的數據 # hello baby.
生產者:生產/製造數據的dom
消費者:消費/處理數據的異步
例子:作包子的,賣包子的 1.作的包子遠比買包子的多 2.作的包子遠比買包子的少 --> 供需不平衡
用處:解決供需不平衡的問題測試
from multiprocessing import Process, Queue import time import random def producer(name, food, q: Queue): for i in range(10): data = f'{name} 生產了 {food}{i}' time.sleep(random.random()) q.put(data) print(data) def consumer(name, q): while True: res = q.get() if not res: # 已經把生產者作的東西所有吃完了,那麼本消費者也結束食用 break data = res.split(' ')[2] data = f'{name} 吃了 {data}' print(data) time.sleep(random.random()) if __name__ == '__main__': q = Queue() p = Process(target=producer, args=('大廚egon', '饅頭', q)) p2 = Process(target=producer, args=('跟班tank', '生蠔', q)) c = Process(target=consumer, args=('jason', q)) c2 = Process(target=consumer, args=('吃貨kevin', q)) p.start() p2.start() c.start() c2.start() # 不知道何時生產者何時生成完 p.join() p2.join() q.put(None) # 經過 None來標誌生產者已生產完成 q.put(None) # 能夠實現,可是很差
改用JoinableQueue模塊的隊列
與守護進程
來實現
from multiprocessing import Process, JoinableQueue import time import random def producer(name, food, q: JoinableQueue): for i in range(10): data = f'{name} 生產了 {food}{i}' time.sleep(random.random()) q.put(data) print(data) def consumer(name, q): while True: res = q.get() if not res: break data = res.split(' ')[2] data = f'{name} 吃了 {data}' print(data) time.sleep(random.random()) q.task_done() # 告訴隊列,你已經從隊列中取出了一個數據,而且處理完畢了 if __name__ == '__main__': q = JoinableQueue() p = Process(target=producer, args=('大廚egon', '饅頭', q)) p2 = Process(target=producer, args=('跟班tank', '生蠔', q)) c = Process(target=consumer, args=('jason', q)) c2 = Process(target=consumer, args=('吃貨kevin', q)) p.start() p2.start() c.daemon = True # 配合join,結束程序消費者也結束(注意join是主進程的最後一句代碼) c.start() c2.daemon = True c2.start() # 不知道何時生產者何時生成完 p.join() p2.join() q.join() # 等待隊列中數據所有取出,執行完了這句話,也就意味着隊列中沒有數據了(消費者那裏仍是會卡住,get不到東西等待) # 配合上 守護進程 來實現....
進程和線程其實都是虛擬單位,都是用來幫助咱們形象的描述某種事物
進程:資源單位(一塊獨立的內存空間)
線程:執行單位
將內存比喻成工廠,那麼進程就至關於工廠裏的車間,而你的線程就至關因而車間裏面的流水線
CPU其實運行的實際上是線程,進程只是資源單位
線程執行時須要的資源單位都跟進程要
ps:每一個進程都自帶一個線程,線程纔是真正的執行單位,進程只是在線程運行過程當中提供代碼運行所須要的資源
每一個進程都會自帶一個線程
線程沒有主次之分,只不過咱們默認就把主進程自帶的那個線程叫作主線程
ps:開啓線程的開銷要遠遠小於開啓進程的開銷(可能剛執行完建立線程的代碼線程就建立好了)
from threading import Thread import time def task(name): print(f"{name} is running") time.sleep(3) print(f"{name} is over") t = Thread(target=task, args=('egon', )) # 開線程不須要在 __main__ 代碼塊內,可是習慣性的仍是寫在 __main__ 內 t.start() # 告訴操做系統開啓一個線程 # 線程的開銷遠遠小於進程,小到以致於能夠代碼執行完,線程就已經開啓了 print("主") # 線程沒有主次之分,都在同一個進程的名稱空間裏,只是人爲把進程自帶的線程叫作主線程 # egon is running # 主線程 # 進程的時候這個主線程可能會是最早打印的 # egon is over
from threading import Thread import time class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print(f"{self.name} is running") time.sleep(1) print(f"{self.name} is over") if __name__ == '__main__': t = MyThread('jason') t.start() # 開啓線程的速度很是快,幾乎代碼執行完線程就已經開啓 print("主") # jason is running # 主 # jason is over
from threading import Thread money = 666 def task(): global money money = 999 t = Thread(target=task) t.start() t.join() # 確保是線程運行結束後 print(money) # 999 # 主線程與子線程之間數據是通用的
線程間想要實現數據通訊,不須要藉助於隊列(線程間支持數據通訊)
import time from threading import Thread, active_count, current_thread import os def task(name): print(f"{name} is running {os.getpid()}") # # ------------------------------------------------ # # current_thread().name current_thread().getname() 當前線程名 # # 記得導入模塊 # # ------------------------------------------------ # print(f"current_thread().name:{current_thread().name}") # current_thread().name:Thread-1 time.sleep(1) print(f"{name} is over") # t = Thread(target=task, args=('jason', )) # t.start() # # ------------------------------------------------ # # os.getpid() os.getppid() 獲取進程號 父進程號 # # 多個線程屬於同一進程 # # ------------------------------------------------ # print(f"pid {os.getpid()}") # # jason is running 5572 # # pid 5572 # # jason is over t = Thread(target=task, args=('jason', )) t.start() # ------------------------------------------------ # active_count() 統計當前存活的線程數 # 記得導入模塊 # ------------------------------------------------ print(active_count()) print(f"pid {os.getpid()}") # jason is running 5728 # 2 # pid 5728 print(f"主 current_thread().name:{current_thread().name}") # 主 current_thread().name:MainThread t.join() # 主線程等待子線程運行結束 # jason is over print("主 active_count", active_count()) # 可能會有問題,多線程是異步,可能join的線程結束了,其餘線程也正好結束了(多個線程時) # 主 active_count 1 # Thread.join(t) # 能夠考慮用類調用對象方法,傳入對象來在循環裏對線程對象進行操做
主線程要等待全部非守護線程結束後纔會結束(不是主線程的代碼執行完了就立馬結束了)
主線程結束後,守護(子)線程也會當即結束
主線程的結束也就意味着進程的結束
主線程必須等待其餘非守護線程的結束才能結束
由於子線程在運行的時候須要使用進程中的資源,而主線程一旦結束了,資源也就銷燬了
# from threading import Thread, current_thread # import time # # # def task(i): # print(f"{current_thread().name}") # time.sleep(i) # print("GG") # # # for i in range(3): # t = Thread(target=task, args=(i, )) # t.start() # # # print("主") # # 循環的時候就已經打印了部分數據了(異步) # # Thread-1 # # GG # # Thread-2 # # Thread-3 # # 主 # # GG # # GG # 主線程運行結束以後爲何須要等待子線程結束才能結束呢? ''' 主線程的結束也就意味着進程的結束 主線程必須等待其餘非守護線程的結束才能結束 由於子線程在運行的時候須要使用進程中的資源,而主線程一旦結束了,資源也就銷燬了 ''' from threading import Thread, current_thread import time def task(i): print(f"{current_thread().name}") time.sleep(i) print("GG") for i in range(3): t = Thread(target=task, args=(i,)) t.daemon = True t.start() print("主") # Thread-1 # GG # Thread-2 # Thread-3 # 主
下面程序的執行結果是什麼?
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("main-------") # 123 # 456 # main------- # end123 # end456
從線程間通訊那裏的案例能夠看出,線程間數據是相通的,那麼多個線程對同一份數據進行操做會產生問題
下面一樣模擬一個網絡延遲來對數據進行操做(確保全部線程都執行完的操做能夠記一下)
# 模擬網絡延遲的現象 # 多個線程操做同一個數據,也會形成數據不安全 import time from threading import Thread n = 10 def task(): global n tmp = n time.sleep(1) n = tmp - 1 # ------------------------------- t_list = [] for i in range(10): t = Thread(target=task) t.start() t_list.append(t) # 確保其餘線程都執行完了以後再打印 for t in t_list: t.join() # ------------------------------- print(n) # 9
# 加鎖解決問題 import time from threading import Thread, Lock n = 10 def task(mutex): mutex.acquire() # 搶鎖 global n tmp = n time.sleep(1) n = tmp - 1 mutex.release() # 釋放鎖 t_list = [] mutex = Lock() for i in range(10): t = Thread(target=task, args=(mutex, )) t.start() t_list.append(t) # 確保其餘線程都執行完了以後再打印 for t in t_list: t.join() print(n) # 0 # 等10s多點 後打印出結果,數據未受延遲影響,保證了數據安全
雖然互斥鎖也是將併發改爲串行,犧牲效率來保證數據安全,這一點
線程對象.join()
也能夠實現將併發改爲串行,一樣保證數據安全,但線程對象.join()
是將每個線程的運行都變成串行的,對比互斥鎖的只將數據操做部分編程串行消耗的時間要多得多,若果線程耗時長,執行效率就會低的可怕
# # 不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全 # from threading import current_thread, Thread, Lock # import os # import time # # # def task(): # # 未加鎖的代碼併發運行 # time.sleep(3) # print('%s start to run' % current_thread().getName()) # global n # # 加鎖的代碼串行運行 # lock.acquire() # temp = n # time.sleep(0.5) # n = temp - 1 # lock.release() # # # if __name__ == '__main__': # n = 100 # lock = Lock() # threads = [] # start_time = time.time() # for i in range(100): # t = Thread(target=task) # threads.append(t) # t.start() # for t in threads: # t.join() # stop_time = time.time() # print('主:%s n:%s' % (stop_time - start_time, n)) # # ''' # Thread-3 start to run # Thread-1 start to run # ...... # Thread-100 start to run # Thread-96 start to run # 主:53.06105661392212 n:0 # ''' # 利用 join 保證數據安全 from threading import current_thread, Thread, Lock import os import time def task(): time.sleep(3) print('%s start to run' % current_thread().getName()) global n temp = n time.sleep(0.5) n = temp - 1 if __name__ == '__main__': n = 100 lock = Lock() start_time = time.time() for i in range(100): t = Thread(target=task) t.start() t.join() stop_time = time.time() print('主:%s n:%s' % (stop_time - start_time, n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.1616487503052 n:0 # 耗時是多麼的恐怖 '''
線程和進程的用戶大同小異,能夠對比着來記
後續能夠畫圖或表格用對比的方式來整理一下,方便記憶~