一。線程池併發
線程池是一個處理線程任務的集合,他是能夠接受必定量的線程任務,並建立線程,處理該任務,處理結束後不會馬上關閉池子,會繼續等待提交的任務,也就是他們的進程/線程號不會改變。app
當線程池中的任務沒有結束時是不會接受下一個任務的。異步
它的操做有:socket
pool = ThreadPoolExecutor()tcp
建立一個線程池,其中括號中表明的是一次能夠接納的線程任務,能夠不加參數,不加參數其數量就是當前cpu的個數*5。函數
res = pool.submit(func,args)spa
提交一個任務,args表明的是函數的參數。res接受的是該submit的返回值,相似於以下的類:線程
<Future at 0x2057e656940 state=running>
state表明的是當前該線程的狀態。3d
res.result()code
而使用result能夠將提交的任務函數的返回值獲取。
這裏的result還有等待任務的返回值的做用。若是任務沒結束,就會一直等待,能夠將並行操做改爲串行操做。
pool.shutdown()
能夠將池子關閉,並等待池子終端 任務所有結束再執行下面代碼。
例子:
import time from concurrent.futures import ThreadPoolExecutor import os from gevent import os pool = ThreadPoolExecutor(5) def task(n): print(n,os.getpid()) time.sleep(2) return n**2 list_1 = [] for i in range(20): res = pool.submit(task,i) #提交任務 print(res.result())#等待任務的返回值 list_1.append(res) pool.shutdown() #關閉池子,等待池子中的任務運行完畢 for j in list_1: print('>>>',j.result()) print('主')
進程池:
進程池的使用和線程池差很少,區別僅只有包名不一樣,在進程池中咱們能夠驗證如下池中的進程/線程是不是用的一樣的進程/線程,使用os。getpid()方法便可。
進程值不傳值,裏面的數值默認時cpu的個數。
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os from gevent import os # pool = ThreadPoolExecutor(5) pool = ProcessPoolExecutor(5) def task(n): print(n,os.getpid()) time.sleep(2) return n**2 def callback(n): print(n.result()) if __name__ == '__main__': list_1 = [] for i in range(20): res = pool.submit(task,i).add_done_callback(callback) #提交任務 # print(res.result())#等待任務的返回值 list_1.append(res)
異步回調:
除了上面使用的將返回的future對象添加到列表,再調用result()方法返回其返回值之外,還能夠對指派任務的返回值用
add_done_callback(callback)
方法,將該對象回調到callback(能夠自定義)函數,由函數接納處理該值,函數的參數就是任務的返回值,多個返回值要設置多個參數。
回調是在生產返回值時就運行的。
二。協程
協程就是在單線程的狀況下實現併發。
通常程序的多道技術都是用 切換+保存狀態 實現
在通常的cpu運算時,都是在五種狀態中來回切換的,程序運行的5種狀態:
1.新建。2.就緒。3.運行。4.阻塞。5.結束。
通常的,程序都是在2,3,4的狀態來回切換,有2種狀況。
1,程序遇到了io操做,由運行態進入到了阻塞態,直到io操做結束後再到阻塞態等待時間片。
2.程序的時間片用完,由運行態到就緒態。
協程的做用就是使得線程遇到io操做本身切換,運行的方式從1.變成2.線程持續不斷的就緒,能夠得到大量的cpu運算時間。
要實現這個功能須要考慮線程的保存狀態問題。
這裏就要用到迭代器的知識,yield,
yield能夠保存上一次操做的狀態,因此使用yield能夠驗證協程對計算密集型的線程操做後是否能加快效率。
#串行執行 0.8540799617767334 # import time # # def func1(): # for i in range(10000000): # i+1 # # def func2(): # for i in range(10000000): # i+1 # # start = time.time() # func1() # func2() # stop = time.time() # print(stop - start)
#基於yield併發執行 1.3952205181121826 # import time # def func1(): # while True: # 10000000+1 # yield # # def func2(): # g=func1() # for i in range(10000000): # time.sleep(100) # 模擬IO,yield並不會捕捉到並自動切換 # i+1 # next(g) # # start=time.time() # func2() # stop=time.time() # print(stop-start)
能夠看到,在計算密集的線程中,不斷切換線程是不利於程序的運行的。
而yield不能識別io操做,而進行線程之間的切換的,因此須要引入一個模塊gevent。
gevent是一個能夠識別io的魔塊,但不能識別time.sleep,因此還要調用另一個模塊識別time.sleep。
from gevent import monkey;monkey.patch_all()
# 因爲該模塊常常被使用 因此建議寫成一行 from gevent import spawn import time
spawn()能夠檢測()中的全部任務
def heng(): print("哼") time.sleep(2) print('哼') def ha(): print('哈') time.sleep(3) print('哈') def heiheihei(): print('嘿嘿嘿') time.sleep(5) print('嘿嘿嘿') start = time.time() g1 = spawn(heng) g2 = spawn(ha) # spawn會檢測全部的任務 g3 = spawn(heiheihei) g1.join() g2.join() g3.join() # heng() # ha() print(time.time() - start) 哼 哈 嘿嘿嘿 哼 哈 嘿嘿嘿 5.033252716064453
本來10秒鐘的程序,如今須要5秒鐘就能夠運行結束了。
spawn能夠將全部線程添加至一個列表,輪流運行其沒有io操做的部分。
spawn有一個返回值g
注意,須要在程序最後等待全部程序都運行結束才結束程序,使用g.join方法。
三。使用gevent實現tcp的併發
from gevent import monkey;monkey.patch_all() import socket from gevent import spawn server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): while True: try: data = conn.recv(1024) if len(data) == 0:break print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() def server1(): while True: conn, addr = server.accept() spawn(talk,conn) if __name__ == '__main__': g1 = spawn(server1) g1.join()
四。IO模型。
1.阻塞型IO
阻塞型io是在進行io操做時,先跳入阻塞態,而後等待數據。
數據得到後拷貝數據,
最後再進入就緒態,
其中等待數據和拷貝數據都是再阻塞狀態:
2.非阻塞io
非阻塞io是在遇到io操做時,先發送接受數據請求,若是沒有數據就返回一個沒有的信號,以後會反覆發送數據請求,直到有數據爲止,這種模型很佔cpu操做。
3.IO多路複用
這個模型中有一個select,是一個監測機制,相似於列表,管理io操做。
當須要進行io操做時,調用select尋找數據,若是找到數據就返回數據,
等待的操做所有交給select。
4.異步IO(asyn。。。。)
在遇到io操做時,有一個回調機制,當須要io操做時,回調機制(內存中)會去尋找數據,當尋找到數據後會返回數據