python-45-管道與進程池的使用

前言

管道:能夠互相通訊、數據共享,但容易出現數據搶佔問題,能夠加鎖解決。python

進程池:每開啓進程,開啓屬於這個進程的內存空間;能提高計算機的效率,進程過多 操做系統的調度;安全

1、初識管道

一、初識管道,能夠互相通訊。app

# 一、初識管道,能夠互相通訊。
from multiprocessing import Pipe
conn1, conn2 = Pipe()
conn1.send('123')
print(conn2.recv())
conn2.send('321')
print(conn1.recv())

二、管道實現:生產者/消費者模型。dom

可是這裏可能會有個問題,消費者可能同時拿同一個數據,那怎樣纔好呢?(接收的時候上鎖:lock)異步

因此管道 + 鎖來控制操做管道的行爲 來避免進程之間爭搶數據形成的數據不安全現象。async

# 二、管道實現:生產者/消費者模型
from multiprocessing import Pipe,Process
import time,random
def consumer(con,pro,name):
    pro.close()
    while 1:
        try:
            m=con.recv()
            print('%s使用了 %s' % (name,m))
            time.sleep(random.random())
        except EOFError:
            con.close()
            break
def producer(con,pro,name,mask):
    con.close()
    for i in range(1,6):
        time.sleep(random.random())
        m='%s生產了%s %s'%(name,mask,i)
        print(m)
        pro.send(m)
    pro.close()
if __name__ == '__main__':
    con,pro=Pipe()
    p=Process(target=producer,args=(con,pro,'大廠','N95 '))
    p.start()
    c=Process(target=consumer,args=(con,pro,'A企業'))
    c.start()
    c1=Process(target=consumer,args=(con,pro,'B企業'))
    c1.start()
    con.close()
    pro.close()

 三、Manager,數據共享,但不安全的,進程之間也會搶佔資源。函數

但能夠加鎖進行約束解決。學習

# 三、Manager,數據共享不安全的,但會進程之間搶佔資源。
# 但能夠加鎖約束解決
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
    # lock.acquire()
    dic['count']+=1
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    m=Manager()
    dic=m.dict({'count':0})
    p_lst=[]
    for i in range(50):
        p=Process(target=func,args=(dic,lock))
        p.start()
        p_lst.append(p)
    for i in p_lst:i.join()
    print('主進程:%s'%dic)

 加鎖後:數值一直是準確的。ui

2、進程池

  • python中的進程池,是先建立一個屬於進程的池子;
  • 進程池能指定能存放n個進程執行;

一、進程池與多進程效率對比:spa

一樣是執行100個進程,進程池每次處理5個,而多進程for循環處理。結果進程池效率賽過多進程。

from multiprocessing import Pool,Process
import time
def func(i):
    print(i)

if __name__ == '__main__':
    # 進程池的效果
    st=time.time()
    pool=Pool(5)                # 池子可放5個(通常CPU的個數+1)
    pool.map(func,range(100))   # 100個進程任務
    t1=time.time()-st
    st=time.time()
    print('進程池時間:',t1)

    # 原來多進程的效果
    p_lst=[]
    for i in range(100):
        p=Process(target=func,args=(i,))
        p_lst.append(p)
        p.start()
    for p in p_lst:p.join()
    t2=time.time()-st
    print('多進程時間:',t2)

 二、進程池傳多個參數:

#  二、進程池傳多個參數:
from multiprocessing import Pool
import time
def func(i):
    print(i)

if __name__ == '__main__':
    st=time.time()
    pool=Pool(5)
    pool.map(func,[(10,'name','age'),100])

 三、apply:同步

# 三、apply:同步
from multiprocessing import Pool
import time
def func():
    print('--開始~')
    time.sleep(0.1)
    print('==結束!'+'\n')
if __name__ == '__main__':
    pool=Pool()
    for i in range(5):
        pool.apply(func)       # 同步了

 四、apply_async:異步

配合close()、join()進行使用。有沒有發現進程池中的pid有重複的?那是由於進程池有固定的N個進程,因此不會變。

# 四、apply_async:異步
from multiprocessing import Pool
import time,os
def func(i):
    pid=os.getpid()
    print('%s--開始~:%s'%(i,pid))
    time.sleep(1)
if __name__ == '__main__':
    pool=Pool(2)
    for i in range(5):
        pool.apply_async(func,(i,))
    pool.close()        # 結束進程池接收任務
    pool.join()         # 感知進程池中的任務執行結束

 五、進程池的返回值:

  • apply:直接接收返回值
  • apply_async:須要get(),會堵塞因等待返回值,解決可先放列表get()
  • map:一次性返回全部返回值,自帶的
# 五、進程池的返回值
# apply:直接接收返回值
# apply_async:須要get(),會堵塞因等待返回值,解決可先放列表get()。
from multiprocessing import Pool
import time
def func(i):
    time.sleep(0.5)
    return i+1
if __name__ == '__main__':
    p=Pool(3)
    p_lst=[]

    # for i in range(10):
        # res=p.apply(func,args=(i,))        # 直接接收返回值
        # print(res)

    #     res=p.apply_async(func,args=(i,))   # apply_async
    #     p_lst.append(res)
    # for i in p_lst:print(i.get())

    res=p.map(func,range(10))               # map一次性返回 
    print(res)

①apply同步返回值:

②apply_async返回值:

因3個線程,因此每次打印3個信息。

③map返回值:

六、進程池回調函數:

  • 先執行異步函數func,將func返回值傳入func1函數中的ii參數。
  • 回調函數是在主進程中執行,而不是子進程中。
# 六、進程池回調函數
# 先執行異步函數func,將func返回值傳入func1函數中的ii參數。
# 回調函數是在主進程中執行,而不是子進程中。
from multiprocessing import Pool
def func(i):
    return i
def func1(ii):
    print('i+1=',ii+1)
if __name__ == '__main__':
    p=Pool()
    for i in range(5):
        p.apply_async(func,args=(i,),callback=func1)
    p.close()
    p.join()

小結:

  • 管道實現的代碼通常加上鎖Lock
  • p=Pool():實例化;
  • p.map(函數名,可迭代類型):默認異步的執行任務,且自帶close和join;
  • p.apply:同步調用;
  • p.apply_async:異步調用和主進程徹底異步,須要手動close和join;

歡迎來你們QQ交流羣一塊兒學習:482713805

相關文章
相關標籤/搜索