一、系統進行資源分配和調度的基本單位,一個具備必定獨立功能的程序關於某個數據集合的一次運行活動;python
二、它是一個動態的概念,一個活動的實體;算法
an instance of a computer program that is being executed
即正在運行的程序的實例化對象。注:其概念的關鍵點在於安全
1)、進程是一個實體(動態的),具備本身獨立的地址空間,包括: 文本區域(text region):存儲處理器執行的代碼; 數據區域(data region):存儲變量與進程執行期間使用的動態分配的內存; 堆棧(stack region):存儲的是程序執行過程當中調用的指令與本地變量; 注:正是因爲每一個進程是一個獨立的實體,其中以上所述的三個區域,即每一個進程的數據區域以及堆棧是獨立的,相互隔離的,因此在多進程中能夠保證數據的安全性
2)、編寫完的代碼,沒有運行時,稱爲程序, 正在運行的代碼,稱爲進程 程序是死的(靜態的),進程是活的(動態的)
三、進程的三大狀態bash
如圖所示
服務器
導入multiprocessing模塊中的Process類
以供後續建立類的時候直接調用併發
p = Process(target = func, name = process01, args=(5,))
實例化進程對象app
- target = func 表示調用對象,即子進程要執行的任務 func
- args 表示任務 func 的位置參數元組,args=(5, )
- name = process01 爲子進程的名稱
- p.start( ): 啓動進程,並調用該子進程中的p.run( )
- p.run( ): 進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要寫入該方法
- p.terminate( ): 強制終止進程p,不會進行任何清理操做
- p.is_alive( ): 若是p仍然運行,返回True。用來判斷進程是否還在運行
- p.join([timeout]): 主進程等待子進程p終止,timeout是可選的等待時間
# 主進程速度快於子進程,join方法可使得子進程執行結束後,再繼續執行主進程中的代碼,能夠用來同步代碼的一致性 import multiprocessing def func(): print("發送第一份郵件") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.start() p.join() print("發送第二份郵件") # 發送第一份郵件 # 發送第二份郵件
# 多個子進程配合 join 方法實現異步併發 import multiprocessing def func(index): print(f"發送第{index}封郵件") if __name__ == "__main__": process_list = [] for i in range(10): p = multiprocessing.Process(target=func, args=(i, )) p.start() process_list.append(p) # p.join() 程序會變成同步阻塞 for i in process_list: i.join() # 異步併發 print("主進程發最後一封郵件!")
- name: 當前進程實例別名, 默認爲Process-N, N爲從1開始遞增的整
數- pid: 當前進程實例的ID值
# 建立進程的方法一: # 利用multiprocessing模塊提供一個Process類來建立一個進程對象 from multiprocessing import Process import time def func(n): while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = Process(target = func, args=(5,)) p.start() p.join()
# 建立進程的方法二: # 建立新的進程能夠自定義一個類去繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象 import multiprocessing import time class ClockProcess(multiprocessing.Process): def run(self): n = 5 while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = ClockProcess() p.start() p.join()
- 守護 主進程 時,若是主進程執行結束了,意味着守護進程的壽命馬上終止.馬上殺死
- 語法:
- 進程對象.daemon = True 設置當前進程爲守護進程
- 必須寫在start( )調用進程以前進行設置
- 默認狀況下,主進程會等待全部子進程執行完畢以後,關閉程序,釋放資源。若不等待,子進程並不方便管理,容易形成殭屍進程,在後臺不停的佔用系統的資源(cpu和內存),不清楚進程的來源。
- 守護主進程即在主進程代碼執行結束以後,無需等待子進程執行,當即殺死程序
import multiprocessing def func(): print("start 當前子進程") print("end 當前子進程") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.daemon = True p.start() print("主進程執行結束 ... ") # 主進程執行結束 ...
多個子進程下,未守護主進程,主進程仍會等待子進程執行結束dom
- 守護進程的實際用途:監控報活
import time # 監控報活 def alive(): while True: print("給監控服務器發消息, 當前5號服務器功能正常 i am ok ~") time.sleep(1) # 當前服務器正常完成的功能 def func(): time.sleep(5) print("當前5號服務器功能,統計財務報表~") if __name__ == "__main__": p1 = Process(target=func) p2 = Process(target=alive) # 守護p2進程 p2.daemon = True p1.start() p2.start() # 等待p1子進程執行結束以後,下面的主程序的代碼纔會放行; p1.join() # 未守護主進程,主進程會默認等待 print("當前服務器狀態:統計財務報表功能異常.....") # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 當前5號服務器功能,統計財務報表~ # 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~ # 當前服務器狀態:統計財務報表功能異常.....
# 手動建立 from multiprocessing import Process num = 1 def run1(): global num num += 5 print("子進程1運行中,num = %d" % (num)) def run2(): global num num += 10 print("子進程2運行中,num = %d" % (num)) if __name__ == "__main__": print("父進程啓動") p1 = Process(target=run1) p2 = Process(target=run2) print("子進程將要執行") p1.start() p2.start() p1.join() p2.join() print("子進程結束")
# 藉助舊版進程池建立多進程 from multiprocessing import Pool import random import time def work(num): print(random.random() * num) time.sleep(3) if __name__ == "__main__": # 實例化進程池對象,設置同一時間內最多能夠執行的進程數爲3個 # 題中的10個任務都由進程池中的這三個進程輪詢執行,不會建立額外 的進程數 # 若不指定則同一時間內能夠執行的進程個數默認爲cpu邏輯核心數 p = Pool(3) for i in range(10): # apply_async 選擇要調用的任務,每次循環出來的任務會用閒下來的子進程去執行 # 使⽤⾮阻塞⽅式調⽤func(並⾏執⾏,阻塞⽅式必須爲等待上⼀個進程退出後才能執⾏下⼀個進程), args爲傳遞給func的參數列表,kwargs爲傳遞給func的關鍵字參數列表; p.apply_async(work, (i,)) # 進程池關閉以後不會再接受新的請求 p.close() # 等待進程池中的全部子進程都結束 p.join() # 多進程中,主進程通常用來等待子進程執行完畢,真正的任務都由子進程中執行
# 藉助新版進程池建立多進程 from concurrent.futures import ProcessPoolExecutor import os import time def func(i): print("任務執行中... start", os.getpid()) time.sleep(10) print("任務結束... end", i) return i # ProcessPoolExecutor 進程池基本使用 """ 默認若是一個進程短期內能夠完成更多的任務,就不會建立額外的新的進程,以節省資源 """ if __name__ == "__main__": lst = [] print(os.cpu_count()) # cpu邏輯核心數 # 建立進程池對象 """進程池中默認最多建立cpu這麼多個進程,全部任務全由這幾個進程完成,不會額外建立進程""" p = ProcessPoolExecutor() # 異步提交任務 for i in range(10): res = p.submit(func, i) lst.append(res) # 獲取當前進程池返回值 for i in lst: print(i.result()) # 等待全部子進程執行結束 p.shutdown() # join print("主程序執行結束....")
進程間數據不共享,他們之間進行數據傳遞即爲通訊異步
from multiprocessing import Queue
async
藉助進程隊列
Queue
完成進程間的通訊
- 消息隊列遵循 先進先出 的原則
- 初始化Queue()對象時(q=Queue()),若括號中沒有指定最⼤可接收
的消息數量, 或數量爲負值, 那麼就表明可接受的消息數量沒有上限
q = Queue()
q.put(item, [block[, timeout]])
:將item消息寫⼊隊列- block 默認值爲True
- 若是block 使⽤默認值,且沒有設置timeout(單位秒)時,若消息列隊已經沒有空間可寫⼊,此時程序將被阻塞(停在寫⼊狀態) ,直到從消息列隊騰出空間爲⽌,若是設置了True和timeout,則會等待timeout秒,若還沒空間,則拋 出"q.Full"的異常信息
- 若是block值爲False, 消息列隊若是出現沒有空間可寫⼊的狀況, 則會⽴刻拋出"q.Full"滿了異常
q.put_nowait(item)
: 至關q.put(item, False)
;
q.get([block[, timeout]])
:獲取隊列中的⼀條消息, 而後將其從列隊中移除block默認值爲True
若是block使⽤默認值,且沒有設置timeout(單位秒),消息列隊若是爲空, 此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息爲⽌,
若是設置了timeout, 則會等待timeout秒, 若還沒讀取到任何消息, 則拋
出"q.Empty"異常若是block值爲False,消息列隊若是爲空,則會⽴刻拋出「q.Empty」空的異常
q.get_nowait()
:至關q.get(False)
- q = Queue()
- q.qsize(): 返回當前隊列包含的消息數量
- q.empty(): 若是隊列爲空, 返回True, 反之False
- q.full(): 若是隊列滿了, 返回True,反之False
from multiprocessing import Queue, Process import time def write(q): for value in ["a", "b", "c"]: print("開始寫入:", value) q.put(value) time.sleep(2) def read(q): while True: if not q.empty(): print("讀取到的是", q.get()) time.sleep(2) else: break if __name__ == "__main__": q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pw.join() #等待接收完畢 pr.start() pr.join() print("接受完畢!")
# 三個進程間通訊 from multiprocessing import Process from multiprocessing import Queue def func1(q1): q1.put("你好!") print(f"子進程p1往隊列q1中放入的數據爲:你好!") def func2(q1, q2): msg = q1.get() print(f"子進程p2從隊列q1中取出的數據爲:{msg}") q2.put(msg) print(f"子進程p2往隊列q2中放入的數據爲:{msg}") def func3(q2): msg = q2.get() print(f"子進程p3從隊列q2中取出的數據爲:{msg}") if __name__ == "__main__": q1 = Queue() q2 = Queue() p1 = Process(target=func1, args=(q1,)) p2 = Process(target=func2, args=(q1, q2)) p3 = Process(target=func3, args=(q2,)) p1.start() p2.start() p3.start()
# put 存儲 # get 獲取 # task_done 隊列計數減1 # join 阻塞 # task_done 配合 join 一塊兒使用 # [1,2,3,4,5] # 隊列計數5 # put 一次 每存放一個值,隊列計數器加1 # get 一次 經過task_done讓隊列計數器減1 # join 函數,會根據隊列中的計數器來斷定是阻塞仍是放行 # 若是計數器變量是0,意味着放行,其餘狀況阻塞; from multiprocessing import Process,JoinableQueue jq = JoinableQueue() # put 會讓隊列計數器加1 jq.put("a") print(jq.get()) # 經過task_done,讓隊列計數器減1 jq.task_done() # 只有隊列計數器是0的時,纔會放行 jq.join() # 隊列.join print("finish")
# 消費者模型 def consumer(q, name): while True: food = q.get() if food is None: break time.sleep(random.uniform(0.1, 1)) print("%s 吃了一個%s" % (name, food)) # 生產者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生產了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = Queue() # 消費者1 p1 = Process(target=consumer, args=(q, "張三")) p1.start() # 消費者2 a2 = Process(target=consumer, args=(q, "李四")) a2.start() # 生產者1 p2 = Process(target=producer, args=(q, "王五", "黃金")) p2.start() # 生產者2 b2 = Process(target=producer, args=(q, "小明", "鑽石")) b2.start() # 在生產完全部的數據以後,在隊列的末尾塞入一個None p2.join() b2.join() # 消費者模型若是獲取的是None,表明中止消費 q.put(None) q.put(None)
from multiprocessing import Process,JoinableQueue # 消費者模型 def consumer(q, name): while True: food = q.get() time.sleep(random.uniform(0.1, 1)) print("%s 吃了一個%s" % (name, food)) q.task_done() # 生產者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生產了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = JoinableQueue() # 消費者1 p1 = Process(target=consumer, args=(q, "張三")) p1.daemon = True p1.start() # 生產者1 p2 = Process(target=producer, args=(q, "李四", "黃金")) p2.start() # 把生產者全部的數據都裝載到隊列中 p2.join() # 當隊列計數器減到0的時候,會馬上放行 # 必須等待消費者模型中全部的數據都task_done以後,變成0了就表明消費結束. q.join() print("程序結束....")
from multiprocessing import Manager, Pool import time def write(q): for i in "welcome": print("開始寫入", i) q.put(i) def read(q): time.sleep(2) for i in range(q.qsize()): # q.qsize()獲取到當前隊列的消息數量! print("獲得消息", q.get()) if __name__ == "__main__": print("主進程啓動!") q = Manager().Queue() po = Pool() po.apply_async(write, (q,)) po.apply_async(read, (q,)) po.close() po.join()