多道 、分時、實時html
同步異步python
阻塞和非阻塞編程
進程三狀態:就緒 運行 阻塞json
併發並行安全
子進程和主進程併發
多併發的tcp服務端app
import socket from multiprocessing import Process def communicate(conn): while True: conn.send("hello".encode("utf-8")) print(conn.recv(1024)) if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn,addr = sk.accept() Process(target=communicate,args=(conn,)).start()
import socket sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: print(sk.recv(1024)) mv = input(">>>>>>>>>>:").strip() sk.send(mv.encode("utf-8"))
進程是操做系統中最小的資源分配單位dom
進程異步
第二種開啓子進程的方式socket
def func(index): time.sleep(random.random()) print('第%s個郵件已經發送完畢'%index) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() p_lst.append(p) for p in p_lst: p.join() print('所有發送完畢')
join控制子進程
#子進程同步,執行完畢後才執行主程序後面的程序 # import time # from multiprocessing import Process # def f(name): # print("hello",name) # time.sleep(1) # if __name__ == '__main__': # for i in range(5): # p = Process(target=f,args=(i,)) # p.start() # p.join() #阻塞, # print("主進程執行") #子程序異步執行,執行完了阻塞結束 import time from multiprocessing import Process def f(name): print("hello",name) time.sleep(1) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=f,args=(i,)) p.start() p_list.append(p) for i in p_list: i.join() print("主進程執行完畢")
守護進程 daemon
守護進程會隨着主進程代碼執行完畢而結束
守護進程內沒法再開啓子進程,不然會拋出異常
注意:進程之間是相互獨立的,主進程代碼運行結束,守護進程也會隨即終止
import time from multiprocessing import Process def func1(): count = 1 while True: time.sleep(0.5) print(count*"*") count += 1 def func2(): print("func strat") time.sleep(5) print("func2 end") if __name__ == '__main__': p1 = Process(target=func1) p1.daemon = True #定義爲守護進程 p1.start() #執行 Process(target=func2).start() time.sleep(3) print("主進程") #輸出 # func strat # * # ** # *** # **** # ***** # 主進程 # func2 end
若是主進程執行完畢那麼守護進程也會結束,可是其餘子進程若是沒執行完還會繼續執行
鎖
做業:在進程之間保證數據安全性
from multiprocessing import Process,Lock
lock= Lock()實例對象
lock.acquire() 取鑰匙開門
lock.release() 關門放鑰匙
例題 模擬搶票
import time import json from multiprocessing import Process,Lock def search(person): #查票 with open("ticket") as f: #文件中保存着一個字典{"count":4} dic = json.load(f) #讀出文件中的字典 time.sleep(0.2) print("%s查詢餘票"%person,dic["count"]) def get_ticket(person): #搶票 with open("ticket") as f: dic = json.load(f) time.sleep(0.2) #模擬延遲 if dic["count"] >0: print("%s買到票了"%person) dic["count"] -= 1 time.sleep(0.2) with open("ticket","w") as f: json.dump(dic,f) #寫回文件 else: print("%s沒買到票"%person) def ticket(person,lock): search(person) lock.acquire() #開門,一次只能進一個 get_ticket(person) lock.release() #關門 if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=ticket,args=("person%s"%i,lock)) p.start()
爲了保證數據的安全,在異步的狀況下,多個進程又可能同時修改同一份數據的時候,須要給這個數據上鎖
加鎖的做用
同步控制
import time from multiprocessing import Process,Lock def func(num,lock): time.sleep(1) print("異步執行",num) lock.acquire() time.sleep(0.5) print("同步執行",num) lock.release() #同步執行是依次執行,間隔0.5秒 if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=func,args=(i,lock)) p.start()
信號量 機制:計數器+鎖實現的 Semaphore
主程序控制必定數量的子程序同時執行,這些數量的子程序執行完一個就會有下一個子程序補充進來
import time import random from multiprocessing import Process,Semaphore def ktv(person,sem): sem.acquire() #進 print("%s走進KTV"%person) time.sleep(random.randint(1,3)) #隨機延遲一到三秒 print("%s走出ktv"%person) sem.release() #出 if __name__ == '__main__': sem = Semaphore(4) #信號量爲4,默認爲1 for i in range(10): Process(target=ktv,args=(i,sem)).start()
事件 Event
阻塞事件 wait() 方法
控制這個屬性的值
set()將這個屬性的值改爲True
clear() 將這個屬性的值改爲False
is_set() 判斷當前屬性是否爲True
#模擬紅綠燈,只有所有車經過後才中止 import time import random from multiprocessing import Process,Event def traffic_light(e): print("紅燈亮") while True: if e.is_set(): time.sleep(2) print("紅燈亮") e.clear() else: time.sleep(2) print("綠燈亮") e.set() def car(e,i): if not e.is_set(): print("car%s在等待"%i) e.wait() print("car%s經過了"%i) if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args=(e,)) p.daemon =True #變成守護進程 p.start() p_list = [] for i in range(10): time.sleep(random.randrange(0,3,2)) p = Process(target=car,args=(e,i)) p.start() p_list.append(p) for p in p_list:p.join()
多個進程之間有一些固定的通訊內容
socket給予文件家族通訊
進程之間雖然內存不共享,可是能夠通訊,
隊列是基於管道 + 鎖 實現的
管道(Pipe)是基於socket,pickle實現的
def consume(q): print('son-->',q.get()) q.put('abc') if __name__ == '__main__': q = Queue() p = Process(target=consume,args=(q,)) p.start() q.put({'123':123}) p.join() print('Foo-->',q.get())
簡單的生產消費模型
def consume(q): print('son-->',q.get()) q.put('abc') if __name__ == '__main__': q = Queue() p = Process(target=consume,args=(q,)) p.start() q.put({'123':123}) p.join() print('Foo-->',q.get())
相同的原理 JoinableQueue
task_done 通知隊列已經有一個數據被處理了
q.join() 阻塞直到放入隊列中全部的數據都被處理掉(有多少個數據就接受到多少taskdone)
import time import random from multiprocessing import Process,JoinableQueue def consumer(q,name): while True: food = q.get() time.sleep(random.uniform(0.3,0.8)) print("%s吃了一個%s"%(name,food)) q.task_done() def producer(q,name,food): for i in range(10): time.sleep(random.uniform(0.3,0.8)) print("%s生產了%s%s"%(name,food,i)) q.put(food+str(i)) if __name__ == '__main__': jq = JoinableQueue() c1 = Process(target=consumer,args=(jq,"alex")) c1.daemon = True p1 = Process(target=producer,args=(jq,"libai","包子")) c1.start() p1.start() p1.join() jq.join()
管道 進程之間數據不安全 且存取數據複雜
進程池開啓的個數:默認是CPU的個數
開啓過多的進程並不能提升你的效率,反而會下降效率
計算密集型 充分佔用CPU 多進程能夠充分利用多核 適合開啓多進程,可是不適合開啓不少多進程
IO密集型 大部分時間都在阻塞隊列,而不是在運行狀態 根本不太適合開啓多進程
提交任務:
1.同步提交 apply
返回值:子進程對應函數的返回值
一個一個順序執行的,並無任何的併發效果
# import os # import time # from multiprocessing import Process,Pool # def task(num): # time.sleep(0.5) # print("%s: %s"%(num,os.getpid())) # return num ** 2 # if __name__ == '__main__': # p = Pool(4) # for i in range(20): # res = p.apply(task,args=(i,)) #apply 提交任務方法,同步提交 # print("--->",res) #四個任務依次執行,輪換
2.異步提交 apply_async
- ```Python import os import time from multiprocessing import Pool def task(num): time.sleep(1) print("%s: %s"%(num,os.getpid())) return num **2 if __name__ == '__main__': p = Pool(4) for i in range(20): res = p.apply_async(task,args=(i,)) #apply_async 異步提交 p.close() p.join() #輸出結果同時四個認識執行 ``` - 3.map()方法 - 異步提交的簡化版本 - 自帶close和join方法 - 直接拿到返回值的可迭代對象 - 循環能夠拿到返回值
數據共享 Manager
把全部實現了數據共享的比較便捷的類都從新又封裝了一遍,而且在原有的multiprocessing基礎上
支持的數據類型有限
list dict都不是安全的數據,你須要本身加鎖來保證數據的安全
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: d["count"] -= 1 if __name__ == '__main__': lock = Lock() with Manager() as m: #使用以後數據就會變成共享 dic = m.dict({"count":100}) p_l = [] for i in range(100): p = Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
進程池-----回調函數
當func執行完畢後執行callback函數
func的返回值做爲callback的參數
回調函數是在主進程實現的
子進程有大量的計算要去作,回調函數等待結果作簡單處理
import os from multiprocessing import Pool def func(i): print("第一個任務",os.getpid()) return "*"*i def call_back(res): print("回調函數",os.getpid()) ##pid號爲11420 與主進程pid號相同 print("res---->",res) if __name__ == '__main__': p = Pool() print("主進程",os.getpid()) #pid爲11420 說明回調函數是主進程實現的 p.apply_async(func,args=(1,),callback=call_back) p.close() p.join()
#基於多進程的共享數據的小爬蟲 import re from urllib.request import urlopen url_lst = [ 'http://www.baidu.com', 'http://www.sohu.com', 'http://www.sogou.com', 'http://www.4399.com', 'http://www.cnblogs.com', ] from multiprocessing import Pool def get_url(url): response = urlopen(url) ret = re.search("www\.(.*?)\.com",url) print("%s finished"%ret.group(1),ret.group()) return ret.group(1),response.read() def call(content): url,con = content with open(url+".html","wb") as f: f.write(con) if __name__ == '__main__': p = Pool() for url in url_lst: p.apply_async(get_url,args=(url,),callback=call) p.close() p.join()