day 035 管道 和數據共享

管道安全

 

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收   
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

 

管道報錯

主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:併發

    1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;app

    2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;異步

 

    因此你關閉管道的時候,就容易出現問題,須要將全部只用這個管道的進程中的兩端所有關閉才行。固然也能夠經過異常捕獲(try:except EOFerror)來處理。async

    雖然咱們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程中的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄。    函數

 

    咱們的目的就是關閉全部的管道,那麼主進程和子進程進行通訊的時候,能夠給子進程傳管道的一端就夠了,而且用咱們以前學到的,信息發送完以後,再發送一個結束信號None,那麼你收到的消息爲None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。spa

數據共享:操作系統

Manager模塊介紹線程

多進程共同去處理共享數據的時候,就和咱們多進程同時去操做一個文件中的數據是同樣的,不加鎖就會出現錯誤的結果,進程不安全的,因此也須要加鎖code

 

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()    
    with Manager() as m:
        dic=m.dict({'count':100})   #字典形式
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

 

進程池和multiprocess Pool 模塊

建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。

定義池子將建立定量的進程:去完成任務,3個出去再回到池子中但進程不關閉,在出去執行任務,直到將任務執行完成結束

進程池的主要方法:

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。  獲得的是返回結果也能夠不返回結果
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
 
進程池的實例:
進程池的效比進程的效率高不少
map方法是異步執行而且自帶close和join方法
from multiprocessing import Process,Pool
import time
def func(n):
    for i in range(5):
        n=n=i
    print(n)
if __name__ == '__main__':
    pool_start_time=time.time()
    pool= Pool(4)
    pool.map(func,range(100))
    pool_end_time=time.time()
    pool_dif_time=pool_end_time-pool_start_time
    p_s_time=time.time()
    p_list=[]
    for i in range(100):
        p2=Process(target=func,args=(i,))
        p2.start()
        p_list.append(p2)
    [p.join() for p in p_list]
    p_e_time=time.time()
    p_dif_time=p_e_time-p_s_time
    print("進程池時間",pool_dif_time)
    print("多進程時間",p_dif_time)

進程池的同步異步:

同步:

from multiprocessing import Process,Pool
import time
def func(i):
    time.sleep(1)
    return i**2
if __name__ == '__main__':
    p=Pool(4)
    for i in range(10):
        res=p.apply(func,args=(i,))   #同步的方法多個進程完成任務  能夠獲得返回值也能夠不提取返回值
        print(res)

異步:

from multiprocessing import Process,Pool
import time
import os
def func(i):
    time.sleep(2)
    # print(os.getpid())
    return i**2
if __name__ == '__main__':
    p=Pool(4)
    res_list=[]
    for i in range(10):
        res=p.apply_async(func,args=(i,))   #異步的方法  獲得返回值
        res_list.append(res)
    for i in res_list:
        print(i.get())

回調函數:

進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。

import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主進程:',os.getpid())
    p = Pool(5)
    #args裏面的10給了func1,func1的返回值做爲回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值
    # for i in range(10,20): #若是是多個進程來執行任務,那麼當全部子進程將結果給了回調函數以後,回調函數又是在主進程上執行的,那麼就會出現打印結果是同步的效果。咱們上面func2裏面註銷的時間模塊打開看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#結果
# 主進程: 11852  #發現回調函數是在主進程中完成的,其實若是是在子進程中完成的,那咱們直接將代碼寫在子進程的任務函數func1裏面就好了,對不對,這也是爲何稱爲回調函數的緣由。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
相關文章
相關標籤/搜索