一 協程html
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。python
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:git
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。github
協程的好處:編程
缺點:數組
使用yield實現協程操做例子 併發
def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
咱們先給協程一個標準定義,即符合什麼條件就能稱之爲協程:異步
基於上面這4點定義,咱們剛纔用yield實現的程並不能算是合格的線程,由於它有一點功能沒實現,哪一點呢?是第4點。socket
二 Greenletasync
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator
安裝gevent模塊後,就可使用greenlet了。
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) #啓動一個協程 gr2 = greenlet(test2) gr1.switch() #手動切換協程,相似於yield的__next__()
感受確實用着比generator還簡單了呢,但好像尚未解決一個問題,就是遇到IO操做,自動切換,對不對?
三 Gevent
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
gevent能夠自動識別IO操做,以下程序中,fun1函數執行過程當中,進行了2秒的IO操做,fun2函數執行過程當中,進行了1秒的IO操做,使用gevent協程操做,遇到IO操做就自動切換,從而實現了併發,原本串行執行完這兩個函數須要3秒,如今只須要2秒。
import gevent def func1(): print('running in func1') gevent.sleep(2) #模擬IO操做 print('wait 1 s switch to func1') def func2(): print('switch to func2') gevent.sleep(1) print('wait 1 s switch to func2') gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2), ])
running in func1 switch to func2 wait 1 s switch to func2 wait 1 s switch to func1
同步與異步的性能區別
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
前面咱們知道了,gevent 遇到IO阻塞時會自動切換任務。接下來咱們再看一個例子
from urllib import request import gevent def f(url): print('GET:%s' %url) resp = request.urlopen(url) data = resp.read() # f = open('url.html','wb') # f.write(data) # f.close() print('%d bytes received from %s' %(len(data),url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), # gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/') ])
比較一下 使用協程爬多個網頁 和 使用同步串行爬多個網頁 所花費的時間:
from urllib import request import gevent import time def f(url): print('GET:%s' %url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s' %(len(data),url)) async_start_time = time.time() gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://github.com/') ]) print('並行cost:',time.time()-async_start_time)
from urllib import request import time def f(url): print('GET:%s' %url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s' %(len(data),url)) url = [ 'https://www.python.org/', 'https://github.com/', ] sync_start_time = time.time() for url in url: f(url) print('串行cost:',time.time()-sync_start_time)
咱們發現,兩者所用時間差很少,並行並無比串行快。這是由於,gevent默認檢測不到urllib(還有前面所學的socket)的IO操做,因此它遇到IO阻塞後沒有自動切換任務,也就是說gevent對urllib來講很差使。
那麼怎麼才能讓gevent知道urllib正在進行IO操做呢,給它打個補丁。monkey.patch_all()就是把當前程序的全部IO操做全都單獨打上標記,這樣,gevent遇到它本身不能識別的IO操做時,由於有了標記,gevent也可以自動切換任務。
經過gevent實現單線程下的urllib爬網頁併發
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #把當前程序的全部IO操做給我單獨打上標記 def f(url): print('GET:%s' %url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s' %(len(data),url)) async_start_time = time.time() gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://github.com/') ]) print('並行cost:',time.time()-async_start_time) #並行cost: 3.2701869010925293
經過gevent實現單線程下的多socket併發
server side
import gevent from gevent import socket, monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() #過來一個連接 gevent.spawn(handle_request, cli) #將連接交個gevent去起一個協程,把新生成的客戶端的鏈接實例交給handle_request方法 def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
client side
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', repr(data)) s.close()
import socket import threading def sock_conn(): client = socket.socket() client.connect(("localhost",8001)) count = 0 while True: #msg = input(">>:").strip() #if len(msg) == 0:continue client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果 count +=1 client.close() for i in range(100): t = threading.Thread(target=sock_conn) t.start()