python--(十五步代碼學會進程)python
一.進程的建立json
import time import os #os.getpid() 獲取本身進程的id號 #os.getppid() 獲取本身進程的父進程id號 from multiprocessing import Process def func(): print("aaa") time.sleep(1) print("子進程>>>",os.getpid()) print("該子進程的父進程>>>",os.getppid()) print(12345) if __name__ == "__main__": p = Process(target=func,) p.start() print("*" * 10) print("主進程>>>",os.getpid()) print("父進程>>>",os.getppid()) 給要執行的函數傳參數 import time from multiprocessing import Process def func(x,y): print(x) time.sleep(1) print(y) if __name__ == "__main__": p = Process(target=func,args=("姑娘","來玩啊"))#這是func須要接受的參數的傳輸方式 p.start() print("父進程執行結束")
二.join方法安全
import time from multiprocessing import Process 驗證join方法 global_num = 100 def func1(): time.sleep(2) global global_num global_num = 0 print("子進程全局變量>>>",global_num) if __name__ == "__main__": p1 = Process(target=func1,) p1.start() print("子進程執行") time.sleep(3) p1.join()#阻塞住,等待你的p1子進程執行sing結束,主進程的程序才能從這裏繼續往下執行 print("主進程的全局變量>>>",global_num) 驗證了一下併發的執行時間 import time from multiprocessing import Process def func1(n): time.sleep(n) print("func1",n) def func2(n): time.sleep(n) print("func2",n) def func3(n): time.sleep(n) print("func3",n) if __name__ == "__main__": p1 = Process(target=func1,args=(1,)) p2 = Process(target=func2,args=(2,)) p3 = Process(target=func3,args=(3,)) p1.start() p2.start() p3.start() for循環在建立進程中的應用 import time from multiprocessing import Process def func1(n): time.sleep(1) print(n) if __name__ == "__main__": pro_list = [] for i in range(10): p1 = Process(target=func1,args=(i,)) p1.start() pro_list.append(p1) # p1.join() # for p in pro_list: # # p.join() p1.join() print("主進程結束")
殭屍進程和孤兒進程服務器
import time import os from multiprocessing import Process def func1(): time.sleep(30) print(os.getpid()) print('子進程') if __name__ == '__main__': p1 = Process(target=func1,) p1.start() # p1.join() # time.sleep(2) # print(p1.pid) print('主進程的ID',os.getpid()) print('主進程結束')
三.建立進程的兩種方式網絡
import time from multiprocessing import Process import os # import test01 # def func1(n): # # time.sleep(1) # print(n) # # def func2(n): # # time.sleep(1) # print(n) # # def func3(n): # # time.sleep(1) # print(n) # # def func4(n): # # time.sleep(1) # print(n) # # if __name__ == '__main__': # p1 = Process(target=func1,args=(1,)) # p2 = Process(target=func2,args=(2,)) # p3= Process(target=func3,args=(3,)) # p4 = Process(target=func4,args=(4,)) # p1.start() # run() # p2.start() # p3.start() # p4.start() # # time.sleep(0.5) # print('主進程結束') # 以前同步執行的 # func1(1) # func2(2) # func3(3) # func4(4) 建立進程的第一種方式: # p1 = Process(target=func1, args=(1,)) # p1.start() 建立進行的第二種方式: #本身定義一個類,繼承Process類,必須寫一個run方法,想傳參數,自行寫init方法,而後執行super父類的init方法 # class MyProcess(Process): # def __init__(self,n,name): # super().__init__() # self.n = n # self.name = name # # def run(self): # # print(1+1) # # print(123) # print('子進程的進程ID',os.getpid()) # print('你看看n>>',self.n) # # if __name__ == '__main__': # p1 = MyProcess(100,name='子進程1') # p1.start() #給操做系統發送建立進程的指令,子進程建立好以後,要被執行,執行的時候就會執行run方法 # print('p1.name',p1.name) # print('p1.pid',p1.pid) # print('主進程結束')
四.進程的其餘方法terminate is_alive.py併發
import time from multiprocessing import Process def func1(): time.sleep(2) print() print("子進程") if __name__ == "__main__": p1 = Process(target=func1,) p1.start() p1.terminate() #給操做系統發了一個關閉p1子進程的信號,關閉進程 time.sleep(1) print("進程是否還活着:",p1.is_alive())#是返回True,否返回False print(p1.pid) print("主進程結束")
五.守護進程app
#守護的子進程跟着主進程走 import time import os from multiprocessing import Process def func(): time.sleep(5) print('子進程', os.getpid()) if __name__ == '__main__': p1 = Process(target=func) p1.daemon = True # 設置守護進程, 當主進程結束時所有子進程當即結束 p1 .start() # time.sleep(5.5) print('主進程結束')
六.驗證進程之間是空間隔離的dom
import time from multiprocessing import Process #進程之間是空間隔離的,不共享資源 global_num = 100 def func1(): global global_num global_num = 0 print("子進程全局變量>>>",global_num) if __name__ == "__main__": p1 = Process(target=func1,) p1.start() time.sleep(1) print("主進程的全局變量>>>",global_num)
七.子進程中不能使用input異步
from multiprocessing import Process def func1(): s = input('>>>') if __name__ == '__main__': p1 = Process(target=func1,) p1.start() # a = input('>>>:') print('主進程結束') ##報錯
八.進程鎖socket
ticket_lock = Lock()#建立鎖 .acquire()#加鎖, .release()#解鎖
同步鎖的做用:#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 # 雖然能夠用文件共享數據實現進程間通訊,但問題是:
# 1.效率低(共享數據基於文件,而文件是硬盤上的數據) # 2.須要本身加鎖處理
import json import time import random from multiprocessing import Process,Lock def get_ticket(i,ticket_lock): print("咱們都到齊了,你們預備!!123") time.sleep(1) #全部代碼 異步執行,到這裏等待,同時再去搶下面的代碼執行 ticket_lock.acquire() #這裏有個門,只有一我的可以搶到這個鑰匙,加鎖 with open("ticket","r") as f: last_ticket_info = json.load(f) #將文件數據load爲字典類型的數據 last_ticket = last_ticket_info["count"] print(last_ticket) #查看一下餘票的信息 if last_ticket > 0: #若是看到餘票大於零,說明你能夠搶到票 time.sleep(random.random()) #模擬網絡延遲時間 last_ticket = last_ticket - 1 last_ticket_info["count"] = last_ticket with open("ticket","w") as f: #將修改後的參數寫回文件 json.dump(last_ticket_info,f) print("%s號搶到了,丫nb!" % i) else: print("%s號傻逼,沒票了,明年再來" % i) ticket_lock.release() if __name__ == "__main__": ticket_lock = Lock() #建立一個進程鎖 for i in range(10): p = Process(target=get_ticket, args=(i, ticket_lock)) p.start()
九.信號量
Semaphore()
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
import time import random from multiprocessing import Process,Semaphore def dbj(i,s): s.acquire() print('%s號男主人公來洗腳'%i) print('-------------') time.sleep(random.randrange(3,6)) # print(time.time()) s.release() if __name__ == '__main__': s = Semaphore(4) #建立一個計數器,每次acquire就減1,直到減到0,那麼上面的任務只有4個在同時異步的執行,後面的進程須要等待. for i in range(10): p1 = Process(target=dbj,args=(i,s,)) p1.start()
十.事件
e = Event()# e.set()#將e改成True e.clear() # 將e改成False
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
from multiprocessing import Process, Event e = Event() #False True print(e.is_set()) e.set() #將e事件的狀態改成True print("在這裏等待") e.clar() #將e事件的狀態改成False print("111") e.wait() print("是真的嗎")
import time from multiprocessing import Process,Event #模擬紅綠燈執行狀態的函數 def traffic_lights(e): while 1: print("紅燈啦") time.sleep(5) e.set() #將e改成True print("綠燈了") time.sleep(3) e.clear() #將e改成False def car(i,e): if not e.is_set(): #新來的車看到的是紅燈 print("咱們在等待....") e.wait() print("走你") else: print("能夠走了!!!") if __name__ == "__main__": e = Event() hld = Process(target=traffic_lights, args=(e,)) hld.start() while 1: time.sleep(0.5) #建立10個車 for i in range(3): p1 = Process(target=car,args=(i,e,)) p1.start()
十一.隊列
# 遵循先進先出的原則 q = Queue(3) 建立3個隊列 q.put()發送數據 q.get()接受數據
q = Queue([maxsize]) #建立共享的進程隊列 q.get( [ block [ ,timeout ] ] ) #返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) #和q.get(False)方法,同樣 q.put(item [, block [,timeout ] ] ) #將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() #返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() #若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 .full() #若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 q.close() #關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() #不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() #鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
from multiprocessing import Process,Queue #先進先出 q = Queue(3) q.put(1) q.put(2) # print(q.full()) #q.full()隊列滿了返回True,不滿返回False q.put(3) # print('>>>>',q.full()) q.get_nowait()= () #不會阻塞住,至關於空隊列 # try: # q.get(False) # queue.Empty # q.get_nowait() #queue.Empty # except: # print('隊列目前是空的') # while 1: # try: # q.get(False) #queue.Empty # except: # print('隊列目前是空的')
隊列實現進程的通訊
import time from multiprocessing import Process,Queue def girl(q): print('來自boy的信息',q.get()) print('來自校領導的凝視',q.get()) def boy(q): q.put('約嗎') if __name__ == '__main__': q = Queue(5) boy_p = Process(target=boy,args=(q,)) girl_p = Process(target=girl,args=(q,)) boy_p.start() girl_p.start() time.sleep(1) q.put('好好工做,別亂搞')
十二.生產者消費者模式
#生產者消費者模型總結 #程序中有兩類角色 一類負責生產數據(生產者) 一類負責處理數據(消費者) #引入生產者消費者模型爲了解決的問題是: 平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度 #如何實現: 生產者<-->隊列<——>消費者 #生產者消費者模型實現類程序的解耦和
import time from multiprocessing import Process,Queue def producer(q): for i in range(1,11): time.sleep(1) print('生產了包子%s號' % i) q.put(i) q.put(None) #針對第三個版本的消費者,往隊列裏面加了一個結束信號 #版本1 # def consumer(q): # while 1: # time.sleep(2) # s = q.get() # print('消費者吃了%s包子' % s) #版本2 # def consumer(q): # while 1: # time.sleep(0.5) # try: # s = q.get(False) # print('消費者吃了%s包子' % s) # except: # break def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: break else: print('消費者吃了%s包子' % s)
生產者消費者模型 import time from multiprocessing import Process,Queue def producer(q): for i in range(1,11): time.sleep(1) print('生產了包子%s號' % i) q.put(i) def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: break else: print('消費者吃了%s包子' % s) if __name__ == '__main__': #經過隊列來模擬緩衝區,大小設置爲20 q = Queue(20) #生產者進程 pro_p = Process(target=producer,args=(q,)) pro_p.start() #消費者進程 con_p = Process(target=consumer,args=(q,)) con_p.start() pro_p.join() q.put(None)
1 #生產者消費者模型 2 import time 3 from multiprocessing import Process,Queue,JoinableQueue 4 5 def producer(q): 6 for i in range(1,11): 7 time.sleep(0.5) 8 print('生產了包子%s號' % i) 9 q.put(i) 10 q.join() 11 print('在這裏等你') 12 def consumer(q): 13 while 1: 14 time.sleep(1) 15 s = q.get() 16 print('消費者吃了%s包子' % s) 17 q.task_done() #給q對象發送一個任務結束的信號 18 19 if __name__ == '__main__': 20 #經過隊列來模擬緩衝區,大小設置爲20 21 q = JoinableQueue(20) 22 #生產者進程 23 pro_p = Process(target=producer,args=(q,)) 24 pro_p.start() 25 #消費者進程 26 con_p = Process(target=consumer,args=(q,)) 27 con_p.daemon = True # 28 con_p.start() 29 pro_p.join() 30 print('主進程結束')
十三.管道
from multiprocessing import Process,Pipe
conn1,conn2 = Pipe()
進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現
# 管道 from multiprocessing import Process,Pipe import time # conn1,conn2 = Pipe() # conn1.send("你好") # print(">>>>>") # msg = conn2.recv() # print(msg) # def func1(conn2): # try: # msg = conn2.recv() # print(">>>",msg) # #若是管道一端關閉了,那麼另一端在接收消息的時候回報錯 # except EOFError: # print("對方管道一端已經關閉") # conn2.close() # if __name__ == '__main__': # conn1,conn2 = Pipe() # p = Process(target=func1,args=(conn2,)) # p.start() # conn1.send("收到了嗎") def func1(conn1,conn2): msg = conn2.recv() #阻塞 print(">>>>",msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func1, args=(conn1, conn2,)) p.start() conn1.send("收到了嗎") conn1.close() #conn1.recv() #OSError: handle is closed
十四.數據共享(不安全)
# 數據共享 # from multiprocessing import Process,Manager # # def func(m_dic): # m_dic["輝哥"] = "輝哥大帥比" # if __name__ == '__main__': # m = Manager() # m_dic = m.dict({"輝哥":"輝哥帥不帥"}) # print("主進程",m_dic) # p = Process(target=func, args=(m_dic,)) # p.start() # p.join() # print("主進程2",m_dic) # 數據共享manager不安全 # from multiprocessing import Process,Manager,Lock # def func(m_dic, ml): # """不加鎖的狀況會出現數據錯亂 # m_dic["count"] -= 1 # 下面是加鎖的另外一種形式 # 等同 : ml.acquire() # m_dic["count"] -= 1 # ml.release()""" # with ml: # m_dic["count"] -= 1 # if __name__ == '__main__': # m = Manager() # ml = Lock() # m_dic = m.dict({"count":100}) # p_list = [] # for i in range(20): # p1 = Process(target=func,args=(m_dic, ml,)) # p1.start() # p_list.append(p1) # [pp.join() for pp in p_list] # print("主進程",m_dic)
十五.進程池
multiprocess.Poll模塊
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。
進程池相關方法:
p.apply(func [, args [, kwargs]]):
在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):
在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():
關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():
等待全部工做進程退出。此方法只能在close()或teminate()以後調用 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 obj.ready():若是調用完成,返回True obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 obj.wait([timeout]):等待結果變爲可用。 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
import time from multiprocessing import Process,Pool def func(n): print(n) if __name__ == '__main__': pool = Pool(4) # pool.map(func,range(100)) #參數是可迭代的 pool.map(func,['sb',(1,2)]) #參數是可迭代的
# 進程池 import time from multiprocessing import Process,Pool def func(n): for i in range(5): time.sleep(1) n= n + i print(n) if __name__ == '__main__': #用時間驗證一下傳參 pool_start_time = time.time() pool = Pool(4) #4個進程 pool.map(func,range(100)) #map(方法,可迭代對象) 映射 自帶join功能,異步執行任務 pool_end_time = time.time() pool_dif_time = pool_end_time - pool_start_time #多進程的執行時間 # p_s_time = time.time() # p_list = [] # for i in range(200): # p1 = Process(target=func, args=(i,)) # p1.start() # p_list.append(p1) # [p.join() for p in p_list] # p_e_time = time.time() # p_dif_time = p_e_time - p_s_time print('進程池的執行時間', pool_dif_time) print('多進程的執行時間', p_dif_time)
import time from multiprocessing import Process,Pool def fun(i): time.sleep(0.5) return i**2 if __name__ == '__main__': p = Pool(4) for i in range(10): res = p.apply(fun,args=(i,)) # apply 同步執行的進程方法,他會等待你的任務的返回結果 print(res)
import time from multiprocessing import Process,Pool def fun(i): time.sleep(1) print(i) return i**2 if __name__ == '__main__': p = Pool(4) res_list = [] for i in range(10): res = p.apply_async(fun,args=(i,)) # #同步執行的方法,他會等待你的任務的返回結果, res_list.append(res) p.close() # 不是關閉進程池,而是不容許再有其餘任務來使用進程池 p.join() # 這是感知進程池中任務的方法,進程池中全部的進程隨着主進程的結束而結束了,等待進程池的任務所有執行完 for e_res in res_list: print("結果", e_res.get())
回調函數:
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) # print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) # print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主進程:',os.getpid()) p = Pool(4) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join()
進程池版的socket併發聊天代碼示例:
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處於等待狀態 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問 複製代碼
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
進程池中爬蟲示例:
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text))