協程:英文名Coroutine,是單線程下的併發,又稱微線程,纖程。chrome
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。對比操做系統控制線程的切換,用戶在單線程內控制協程的切換。編程
協程本身自己沒法實現併發(甚至性能會下降),協程+IO切換性能提升。數組
一般程序中子程序調用老是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不一樣。多線程
協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。併發
注意,在一個子程序中中斷,去執行其餘子程序,不是函數調用,有點相似CPU的中斷。異步
看起來A、B的執行有點像多線程,但協程的特色在因而一個線程執行,那和多線程比,協程有何優點?socket
最大的優點就是協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。async
第二大優點就是不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。異步編程
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。函數
Python對協程的支持是經過generator實現的。
在generator中,咱們不但能夠經過for
循環來迭代,還能夠不斷調用next()
函數獲取由yield
語句返回的下一個值。
可是Python的yield
不但能夠返回一個值,它還能夠接收調用者發出的參數。
來看例子:
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。
若是改用協程,生產者生產消息後,直接經過yield
跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
執行結果:
[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK
注意到consumer
函數是一個generator
,把一個consumer
傳入produce
後:
c.send(None)
啓動生成器;c.send(n)
切換到consumer
執行;consumer
經過yield
拿到消息,處理,又經過yield
把結果傳回;produce
拿到consumer
處理的結果,繼續生產下一條消息;produce
決定不生產了,經過c.close()
關閉consumer
,整個過程結束。整個流程無鎖,由一個線程執行,produce
和consumer
協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
最後套用Donald Knuth的一句話總結協程的特色:「子程序就是協程的一種特例。」
若是咱們在單個線程內有20個任務,要想實如今多個任務之間切換,使用yield生成器的方式過於麻煩(須要先獲得初始化一次的生成器,而後再調用send。。。很是麻煩),而使用greenlet模塊能夠很是簡單地實現這20個任務直接的切換。
pip3 install greenlet
單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度。
from greenlet import greenlet def eat(name): print('%s eat 1' % name) g2.switch('nick') print('%s eat 2' % name) g2.switch() def play(name): print('%s play 1' % name) g1.switch() print('%s play 2' % name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch('nick') # 能夠在第一次switch時傳入參數,之後都不須要
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。
單線程裏的這20個任務的代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2...如此,才能提升效率,這就用到了Gevent模塊。
#順序執行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print('run time is %s' %(stop-start)) #10.985628366470337 #切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print('run time is %s' %(stop-start)) # 52.763017892837524
Gevent 是一個第三方庫,能夠輕鬆實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。
pip3 install gevent
g1=gevent.spawn(func,1,,2,3,x=4,y=5):# 建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join():#等待g1結束 g2.join():#等待g2結束 #上述兩步合成一步: gevent.joinall([g1,g2])
:#拿到func1的返回值g1.value
import gevent
def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() # 或者gevent.joinall([g1,g2]) print('主')
上例gevent.sleep(2)
模擬的是gevent能夠識別的io阻塞,而time.sleep(2)
或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了。
from gevent import monkey;monkey.patch_all()
必須放到被打補丁者的前面,如time,socket模塊以前。或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()
放到文件的開頭。
from gevent import monkey;monkey.patch_all() import gevent import time
def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
咱們能夠用threading.current_thread().getName()
來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程
from gevent import monkey;monkey.patch_all() import threading import gevent import time
def eat(): print(threading.current_thread().getName()) print('eat food 1') time.sleep(2) print('eat food 2') def play(): print(threading.current_thread().getName()) print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
from gevent import spawn,joinall,monkey;monkey.patch_all() import time
def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): # 同步 for i in range(10): task(i) def asynchronous(): # 異步 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) print('DONE') if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
# 上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 # 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數, # 後者阻塞當前流程,並執行全部給定的greenlet任務。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
經過gevent實現單線程下的socket併發
注意:from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞。
from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()