上一節中咱們知道GIL鎖將致使CPython中多線程沒法並行執行,只能併發的執行。git
而併發實現的原理是切換+保存,那就意味着使用多線程實現併發,就須要爲每個任務建立一個線程,必然增長了線程建立銷燬與切換的帶來的開銷github
明顯的問題就是,高併發狀況下,因爲任務數量太多致使沒法開啓新的線程,使得即沒有實際任務要執行,也沒法建立新線程來處理新任務的狀況編程
如何解決上述問題呢,首先要保證併發效果,而後來想辦法避免建立線程帶來的開銷問題;json
協程既是所以而出現的,其原理是使用單線程來實現多任務併發,那麼如何能實現單線程併發呢?服務器
單線程實現併發這句話乍一聽好像在瞎說多線程
首先須要明確併發的定義併發
併發:指的是多個任務同時發生,看起來好像是同時都在進行異步
並行:指的是多個任務真正的同時進行socket
早期的計算機只有一個CPU,既然CPU能夠切換線程來實現併發,那麼爲什麼不能在線程中切換任務來併發呢?
因此線程實現併發理論上是可行的
併發 = 切換任務+保存狀態,只要找到一種方案,可以在兩個任務之間切換執行而且保存狀態,那就能夠實現單線程併發
python中的生成器就具有這樣一個特色,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間能夠切換,而且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態!
因而乎咱們能夠利用生成器來實現併發執行:
def task1(): while True: yield print("task1 run") def task2(): g = task1() while True: next(g) print("task2 run") task2()
併發雖然實現了,可是這對效率的影響是好是壞呢?來測試一下
# 兩個計算任務一個採用生成器切換併發執行 一個直接串行調用 import time def task1(): a = 0 for i in range(10000000): a += i yield def task2(): g = task1() b = 0 for i in range(10000000): b += 1 next(g) s = time.time() task2() print("併發執行時間",time.time()-s) # 單線程下串行執行兩個計算任務 效率反而比並發高 由於併發須要切換和保存 def task1(): a = 0 for i in range(10000000): a += i def task2(): b = 0 for i in range(10000000): b += 1 s = time.time() task1() task2() print("串行執行時間",time.time()-s)
能夠看到對於純計算任務而言,單線程併發反而使執行效率降低了一半左右,因此這樣的方案對於純計算任務而言是沒有必要的
咱們暫且不考慮這樣的併發對程序的好處是什麼,在上述代碼中,使用yield來切換是的代碼結構很是混亂,若是十個任務須要切換呢,不敢想象!所以就有人專門對yield進行了封裝,這便有了greenlet模塊
from greenlet import greenlet def eat(name): print('%s eat 1' %name) g2.switch('jack') print('%s eat 2' %name) g2.switch() def play(name): print('%s play 1' %name) g1.switch() print('%s play 2' %name) g1=greenlet(eat) g2=greenlet(play) g1.switch('rose')#能夠在第一次switch時傳入參數,之後都不須要再次傳
該模塊簡化了yield複雜的代碼結構,實現了單線程下多任務併發,可是不管直接使用yield仍是greenlet都不能檢測IO操做,遇到IO時一樣進入阻塞狀態,一樣的對於純計算任務而言效率也是沒有任何提高的。
測試:
#切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題,
任務的代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提升效率,這就用到了Gevent模塊。
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
須要強調的是:
#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) #2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
對比操做系統控制線程的切換,用戶在單線程內控制協程的切換
優勢以下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 #2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
缺點以下:
#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程來儘量提升效率 #2. 協程本質是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
#用法 #建立一個協程對象g1, g1=gevent.spawn(func,1,,2,3,x=4,y=5) #spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
import gevent,sys from gevent import monkey # 導入monkey補丁 monkey.patch_all() # 打補丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) #gevent.joinall([g1,g2]) g1.join() g2.join() # 執行以上代碼會發現不會輸出任何消息 # 這是由於協程任務都是以異步方式提交,因此主線程會繼續往下執行,而一旦執行完最後一行主線程也就結束了, # 致使了協程任務沒有來的及執行,因此這時候必須join來讓主線程等待協程任務執行完畢 也就是讓主線程保持存活 # 後續在使用協程時也須要保證主線程一直存活,若是主線程不會結束也就意味着不須要調用join
須要注意:
1.若是主線程結束了 協程任務也會當即結束。
2.monkey補丁的原理是把原始的阻塞方法替換爲修改後的非阻塞方法,即偷樑換柱,來實現IO自動切換
必須在打補丁後再使用相應的功能,避免忘記,建議寫在最上方
咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程
#myjson.py def dump(): print("一個被替換的 dump函數") def load(): print("一個被替換的 load函數")
# test.py import myjson import json # 補丁函數 def monkey_pacth_json(): json.dump = myjson.dump json.load = myjson.load # 打補丁 monkey_pacth_json() # 測試 json.dump() json.load() # 輸出: # 一個被替換的 dump函數 # 一個被替換的 load函數
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time))
#=====================================服務端 from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)
#=====================================多線程模擬多個客戶端併發訪問 from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()