1、multiprocess.process模塊linux
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
from multiprocessing import Process def func(name): print('子進程:你好,',name) if __name__ == '__main__': p = Process(target=func,args=('hsr',)) p.start()
from multiprocessing import Process import os def func(): print('子進程PID:',os.getpid()) if __name__ == '__main__': p = Process(target=func) p.start() print('父進程PID:',os.getpid())
from multiprocessing import Process import time def func(*args): print('*'*args[0]) time.sleep(5) print('*' * args[1]) if __name__ == '__main__': p = Process(target=func,args=(10,20)) p.start() p.join() #主線程等待p終止 print("-------運行完了-------")
from multiprocessing import Process import time def func(no,*args): print(str(no)+" :"+'*'*args[0]) time.sleep(5) print(str(no)+" :"+'*'*args[1]) if __name__ == '__main__': p_li = [] for i in range(10): p_li.append(Process(target=func,args=(i,10,20))) for i in p_li: i.start() [i.join() for i in p_li] #讓最後的print等子進程都結束了再執行 print('運行完了')
#自定義類 繼承Process類 #必須實現run方法,run方法就是子進程執行的方法 #若是要參數,則實現本身的init方法,並在其中調用父類的init方法 from multiprocessing import Process import os class MyProcess(Process): def __init__(self,arg1): super().__init__() self.arg1 = arg1 def run(self): print("My Process:",self.pid) print(self.arg1) if __name__ == '__main__': print(os.getpid()) p1 = MyProcess(4) p1.start()
#進程間不會共享數據 from multiprocessing import Process import os def func(): global n n = 0 print('pid:'+str(os.getpid())+" "+str(n)) if __name__ == '__main__': n = 100 p = Process(target=func) p.start() p.join() print('pid:'+str(os.getpid())+" "+str(n))
#守護進程 from multiprocessing import Process import time def func(): while 1: time.sleep(2) print('Good') if __name__ == '__main__': p = Process(target=func) p.daemon = True #設置子進程爲守護進程 p.start() i = 10 while i>0: print('Do something') time.sleep(5) i -= 1
2、進程同步編程
#模擬吃50我的吃5個蘋果 #使用Lock對象的acquire請求鎖,release釋放鎖 from multiprocessing import Process from multiprocessing import Lock import json def eat(no,lock): lock.acquire() with open('info.json') as f: dic = json.load(f) AppleNum = dic["Apple"] print("蘋果個數:" + str(AppleNum)) if AppleNum >0: print("%d 吃了一個蘋果" %no) AppleNum -= 1 dic["Apple"] = AppleNum with open('info.json','w') as f: json.dump(dic,f) else: print("%d 沒有蘋果吃了" %no) lock.release() if __name__ == '__main__': lock = Lock() for i in range(50): Process(target=eat, args=(i,lock)).start()
from multiprocessing import Process,Semaphore import time import random def grid(i,sem): sem.acquire() print(str(i)+'放入了格子') time.sleep(random.randint(2,6)) print(str(i)+'拿出了格子') sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(20): Process(target=grid,args=(i,sem)).start()
from multiprocessing import Event if __name__ == "__main__": e = Event() # c建立一個事件 print(e.is_set()) # 查看一個事件的狀態 e.set() #將事件的狀態改成True e.wait() #根據e.is_set()的值決定是否阻塞 print(1235455) e.clear() #將事件的狀態改成False e.wait() print(12323545555)
from multiprocessing import Event,Process import time import random def traffic_light(e): while 1: if e.is_set(): e.clear() print('\033[31m[-----------紅燈-----------]\033[0m') else: e.set() print('\033[32m[-----------綠燈-----------]\033[0m') time.sleep(2) def car(e,i): if not e.is_set(): print('%s號車在等紅燈' %i) e.wait() #阻塞直到狀態改變 print('\033[0;32;40m%s號車經過\033[0m' %i) if __name__ == '__main__': e = Event() light = Process(target=traffic_light,args=(e,)) light.start() for i in range(20): time.sleep(random.random()) cars = Process(target=car,args=(e,i)) cars.start()
3、進程間通訊json
1.隊列Queuewindows
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
from multiprocessing import Process,Queue if __name__ == '__main__': q = Queue(5) #建立隊列 for i in range(5): q.put(i) #放進數據 print(q.full()) #q.put(6) 此處阻塞 for i in range(5): print(q.get()) #獲取數據 print(q.empty()) #q.get() 此處阻塞
from multiprocessing import Event,Process,Queue def produce(q): q.put('from produce') def comsume(q): print(q.get()) if __name__ == '__main__': q = Queue(5) #建立隊列 pro = Process(target=produce,args=(q,)) pro.start() com = Process(target=comsume, args=(q,)) com.start()
from multiprocessing import Process,Queue import time import random def producer(name,goods,q): for i in range(10): time.sleep(random.randint(1,4)) print('%s生產了第%s個%s'%(name,i,goods)) q.put('第%s個%s'%(i,goods)) def comsumer(q,name): while 1: goods = q.get() if goods == None:break print('\033[31m%s買了了%s\033[0m' % (name,goods)) time.sleep(random.randint(2,6)) if __name__ == '__main__': q = Queue(10) p = Process(target=producer,args=('HSR','牛奶',q)) p2 = Process(target=producer, args=('TTT', '麪包', q)) c = Process(target=comsumer, args=(q,'Lisi')) c2 = Process(target=comsumer, args=(q, 'ZhangSan')) p.start() p2.start() c.start() c2.start() p.join() p2.join() q.put(None) q.put(None)
from multiprocessing import Process,JoinableQueue import time import random def producer(name,goods,q): for i in range(10): time.sleep(random.randint(1,4)) print('%s生產了第%s個%s'%(name,i,goods)) q.put('第%s個%s'%(i,goods)) q.join() #阻塞,直到隊列中的數據被所有執行完畢 def comsumer(q,name): while 1: goods = q.get() if goods == None:break print('\033[31m%s買了了%s\033[0m' % (name,goods)) time.sleep(random.randint(2,6)) q.task_done() #count - 1 if __name__ == '__main__': q = JoinableQueue(10) p = Process(target=producer,args=('HSR','牛奶',q)) p2 = Process(target=producer, args=('TTT', '麪包', q)) c = Process(target=comsumer, args=(q,'Lisi')) c2 = Process(target=comsumer, args=(q, 'ZhangSan')) p.start() p2.start() c.daemon = True #設置爲守護進程,主進程結束則子進程結束,而這裏的主進程等待生產進程的結束 c2.daemon = True #生產進程又等待消費進程消費完。因此消費者消費完了就會結束進程 c.start() c2.start() p.join() p2.join()
2.管道數組
from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() while 1: try: print(conn1.recv()) except EOFError: conn1.close() break if __name__ == '__main__': conn1, conn2 = Pipe() p1 = Process(target=func,args=(conn1, conn2)) #傳給不一樣進程的conn是不會相互影響的 p1.start() conn1.close() for i in range(20): conn2.send("hi") conn2.close()
#Pipe有數據不安全性 #管道可能出現一端的多個消費者同時取一個數據 #因此能夠加上一個進程鎖來保證安全性 from multiprocessing import Pipe,Process,Lock import time import random def producer(con,pro,name,goods): con.close() for i in range(8): time.sleep(random.randint(1,3)) print('%s生成了第%s個%s'%(name,i,goods)) pro.send('第%s個%s'%(i,goods)) pro.close() def consumer(con,pro,name,lock): pro.close() while 1: try: lock.acquire() goods = con.recv() lock.release() print('%s喝了%s'%(name,goods)) time.sleep(random.random()) except EOFError: lock.release() #由於最後消費者經過異常來結束進程,因此最後一次的recv後面的lock.release不會執行,因此要在 #這個地方再寫一個release() con.close() break if __name__ == '__main__': con, pro = Pipe() lock = Lock() p = Process(target=producer, args=(con,pro,'HSR','牛奶')) c = Process(target=consumer, args=(con, pro, 'TTT',lock)) c2 = Process(target=consumer, args=(con, pro, 'TTT2',lock)) p.start() c.start() c2.start() con.close() pro.close()
3.Manager安全
from multiprocessing import Manager,Process def func(dic): dic['count'] -= 1 print(dic) if __name__ == '__main__': m = Manager() 建立一個Manger() dic = m.dict({'count':100}) #變成進程共享的字典 p = Process(target=func, args=(dic,)) p.start() p.join() #等待子進程結束
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
4、進程池網絡
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 obj.ready():若是調用完成,返回True obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 obj.wait([timeout]):等待結果變爲可用。 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
#map若是要給函數傳參數,只能傳可迭代對象 from multiprocessing import Pool def func(dic): print(dic) def func2(dic): print(dic+2) if __name__ == '__main__': pool = Pool(5) #進程數,CPU核心數+1 #若是Pool()不傳參數,默認是cpu核心數 pool.map(func2,range(100)) #100個任務 #這裏自帶join效果 pool.map(func, ['hsr','ttt']) # 2個任務
from multiprocessing import Pool import os import time def func(n): print('[pid:%s]start id:%s'%(os.getpid(),n)) time.sleep(1.5) print('\033[31m[pid:%s]end id:%s\033[0m'%(os.getpid(),n)) if __name__ == '__main__': pool = Pool(5) for i in range(10): #pool.apply(func,args=(i,)) #同步 pool.apply_async(func,args=(i,)) #異步。與主進程徹底異步,須要手動close和join pool.close() # 結束進程池接收任務 pool.join() # 感知進程中的任務都執行結束
import socket from multiprocessing import Pool def func(conn): while 1: conn.send(b'hello') ret = conn.recv(1024).decode('utf-8') if ret == 'q': break print(ret) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1', 8081)) sk.listen() pool = Pool(5) while 1: conn, addr = sk.accept() pool.apply_async(func,args=(conn,))
import socket sk = socket.socket() sk.connect(('127.0.0.1',8081)) ret = sk.recv(1024).decode('utf-8') print(ret) c = input().encode('utf-8') sk.send(c) sk.close()
from multiprocessing import Pool def func(i): return i**2 if __name__ == '__main__': pool = Pool(5) #使用map的返回值 ret = pool.map(func,range(10)) print(ret) res_l = [] for i in range(10): #同步 # res = pool.apply(func,args=(i,)) #apply的結果就是func的返回值 # print(res) #異步 res = pool.apply_async(func,args=(i,)) #apply_async的結果 #這裏若是直接使用res.get()來獲取返回值,會阻塞,因此先將其放入列表中,後面再get # print(res.get()) #阻塞等待func的結果 res_l.append(res) for i in res_l: print(i.get())
from multiprocessing import Pool def func(i): print('in func1') return i**2 def func2(n): print('in func2') print(n) if __name__ == '__main__': pool = Pool(5) pool.apply_async(func, args=(10,), callback=func2) #執行func1,把返回值做爲fun2的參數執行func2 #回調函數func2在主進程中zhi'x pool.close() pool.join()
import requests from multiprocessing import Pool def get(url): ret = requests.get(url) if ret.status_code == 200: return ret.content.decode('utf-8'),url def call_back(args): print(args[1] +" "+ str(len(args[0]))) url_lst = [ 'http://www.cnblog.com', 'https://www.baidu.com', 'http://www.sohu.com' ] if __name__ == '__main__': pool = Pool(5) for i in url_lst: pool.apply_async(get,args=(i,),callback=call_back) pool.close() pool.join()