今日主要內容:安全
1.管道(Pipe) 數據接收一次就沒有了app
2.事件(Event)dom
3.基於事件的進程通訊異步
4.信號量(Semaphore)async
5. 進程池(重點)函數
6.進程池的同步方法和異步方法ui
7. 進程池的回調函數,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回調函數是在異步進程中的函數spa
1.管道(Pipe) 數據接收一次就沒有了對象
Pipe() #建立管道,全雙工,返回管道的兩端,可是一端發送消息,只能是另外一端才能接受到,本身這一端是接收不到的進程
from multiprocessing import Process,Pipe
def f1(conn):
from_zhujincheng=conn.recv()# 此處接收能夠接收任何數據,並且不須要帶最大的傳輸的大小
print('我是子進程')
print('來自主進程的消息>>>',from_zhujincheng)
if __name__=='__main__':
conn1,conn2=Pipe() # 建立管道,全雙工,返回管道的兩端,可是一端發送的消息,只能另外一端接收,本身這一端是不能接受的
# 能夠將一端或者兩端發送給其餘的進程,那麼多個進程之間就能夠經過這一個管道進行通訊了 可是這樣會致使數據的不安全,容易致使程序錯亂
p1=Process(target=f1,args=(conn1,) )
p1.start()
conn2.send('你好啊,大兄嘚')
print('我是主進程')
2.事件(Event)
Event() #建立事件對象,這個對象的初始狀態爲False
from multiprocess import Process,Event
e=Event() # 建立事件對象,這個對象的初始狀態爲False
print('判斷e的狀態>>>' , e.is_set()) #判斷e當前的狀態
print('程序運行到這裏了')
e.set() # 這裏的做用是將對象的狀態改成True
e.clear() # 將e的狀態改成False
e.wait() # 若是程序中e的狀態爲False,那麼系統會阻塞在這裏,只有改成True的時候纔會繼續往下執行
print('進程走過了wait.')
3.基於事件的進程通訊
import time
from multiprocessing import Process,Event
def func(e):
time.sleep(1)
n=100
print('子進程計算結果爲',n)
e.set() # 將e的值修改成True
if __name__=='__main__':
e=Event() # 建立事件對象,初始狀態爲False
p=Process(target=func,args=(e,))
p.start()
print('主程序在等待中......')
e.wait() # 等待e的值變爲True
print('結果執行完畢,能夠拿到這個值')
4.信號量(Semaphore)
S = semphore(4),內部維護了一個計數器,acquire-1,release+1,爲0的時候,其餘的進程都要在acquire以前等待
S.acquire()
須要鎖住的代碼
S.release()
Semaphore(4) # 計數器4,acquire一次減一,若是semaphore的值爲0,其餘人等待,release加一 . 搶票啊啥的通常用這個,就是多個程序搶佔這4個cpu,若是4個都被佔用了,那麼他們剩餘的所有等待,只有當一我的執行完畢的時候,剩下的人才會繼續爭搶這個cpu.
import time
import random
from multiprocessing import Process,Semaphore
def func():
s.acquire() # 與進程鎖的用法相似
print(f'{i}號男嘉賓登場')
time.sleep(random.randint(1,3)) # 隨機
s.release() #與進程鎖的用法相似
if __name__=='__main__':
s=Semaphore(3) # 計數器3,每當acquire一次,計數器就減一,當release一次,計數器就加一. 若是計數器爲0,那麼意味着全部的計數器都被佔用,其餘的程序都要等待這3個程序,若是一個執行完畢後,剩下的才能夠繼續爭搶這個名額.
for i in range(10): # 建立了10個進程,
p=Process(target=func,args=(i,))
p.start()
5. 進程池(重點)
進程的建立和銷燬是頗有消耗的,影響代碼執行效率
進程池:
Map:異步提交任務,而且傳參須要可迭代類型的數據,自帶close和join功能
Res = Apply(f1,args=(i,)) #同步執行任務,必須等任務執行結束才能給進程池提交下一個任務,能夠直接拿到返回結果res
Res_obj = Apply_async(f1,args=(i,)) #異步提交任務,能夠直接拿到結果對象,從結果對象裏面拿結果,要用get方法,get方法會阻塞程序,沒有拿到結果會一直等待
Close : 鎖住進程池,防止有其餘的新的任務在提交給進程池
Join : 等待着進程池將本身裏面的任務都執行完
回調函數:
Apply_async(f1,args=(i,),callback=function) #將前面f1這個任務的返回結果做爲參數傳給callback指定的那個function函數
進程池的map用法 (Pool)
# 對比多進程和進程池的效率,統計進程池和多進程執行100個任務的時間
import time
from multiprocessing import Process,Pool
def func(i):
time.sleep(0.5)
for a in range(10):
n=n+i
if __name__=='__main__':
# 進程池執行100個任務的時間
s_time=time.time()# 記錄開始的時間
p=Pool(4) # 裏面的參數是指定進程池中有多少個進程用的,4表示4個進程,若是不傳參數,那麼默認開啓的進程數就是你電腦的cpu個數
p.map(func,range(100)) # map 2個參數第一個是你要執行的函數,第二個必須是可迭代的 ,異步提交任務,自帶join功能
e_time=time.time()# 記錄結束的時間
res_time=e_time-s_time
print('進程池執行命令所用時間>>>',res_time)
#多進程執行100個任務的時間
p_s_t=time.time()
p_list=[]
for i in range(100):
p=Process(target=func,args=(i,))
p.start()
p_list.append(p)
[pp.join() for pp in p_list]
p_e_t=time.time()
res_t = p_e_t - p_s_t
print('多進程執行命令所用的時間>>>',res_t)
# 結果
進程池執行命令所用時間:0.40s
多進程執行命令所用時間:9.24s
因此說進程池的時間比多進程的時間快了近十倍,因此......你懂個人意思吧,兄嘚
6.進程池的同步方法和異步方法
apply() 同步方法,將任務變成了串行
apply_async()異步方法
# 同步方法
import time
from multiprocessing import Process,Pool
def func(n):
time.sleep(1)
# print(n)
return n*n
if __name__=='__main__':
po=Pool(4)
for i in range(10):
print('你好啊,大哥')
res=po.apply(func,args=(i,)) #進程池的同步方法,將任務變成了串行
print(res)
#異步方法
import time
from multiprocessing import Process,Pool
def func():
pass
if __name__=='__main__':
po=Pool(4)
p_list=[]
for i in range(10):
print('你能每次都看到我嗎?')
res=po.apply_async(func,args=(i,)) #異步給進程池提交任務
p_list.append(res)
po.close()#鎖住進程池,意思就是不讓其餘的長鬚再往這個進程池裏面提交任務了
po.join() # 必須等待子程序執行完畢才能夠執行主程序
#打印結果,若是異步提交以後的結果對象
for i in p_list:
print(i.get())# 從對象中拿值須要用到get方法,get的效果是join的效果
# 主程序運行結束,進程池裏面的任務所有中止,不會等待進程池裏面的任務
print('主進程直接結束')
7. 進程池的回調函數,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回調函數是在異步進程中的函數
所謂的回調函數其實指的是,在主進程中第一個函數算出的值,被回調函數把結果傳入到第二個函數中進行計算.
import os
from multiprocess import Process,Pool
def func(n):
s=n+1
return s**2
def func2(x):
print('回調函數中的結果>>>',x)
if __name__=='__main__':
po=Pool(4)
po.apply_async(func,args=(3,),callback=func2)
po.close()
po.join()
print('主進程的id',os.getpid())