python--(協程 和 I/O多路複用)python
一.協程linux
1. >>>單線程下實現併發, 最大化線程的效率, 檢測 IO 並自動切換,程序級別的任務切換, 以前多進程多線程都是系統級別的切換, 程序級別的切換比系統要快的多.編程
#協程:單線程下實現併發
#併發:僞並行,遇到IO就切換,單核下多個任務之間切換執行,給你的效果就是貌似你的幾個程序在同時執行.提升效率 #多線程多進程下的任務切換+保存狀態是操做系統
#任務切換 + 保存狀態
#並行:多核cpu,真正的同時執行
#串行:一個任務執行完在執行另一個任務windows
# 串行 # import time # # def func1(): # time.sleep(1) # print('func1') # # def func2(): # time.sleep(2) # print('func2') # # if __name__ == '__main__': # func1() # func2()
#基於yield併發執行,多任務之間來回切換,這就是個簡單的協程的體現,可是他不能節省I/O時間.服務器
import time def consumer(): '''任務1:接收數據,處理數據''' while True: x=yield # time.sleep(1) #發現什麼?只是進行了切換,可是並無節省I/O時間 print('處理了數據:',x) def producer(): '''任務2:生產數據''' g=consumer() # print('asdfasfasdf') next(g) #找到了consumer函數的yield位置 for i in range(3): # for i in range(10000000): g.send(i) #給yield傳值,而後再循環給下一個yield傳值,而且多了切換的程序,比直接串行執行還多了一些步驟,致使執行效率反而更低了。 print('發送了數據:',i) start=time.time() #基於yield保存狀態,實現兩個任務直接來回切換,即併發的效果 #PS:若是每一個任務中都加上打印,那麼明顯地看到兩個任務的打印是你一次我一次,即併發執行的. producer() #我在當前線程中只執行了這個函數,可是經過這個函數裏面的send切換了另一個任務 stop=time.time() # 串行執行的方式 res=producer() consumer(res) stop=time.time() print(stop-start)
import time # def consumer(): # for i in range(10): # # x=yield # time.sleep(1) # print('處理了數據:',i) # def producer(): # g=consumer() # next(g) # for i in range(3): # g.send(i) # print('發送了數據:',i) # # # start=time.time() # producer() # stop=time.time() # print(stop-start) # import time # def consumer(): # for i in range(4): # time.sleep(1) # # print('處理了數據:',i) # def producer(): # for i in range(3): # print('發送了數據:',i) # # start=time.time() # consumer() #3.00097393989563 # producer() # stop=time.time() # print('>>>>>',stop-start) import time def consumer(): for i in range(4): x = yield time.sleep(1) print('處理了數據:',i) def producer(): g = consumer() next(g) for i in range(3): g.send(i) print('發送了數據:',i) # greenlet # start=time.time() # producer() # stop=time.time() # print(stop-start)
2. Greenlet: 多線程
#安裝: pip3 install greenlet 併發
>>>任務切換 + 保存狀態,沒有實現IO自動切換,app
>>>greenlet只是提供了一種比 generator 更加便捷的切換方式, 當切到一個任務時若是遇到io, 那就原地阻塞, 仍然是沒有解決遇到IO自動切換提高效率的問題.異步
import gevent from gevent import monkey monkey.patch_all() import time def eat(name): print('%s eat 1' %name) # gevent.sleep(2) time.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) # gevent.sleep(2) time.sleep(2) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') #異步執行這個eat任務,後面egon就是給他傳的參數 g2=gevent.spawn(play,name='egon') # g1.join() # g2.join() gevent.joinall([g1,g2]) print('主')
3.Gevent socket
#安裝: pip3 install gevent
>>>任務切換 + 保存狀態,實現了IO自動切換,而且經過monkey 可以識別到基本上全部的IO操做.
>>>Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
1 = gevent.spawn(func, 1, 2, 3, x=4, y=5) #建立一個協程對象g1, spawn口號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置 實參或關鍵字實參,都是傳給函數eat的 g2 = gevent.spawn(func2) g1.join() #等待g1結束,上面只是建立協程對象,這個join纔是去執行 g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,可是你會發現,若是g2裏面的任務執行的時間長,可是不寫join的話,就不會執行完等到g2剩下的任務了 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
import gevent from gevent import monkey monkey.patch_all() import time def eat(name): print('%s eat 1' %name) # gevent.sleep(2) time.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) # gevent.sleep(2) time.sleep(2) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') #異步執行這個eat任務,後面egon就是給他傳的參數 g2=gevent.spawn(play,name='egon') # g1.join() # g2.join() gevent.joinall([g1,g2]) print('主')
二.I/O模型介紹
對於network IO 他會涉及兩個系統對象:
# 1.調用IO的Process or thread
# 2.系統內核(kernel)
當一個read/recv讀數據的操做發生時,該操做會經歷兩個階段:
# 1)等待數據準備 (Waiting for the data to be ready)
# 2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
補充:
# 1.輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,則會經歷 # wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常
#二、輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會阻塞在原地,若是設置爲非阻塞,則會拋出異常
#三、接收外來連接:accept,與輸入操做相似
#四、發起外出連接:connect,與輸出操做相似
1.阻塞IO模型 blocking
IO
2.非阻塞IO模型
>>>徹底沒有阻塞,不推薦使用
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) while 1: try: conn, addr = server.accept() print('來自%s的連接請求'%addr) except BlockingIOError: print('去買點藥') time.sleep(0.1)
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
# 服務端 import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #設置不阻塞 r_list=[] #用來存儲全部來請求server端的conn鏈接 w_list={} #用來存儲全部已經有了請求數據的conn的請求數據 while 1: try: conn,addr=server.accept() #不阻塞,會報錯 r_list.append(conn) #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了 except BlockingIOError: # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!! # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果 print('在作其餘的事情') print('rlist: ',len(r_list)) print('wlist: ',len(w_list)) # 遍歷讀列表,依次取出套接字讀取內容 del_rlist=[] #用來存儲刪除的conn鏈接 for conn in r_list: try: data=conn.recv(1024) #不阻塞,會報錯 if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據 conn.close() del_rlist.append(conn) continue w_list[conn]=data.upper() except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收 continue except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除 conn.close() del_rlist.append(conn) # 遍歷寫列表,依次取出套接字發送內容 del_wlist=[] for conn,data in w_list.items(): try: conn.send(data) del_wlist.append(conn) except BlockingIOError: continue # 清理無用的套接字,無需再監聽它們的IO操做 for conn in del_rlist: r_list.remove(conn) #del_rlist.clear() #清空列表中保存的已經刪除的內容 for conn in del_wlist: w_list.pop(conn) #del_wlist.clear() #客戶端 import socket import os import time import threading client=socket.socket() client.connect(('127.0.0.1',8083)) while 1: res=('%s hello' %os.getpid()).encode('utf-8') client.send(res) data=client.recv(1024) print(data.decode('utf-8')) ##多線程的客戶端請求版本 # def func(): # sk = socket.socket() # sk.connect(('127.0.0.1',9000)) # sk.send(b'hello') # time.sleep(1) # print(sk.recv(1024)) # sk.close() # # for i in range(20): # threading.Thread(target=func).start()
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #設置不阻塞 r_list=[] #用來存儲全部來請求server端的conn鏈接 w_list={} #用來存儲全部已經有了請求數據的conn的請求數據 while 1: try: conn,addr=server.accept() #不阻塞,會報錯 r_list.append(conn) #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了 except BlockingIOError: # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!! # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果 print('在作其餘的事情') # print('rlist: ',len(r_list)) # print('wlist: ',len(w_list)) # 遍歷讀列表,依次取出套接字讀取內容 del_rlist=[] #用來存儲刪除的conn鏈接 for conn in r_list: try: data=conn.recv(1024) #不阻塞,會報錯 if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據 conn.close() del_rlist.append(conn) continue w_list[conn]=data.upper() except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收 continue except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除 conn.close() del_rlist.append(conn) # 遍歷寫列表,依次取出套接字發送內容 del_wlist=[] for conn,data in w_list.items(): try: conn.send(data) del_wlist.append(conn) except BlockingIOError: continue # 清理無用的套接字,無需再監聽它們的IO操做 for conn in del_rlist: r_list.remove(conn) #del_rlist.clear() #清空列表中保存的已經刪除的內容 for conn in del_wlist: w_list.pop(conn) #del_wlist.clear()
雖然咱們上面的代碼經過設置非阻塞,規避了IO操做,可是非阻塞IO模型毫不被推薦。
咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。
可是也難掩其缺點:
#1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況 #2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。
3.IO多路複用:三種機制
Select: 代理監聽全部的須要使用的對象,輪訓本身監聽的那個列表.windows linux
Poll: 沒有監聽數量的限制 linux
Epoll: 回調機制 Linux
Seletor: 根據系統自動選擇一個最優的機制
python中的select模塊:
import select fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout]) 參數: 可接受四個參數(前三個必須) rlist: wait until ready for reading #等待讀的對象,你須要監聽的須要獲取數據的對象列表 wlist: wait until ready for writing #等待寫的對象,你須要寫一些內容的時候,input等等,也就是說我會循環他看看是否有須要發送的消息,若是有我取出這個對象的消息併發送出去,通常用不到,這裏咱們也給一個[]。 xlist: wait for an 「exceptional condition」 #等待異常的對象,一些額外的狀況,通常用不到,可是必須傳,那麼咱們就給他一個[]。 timeout: 超時時間 當超時時間 = n(正整數)時,那麼若是監聽的句柄均無任何變化,則select會阻塞n秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。 返回值:三個列表與上面的三個參數列表是對應的 select方法用來監視文件描述符(當文件描述符條件不知足時,select會阻塞),當某個文件描述符狀態改變後,會返回三個列表 1、當參數1 序列中的fd知足「可讀」條件時,則獲取發生變化的fd並添加到fd_r_list中 2、當參數2 序列中含有fd時,則將該序列中全部的fd添加到 fd_w_list中 3、當參數3 序列中的fd發生錯誤時,則將該發生錯誤的fd添加到 fd_e_list中 四、當超時時間爲空,則select會一直阻塞,直到監聽的句柄發生變化
結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接
#服務端 from socket import * import select server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8093)) server.listen(5) # 設置爲非阻塞 server.setblocking(False) # 初始化將服務端socket對象加入監聽列表,後面還要動態添加一些conn鏈接對象,當accept的時候sk就有感應,當recv的時候conn就有動靜 rlist=[server,] rdata = {} #存放客戶端發送過來的消息 wlist=[] #等待寫對象 wdata={} #存放要返回給客戶端的消息 print('預備!監聽!!!') count = 0 #寫着計數用的,爲了看實驗效果用的,沒用 while True: # 開始 select 監聽,對rlist中的服務端server進行監聽,select函數阻塞進程,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手信號,從而變得可讀,知足select函數的「可讀」條件),被觸發的(有動靜的)套接字(服務器套接字)返回給了rl這個返回值裏面; rl,wl,xl=select.select(rlist,wlist,[],0.5) print('%s 次數>>'%(count),wl) count = count + 1 # 對rl進行循環判斷是否有客戶端鏈接進來,當有客戶端鏈接進來時select將觸發 for sock in rl: # 判斷當前觸發的是否是socket對象, 當觸發的對象是socket對象時,說明有新客戶端accept鏈接進來了 if sock == server: # 接收客戶端的鏈接, 獲取客戶端對象和客戶端地址信息 conn,addr=sock.accept() #把新的客戶端鏈接加入到監聽列表中,當客戶端的鏈接有接收消息的時候,select將被觸發,會知道這個鏈接有動靜,有消息,那麼返回給rl這個返回值列表裏面。 rlist.append(conn) else: # 因爲客戶端鏈接進來時socket接收客戶端鏈接請求,將客戶端鏈接加入到了監聽列表中(rlist),客戶端發送消息的時候這個鏈接將觸發 # 因此判斷是不是客戶端鏈接對象觸發 try: data=sock.recv(1024) #沒有數據的時候,咱們將這個鏈接關閉掉,並從監聽列表中移除 if not data: sock.close() rlist.remove(sock) continue print("received {0} from client {1}".format(data.decode(), sock)) #將接受到的客戶端的消息保存下來 rdata[sock] = data.decode() #將客戶端鏈接對象和這個對象接收到的消息加工成返回消息,並添加到wdata這個字典裏面 wdata[sock]=data.upper() #須要給這個客戶端回覆消息的時候,咱們將這個鏈接添加到wlist寫監聽列表中 wlist.append(sock) #若是這個鏈接出錯了,客戶端暴力斷開了(注意,我尚未接收他的消息,或者接收他的消息的過程當中出錯了) except Exception: #關閉這個鏈接 sock.close() #在監聽列表中將他移除,由於無論什麼緣由,它畢竟是斷開了,不必再監聽它了 rlist.remove(sock) # 若是如今沒有客戶端請求鏈接,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否須要發送消息 for sock in wl: sock.send(wdata[sock]) wlist.remove(sock) wdata.pop(sock) # #將一次select監聽列表中有接收數據的conn對象所接收到的消息打印一下 # for k,v in rdata.items(): # print(k,'發來的消息是:',v) # #清空接收到的消息 # rdata.clear() --------------------------------------- #客戶端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8093)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
4.異步IO操做
用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel操做系統會等待數據(阻塞)準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。