管道(瞭解)html
進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現,後面咱們會說到爲何會帶來數據 不安全的問題。python
#建立管道的類: Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 #參數介紹: dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。 管道介紹
from multiprocessing import Process, Pipe def f(conn): conn.send("Hello 妹妹") #子進程發送了消息 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #創建管道,拿到管道的兩端,雙工通訊方式,兩端均可以收發消息 p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程 p.start() #開啓子進程 print(parent_conn.recv()) #主進程接受了消息 p.join() 管道初使用
應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起(就是阻塞)。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道的相同一端就會能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 git
from multiprocessing import Process, Pipe def f(parent_conn,child_conn): #parent_conn.close() #不寫close將不會引起EOFError while True: try: print(child_conn.recv()) except EOFError: child_conn.close() break if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(parent_conn,child_conn,)) p.start() child_conn.close() parent_conn.send('hello') parent_conn.close() p.join() 引起EOFError
主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:github
1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;面試
2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;數據庫
因此你關閉管道的時候,就容易出現問題,須要將全部只用這個管道的進程中的兩端所有關閉才行。固然也能夠經過異常捕獲(try:except EOFerror)來處理。編程
雖然咱們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程中的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄。 json
咱們的目的就是關閉全部的管道,那麼主進程和子進程進行通訊的時候,能夠給子進程傳管道的一端就夠了,而且用咱們以前學到的,信息發送完以後,再發送一個結束信號None,那麼你收到的消息爲None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。數組
from multiprocessing import Pipe,Process def func(conn): while True: msg = conn.recv() if msg is None:break print(msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() for i in range(10): conn2.send('約吧') conn2.send(None) 經過結束信號None來結束程序
from multiprocessing import Process,Pipe def consumer(p,name): produce, consume=p produce.close() while True: try: baozi=consume.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: break def producer(seq,p): produce, consume=p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume=Pipe() c1=Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主進程') 經過管道來實現生產者消費者模型
關於管道會形成數據不安全問題的官方解釋: The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time. 由Pipe方法返回的兩個鏈接對象表示管道的兩端。每一個鏈接對象都有send和recv方法(除其餘以外)。注意,若是兩個進程(或線程)試圖同時從管道的同一端讀取或寫入數據,那麼管道中的數據可能會損壞。固然,在使用管道的不一樣端部的過程當中不存在損壞風險。
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主進程') 多個消費者競爭會出現數據不安全的問題的解決方案:加鎖
管道能夠用於雙工通訊,一般利用在客戶端/服務端中使用的請求/響應模型,或者遠程過程調用,就可使用管道編寫與進程交互的程序,像前面將網絡通訊的時候,咱們使用了一個叫subprocess的模塊,裏面有個參數是pipe管道,執行系統指令,並經過管道獲取結果。瀏覽器
數據共享(瞭解)
展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合
經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中
進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題,應該儘可能避免使用本節所講的共享數據的方式,之後咱們會嘗試使用數據庫來解決進程之間的數據共享問題。
進程之間數據共享的模塊之一Manager模塊:
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. Manager模塊介紹
多進程共同去處理共享數據的時候,就和咱們多進程同時去操做一個文件中的數據是同樣的,不加鎖就會出現錯誤的結果,進程不安全的,因此也須要加鎖
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) Manager模塊使用
總結一下,進程之間的通訊:隊列、管道、數據共享也算
下面要講的信號量和事件也至關於鎖,也是全局的,全部進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通訊,其實底層仍是socekt,只不過是基於文件的socket通訊,而不是跟上面的數據共享啊空間共享啊之類的機制,咱們以前學的是基於網絡的socket通訊,還記得socket的兩個家族嗎,一個文件的一個網絡的,因此未來若是說這些鎖之類的報錯,可能你看到的就是相似於socket的錯誤,簡單知道一下就能夠啦~~~
工做中經常使用的是鎖,信號量和事件不經常使用,可是信號量和事件面試的時候會問到,你能知道就行啦~~~
信號量(瞭解)
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
好比大保健:提早設定好,一個房間只有4個牀(計數器如今爲4),那麼同時只能四我的進來,誰先來的誰先佔一個牀(acquire,計數器減1),4個牀滿了以後(計數器爲0了),第五我的就要等着,等其中一我的出來(release,計數器加1),他就去佔用那個牀了。
from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print('%s 佔到一間ktv小屋' %user) time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣 sem.release() if __name__ == '__main__': sem=Semaphore(4) p_l=[] for i in range(13): p=Process(target=go_ktv,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》') 信號量使用
事件(瞭解)
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True
from multiprocessing import Process,Semaphore,Event import time,random e = Event() #建立一個事件對象 print(e.is_set()) #is_set()查看一個事件的狀態,默認爲False,可經過set方法改成True print('look here!') # e.set() #將is_set()的狀態改成True。 # print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr # e.clear() #將is_set()的狀態改成False # print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr e.wait() #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞 print('give me!!') #set和clear 修改事件的狀態 set-->True clear-->False #is_set 用來查看一個事件的狀態 #wait 依據事件的狀態來決定是否阻塞 False-->阻塞 True-->不阻塞 事件方法的使用
from multiprocessing import Process, Event import time, random def car(e, n): while True: if not e.is_set(): # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色 print('\033[31m紅燈亮\033[0m,car%s等着' % n) e.wait() # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色 print('\033[32m車%s 看見綠燈亮了\033[0m' % n) time.sleep(random.randint(2,4)) if not e.is_set(): #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始 continue print('車開遠了,car', n) break # def police_car(e, n): # while True: # if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色 # print('\033[31m紅燈亮\033[0m,car%s等着' % n) # e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了 # if not e.is_set(): # print('\033[33m紅燈,警車先走\033[0m,car %s' % n) # else: # print('\033[33;46m綠燈,警車走\033[0m,car %s' % n) # break def traffic_lights(e, inverval): while True: time.sleep(inverval) if e.is_set(): print('######', e.is_set()) e.clear() # ---->將is_set()的值設置爲False else: e.set() # ---->將is_set()的值設置爲True print('***********',e.is_set()) if __name__ == '__main__': e = Event() for i in range(10): p=Process(target=car,args=(e,i,)) # 建立10個進程控制10輛車 time.sleep(random.random(1, 3)) #車不是一會兒全過來 p.start() # for i in range(5): # p = Process(target=police_car, args=(e, i,)) # 建立5個進程控制5輛警車 # p.start() #信號燈必須是單獨的進程,由於它無論你車開到哪了,我就按照我紅綠燈的規律來閃爍變換,對吧 t = Process(target=traffic_lights, args=(e, 5)) # 建立一個進程控制紅綠燈 t.start() print('預備~~~~開始!!!') 經過事件來模擬紅綠燈示例
進程池和mutiprocess.Poll
爲何要有進程池?進程池的概念。
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程(空間,變量,文件信息等等的內容)也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還須要進行切換而且記錄每一個進程的執行節點,也就是記錄上下文(各類變量等等亂七八糟的東西,雖然你看不到,可是操做系統都要作),這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。就看咱們上面的一些代碼例子,你會發現有些程序是否是執行的時候比較慢纔出結果,就是這個緣由,那麼咱們要怎麼作呢?
在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果
multiprocess.Poll模塊
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。
概念介紹:
Pool([numprocess [,initializer [, initargs]]]):建立進程池
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用 主要方法介紹
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
import time from multiprocessing import Pool,Process #針對range(100)這種參數的 # def func(n): # for i in range(3): # print(n + 1) def func(n): print(n) # 結果: # (1, 2) # alex def func2(n): for i in range(3): print(n - 1) if __name__ == '__main__': #1.進程池的模式 s1 = time.time() #咱們計算一下開多進程和進程池的執行效率 poll = Pool(5) #建立含有5個進程的進程池 # poll.map(func,range(100)) #異步調用進程,開啓100個任務,map自帶join的功能 poll.map(func,[(1,2),'alex']) #異步調用進程,開啓100個任務,map自帶join的功能 # poll.map(func2,range(100)) #若是想讓進程池完成不一樣的任務,能夠直接這樣搞 #map只限於接收一個可迭代的數據類型參數,列表啊,元祖啊等等,若是想作其餘的參數之類的操做,須要用後面咱們要學的方法。 # t1 = time.time() - s1 # # #2.多進程的模式 # s2 = time.time() # p_list = [] # for i in range(100): # p = Process(target=func,args=(i,)) # p_list.append(p) # p.start() # [pp.join() for pp in p_list] # t2 = time.time() - s2 # # print('t1>>',t1) #結果:0.5146853923797607s 進程池的效率高 # print('t2>>',t2) #結果:12.092015027999878s 進程池的簡單應用及與進程池的效率對比
有一點,map是異步執行的,而且自帶close和join
通常約定俗成的是進程池中的進程數量爲CPU的數量,工做中要看具體狀況來考量。
實際應用代碼示例:
同步與異步兩種執行方式:
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞 # 但無論該任務是否存在阻塞,同步調用都會在原地等着 res_l.append(res) print(res_l) 進程池的同步調用
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行,而且能夠執行不一樣的任務,傳送任意的參數了。 # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務 # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res) # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完,而後能夠用get收集結果 # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 p.close() #不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來。 p.join() #感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,因此在join以前必須加上close方法 for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get 進程池的異步調用
#一:使用進程池(異步調用,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 res_l.append(res) # s = res.get() #若是直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,由於get方法直接就在這裏等着你建立的進程的結果,第一個進程建立了,而且去執行了,那麼get就會等着第一個進程的結果,沒有結果就一直等着,那麼主進程的for循環是沒法繼續的,因此你會發現變成了同步的效果 print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了 pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果 for i in res_l: print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get #二:使用進程池(同步調用,apply) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個 print("==============================>") pool.close() pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 print(res_l) #看到的就是最終的結果組成的列表 for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法 print(i) 詳解:apply_async和apply
進程池版的socket併發聊天代碼示例:
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處於等待狀態 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問 server端:tcp_server.py
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8')) client端:tcp_client.py
發現:併發開啓多個客戶端,服務端同一時間只有4個不一樣的pid,只能結束一個客戶端,另一個客戶端纔會進來.
同時最多和4我的進行聊天,由於進程池中只有4個進程可供調用,那有同窗會問,咱們這麼多人想同時聊天怎麼辦,又不讓用多進程,進程池也不能開太多的進程,那咋整啊,後面咱們會學到多線程,到時候你們就知道了,如今大家先這樣記住就好啦
而後咱們再提一個回調函數
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主進程:',os.getpid()) p = Pool(5) #args裏面的10給了func1,func1的返回值做爲回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值 # for i in range(10,20): #若是是多個進程來執行任務,那麼當全部子進程將結果給了回調函數以後,回調函數又是在主進程上執行的,那麼就會出現打印結果是同步的效果。咱們上面func2裏面註銷的時間模塊打開看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #結果 # 主進程: 11852 #發現回調函數是在主進程中完成的,其實若是是在子進程中完成的,那咱們直接將代碼寫在子進程的任務函數func1裏面就好了,對不對,這也是爲何稱爲回調函數的緣由。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100 回調函數的簡單使用
回調函數在寫的時候注意一點,回調函數的形參執行有一個,若是你的執行函數有多個返回值,那麼也能夠被回調函數的這一個形參接收,接收的是一個元祖,包含着你執行函數的全部返回值。
使用進程池來搞爬蟲的時候,最耗時間的是請求地址的網絡請求延遲,那麼若是咱們在將處理數據的操做加到每一個子進程中,那麼全部在進程池後面排隊的進程就須要等更長的時間才能獲取進程池裏面的執行進程來執行本身,因此通常咱們就將請求做成一個執行函數,經過進程池去異步執行,剩下的數據處理的內容放到另一個進程或者主進程中去執行,將網絡延遲的時間也利用起來,效率更高。
requests這個模塊的get方法請求頁面,就和咱們在瀏覽器上輸入一個網址而後回車去請求別人的網站的效果是同樣的。安裝requests模塊的指令:在cmd窗口執行pip install requests。
import requests response = requests.get('http://www.baidu.com') print(response) print(response.status_code) #200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤 print(response.content.decode('utf-8'))
from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了 ''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] ''' 使用多進程請求多個url來減小網絡等待浪費的時間
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text)) 爬蟲示例
若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中全部進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到全部結果 print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理 無需回調函數的示例
進程池是多個須要被執行的任務在進程池外面排隊等待獲取進程對象去執行本身,而信號量是一堆進程等待着去執行一段邏輯代碼。
信號量不能控制建立多少個進程,可是能夠控制同時多少個進程可以執行,可是進程池能控制你能夠建立多少個進程。
舉例:就像那些開大車拉煤的,信號量是什麼呢,就比如我只有五個車道,你每次只能過5輛車,可是不影響你建立100輛車,可是進程池至關於什麼呢?至關於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。
其餘語言裏面有更高級的進程池,在設置的時候,能夠將進程池中的進程動態的建立出來,當需求增大的時候,就會自動在進程池中添加進程,需求小的時候,自動減小進程,而且能夠設置進程數量的上線,最多爲多,python裏面沒有。
進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html