python之路-day32-管道、數據共享、進程池

 

 

1、管道(不推薦使用,瞭解便可)數據庫

  進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現,後面還會提到爲何編程

會帶來數據不安全的問題。數組

 

 1 #建立管道的類:
 2 Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
 3 #參數介紹:
 4 dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
 5 #主要方法:
 6     conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
 7     conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 8  #其餘方法:
 9 conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
10 conn1.fileno():返回鏈接使用的整數文件描述符
11 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
12  
13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
14 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
15  
16 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
管道介紹
 1 from multiprocessing import Process, Pipe
 2 
 3 def f(conn):
 4     conn.send("Hello 妹妹") #子進程發送了消息
 5     conn.close()
 6 
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe() #創建管道,拿到管道的兩端,雙工通訊方式,兩端均可以收發消息
 9     p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程
10     p.start() #開啓子進程
11     print(parent_conn.recv()) #主進程接受了消息
12     p.join()
管道初使用
1 應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起(就是阻塞)。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道的相同一端就會能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點
管道使用注意須知
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引起EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()            
引起EOFError

主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:安全

    1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;併發

    2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;app

 

2、數據共享(瞭解便可)異步

  基於消息傳遞的併發編程是大勢所趨async

  即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合分佈式

  經過消息隊列交換數據,這樣極大的減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中ide

注意:進程之間應當是儘可能避免通訊,即便須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。應該儘可能避免使用

共享數據的方法,之後會嘗試使用數據庫來解決進程之間的數據共享問題

  進程之間數據共享模塊之一Manager模塊:

1 進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
2 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此
3 
4 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
5 
6 A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager模塊介紹

  多進程共同去處理共享數據的時候,就和咱們多進程同時去操做一個文件中的數據同樣,不加鎖就會出現錯誤的結果,進程

不安全的,因此也須要加鎖

 1 from multiprocessing import Manager,Process,Lock
 2 def work(d,lock):
 3     with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
 4         d['count']-=1
 5 
 6 if __name__ == '__main__':
 7     lock=Lock()
 8     with Manager() as m:
 9         dic=m.dict({'count':100})
10         p_l=[]
11         for i in range(100):
12             p=Process(target=work,args=(dic,lock))
13             p_l.append(p)
14             p.start()
15         for p in p_l:
16             p.join()
17         print(dic)
Manager模塊使用

  總結下,進程之間的通訊:隊列、管道、數據共享

 

3、進程池

  multiprocess.pool 模塊

  建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個

進程去執行使用全部的任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中進程數量的操做),

不會開啓其餘進程,提升操做系統效率,減小空間的佔用等

 1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
 2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
 3 initargs:是要傳給initializer的參數組
 4 
 5 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
 6 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
 7 
 8 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
 9 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
10     
11 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
12 
13 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
14 
15 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
16 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
17 obj.ready():若是調用完成,返回True
18 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
19 obj.wait([timeout]):等待結果變爲可用。
20 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
Manager的方法
 1 import time
 2 from multiprocessing import Pool,Process
 3 
 4 #針對range(100)這種參數的
 5 # def func(n):
 6 #     for i in range(3):
 7 #         print(n + 1)
 8 
 9 def func(n):
10     print(n)
11     # 結果:
12     #     (1, 2)
13     #     alex
14 def func2(n):
15     for i in range(3):
16         print(n - 1)
17 if __name__ == '__main__':
18     #1.進程池的模式
19     s1 = time.time()  #咱們計算一下開多進程和進程池的執行效率
20     poll = Pool(5) #建立含有5個進程的進程池
21     # poll.map(func,range(100)) #異步調用進程,開啓100個任務,map自帶join的功能
22     poll.map(func,[(1,2),'alex']) #異步調用進程,開啓100個任務,map自帶join的功能
23     # poll.map(func2,range(100))  #若是想讓進程池完成不一樣的任務,能夠直接這樣搞
24     #map只限於接收一個可迭代的數據類型參數,列表啊,元祖啊等等,若是想作其餘的參數之類的操做,須要用後面咱們要學的方法。
25     # t1 = time.time() - s1
26     #
27     # #2.多進程的模式
28     # s2 = time.time()
29     # p_list = []
30     # for i in range(100):
31     #     p = Process(target=func,args=(i,))
32     #     p_list.append(p)
33     #     p.start()
34     # [pp.join() for pp in p_list]
35     # t2 = time.time() - s2
36     #
37     # print('t1>>',t1) #結果:0.5146853923797607s 進程池的效率高
38     # print('t2>>',t2) #結果:12.092015027999878s
進程池的簡單應用及與進程池的效率對比
相關文章
相關標籤/搜索