什麼是池?程序員
在保證計算機硬件安全的狀況下最大限度的利用計算機,池實際上是下降了程序的運行效率,可是保證了計算機硬件的安全(硬件的發展跟不上軟件的速度)安全
進程池和線程池就是咱們能夠提早在池子裏放上固定數量的進程或者線程,等到任務來了,就直接從池子裏拿一個進程或線程來處理任務,等任務處理完畢以後,進程或線程也不關閉,而是繼續放回池子中等待任務,若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。多線程
#進程池
from concurrent.futures import ProcessPoolExecutor import time import os pool = ProcessPoolExecutor() #能夠傳參數,若是不傳默認是計算機CPU的個數
def task(n): print(n) time.sleep(2) if __name__ == '__main__': for i in range(20): pool.submit(task,i) # 朝進程池提交任務
#線程池
rom concurrent.futures import ThreadPoolExecutor import time import os pool = ThreadPoolExecutor() #能夠傳參數指定線程池內的線程個數,若是不傳默認是計算機CPU的個數乘5
def task(n): print(n) time.sleep(2) if __name__ == '__main__': for i in range(20): pool.submit(task,i) # 朝進程池提交任務
同步:提交任務以後 原地等待任務的返回結果 期間不作任何事
異步:提交任務以後 不等待任務的返回結果(異步的結果怎麼拿???) 直接執行下一行代碼併發
from concurrent.futures import ProcessPoolExecutor import time import os pool = ProcessPoolExecutor(5) def task(n): print(n) time.sleep(2) return n**2 t_list = [] if __name__ == '__main__': for i in range(20): res = pool.submit(task,i) #print(res.result()) # 原地等待任務的結果(將併發變成了串行)
t_list.append(res) pool.shutdown() # 關閉池子,等待池子中全部的任務執行完畢才能往下執行代碼
for p in t_list: print('>>>:',p.result())
異步回調機制:當異步提交的任務有返回結果以後,會自動觸發回調函數的執行app
from concurrent.futures import ProcessPoolExecutor import time import os pool = ProcessPoolExecutor(5) def task(n): print(n) time.sleep(2) return n**2
def call_back(n): print('拿到了異步提交的結果:',n.result()) t_list = [] if __name__ == '__main__': for i in range(20): res = pool.submit(task,i).add_done_callback(call_back) # # 提交任務的時候 綁定一個回調函數 一旦該任務有結果 馬上執行對於的回調函數
t_list.append(res)
進程:資源單位異步
線程:執行單位socket
協程:單線程下實現併發函數
併發就是切換+保留狀態(看起來像同時執行的,就能夠稱之爲併發)工具
協程:徹底 是程序員想象出來的名詞,單線程下實現併發spa
程序員本身經過代碼本身檢測程序中的IO,一旦遇到IO本身經過代碼切換,給操做系統的感受就是你這個線程沒有任何IO狀態。(就是欺騙操做系統,讓它覺得你這個程序一直沒有IO,從而保證程序在運行態和就緒態來回切換,能夠提高代碼的運行效率)
切換+保存狀態就必定可以提高效率嗎???
當你的任務是iO密集型的狀況下 提高效率
若是你的任務是計算密集型的 下降效率
#經過代碼進行驗證 #串行併發 0.12008428573608398
import time def func1(): for i in range(10000): i+1
def func2(): for i in range(10000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start) #基於yield保存狀態的特色來併發 0.2261803150177002
import time def func1(): while True: 10000+1
yield
def func2(): g=func1() for i in range(1000000): i+1 next(g) start = time.time() func2() stop=time.time() print(stop-start)
咱們也能夠經過time.sleep來模擬IO,然而yield並不會捕捉到並自動切換,因此咱們找一個可以識別IO操做的工具,因而咱們須要瞭解一個新的模塊,gevent模塊。
可是gevent模塊並不能自動識別time.sleep等IO狀況,因此要手動再配置一個參數
from gevent import monkey monkey.patch_all() # 因爲該模塊常常被使用 因此建議寫成一行,以下
from gevent import monkey;monkey.patch_all()
#驗證
from gevent import monkey;monkey.patch_all() from gevent import spawn import time def heng(): print('哼') time.sleep(1) print('哼') def ha(): print('哈') time.sleep(2) print('哈') def hei(): print('嘿') time.sleep(3) print('嘿') start = time.time() g1 = spawn(heng) # spawn會檢測全部任務
g2 = spawn(ha) g3 = spawn(hei) g1.join() g2.join() g3.join() print(time.time()-start)
spawn就像一個列表,第一個任務來了就把它放進去,第二個任務來了也把它放進去,當第一個任務遇到IO操做時,立刻去執行第二個任務,若是第二個任務遇到IO的時候又切換回第一個任務。
#客戶端
import socket from threading import Thread,current_thread def client(): client = socket.socket() client.connect(('127.0.0.1',8080)) n = 0 while True: data = '%s %s'%(current_thread().name,n) client.send(data.encode('utf-8')) res = client.recv(1024) print(res.decode('utf-8')) n += 1
#經過開啓多線程來完成多個客戶端的併發
for i in range(400): t = Thread(target=client) t.start() #服務端
from gevent import monkey;monkey.patch_all() import socket from gevent import spawn server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): # 把通訊循環拿出來作成一個函數
while True: try: data = conn.recv(1024) if len(data) == 0:break
print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() def ser(): # 把鏈接循環拿出來作一個函數
while True: conn,addr = server.accept() spawn(talk,conn) # 檢測talk
if __name__ == '__main__': g1 = spawn(ser) # 檢測 ser
g1.join() # 讓主線程不會直接結束