協程,又稱微線程,纖程。英文名Coroutine。html
進程和線程的運行都是搶佔式的python
協程是協做式的(非搶佔式),程序的運行前後順序咱們能夠徹底控制linux
協程本質上就只有一個線程,主要解決IO操做nginx
優勢:web
優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。編程
優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。windows
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。數組
1 import time
2 import queue 3 4 def consumer(name): 5 print("--->ready to eat baozi...") 6 while True: 7 new_baozi = yield 8 print("[%s] is eating baozi %s" % (name,new_baozi)) 9 #time.sleep(1) 10 11 def producer(): 12 13 r = con.__next__() 14 r = con2.__next__() 15 n = 0 16 while 1: 17 time.sleep(1) 18 print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) 19 con.send(n) 20 con2.send(n+1) 21 22 n +=2 23 24 25 if __name__ == '__main__': 26 con = consumer("c1") 27 con2 = consumer("c2") 28 p = producer()
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator緩存
1 from greenlet import greenlet
2
3
4 def test1(): 5 print(12) 6 gr2.switch() 7 print(34) 8 gr2.switch() 9 10 11 def test2(): 12 print(56) 13 gr1.switch() 14 print(78) 15 16 17 gr1 = greenlet(test1) 18 gr2 = greenlet(test2) 19 gr1.switch()
協程: 碰見IO操做會自動切換進行下一個須要cpu的操做安全
1 import gevent
2
3 import requests,time 4 5 6 start=time.time() 7 8 def f(url): 9 print('GET: %s' % url) 10 resp =requests.get(url) 11 data = resp.text 12 print('%d bytes received from %s.' % (len(data), url)) 13 14 gevent.joinall([ 15 16 gevent.spawn(f, 'https://www.python.org/'), 17 gevent.spawn(f, 'https://www.yahoo.com/'), 18 gevent.spawn(f, 'https://www.baidu.com/'), 19 gevent.spawn(f, 'https://www.sina.com.cn/'), 20 21 ]) 22 23 # f('https://www.python.org/') 24 # 25 # f('https://www.yahoo.com/') 26 # 27 # f('https://baidu.com/') 28 # 29 # f('https://www.sina.com.cn/') 30 31 print("cost time:",time.time()-start)
協程:碰見IO操做會切換,那麼何時切換回來?如何肯定IO操做結束了?
開始--->代碼塊A--->代碼塊B--->代碼塊C--->代碼塊D--->......--->結束
每個代碼塊裏是完成各類各樣事情的代碼,但編程者知道代碼塊A,B,C,D...的執行順序,惟一可以改變這個流程的是數據。輸入不一樣的數據,根據條件語句判斷,流程或許就改成A--->C--->E...--->結束。每一次程序運行順序或許都不一樣,但它的控制流程是由輸入數據和你編寫的程序決定的。若是你知道這個程序當前的運行狀態(包括輸入數據和程序自己),那你就知道接下來甚至一直到結束它的運行流程。
開始--->初始化--->等待
與上面傳統編程模式不一樣,事件驅動程序在啓動以後,就在那等待,等待什麼呢?等待被事件觸發。傳統編程下也有「等待」的時候,好比在代碼塊D中,你定義了一個input(),須要用戶輸入數據。但這與下面的等待不一樣,傳統編程的「等待」,好比input(),你做爲程序編寫者是知道或者強制用戶輸入某個東西的,或許是數字,或許是文件名稱,若是用戶輸入錯誤,你還須要提醒他,並請他從新輸入。事件驅動程序的等待則是徹底不知道,也不強制用戶輸入或者幹什麼。只要某一事件發生,那程序就會作出相應的「反應」。這些事件包括:輸入信息、鼠標、敲擊鍵盤上某個鍵還有系統內部定時器觸發。
若是建立一個線程用來檢測用戶的鼠標點擊事件,缺點:
CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。
因此利用事件驅動模型解決這個問題,大致思路:
有一個事件(消息)隊列;
鼠標按下時,往這個隊列中增長一個點擊事件(消息);
有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。
用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。
最初的問題:怎麼肯定IO操做完了切回去呢?經過回調函數 ( 注意,事件驅動的監聽事件是由操做系統調用的cpu來完成的)
1.要理解事件驅動和程序,就須要與非事件驅動的程序進行比較。實際上,現代的程序大可能是事件驅動的,好比多線程的程序,確定是事件驅動的。早期則存在許多非事件驅動的程序,這樣的程序,在須要等待某個條件觸發時,會不斷地檢查這個條件,直到條件知足,這是很浪費cpu時間的。而事件驅動的程序,則有機會釋放cpu從而進入睡眠態(注意是有機會,固然程序也可自行決定不釋放cpu),當事件觸發時被操做系統喚醒,這樣就能更加有效地使用cpu. 2.再說什麼是事件驅動的程序。一個典型的事件驅動的程序,就是一個死循環,並以一個線程的形式存在,這個死循環包括兩個部分,第一個部分是按照必定的條件接收並選擇一個要處理的事件,第二個部分就是事件的處理過程。程序的執行過程就是選擇事件和處理事件,而當沒有任何事件觸發時,程序會因查詢事件隊列失敗而進入睡眠狀態,從而釋放cpu。 3.事件驅動的程序,一定會直接或者間接擁有一個事件隊列,用於存儲未能及時處理的事件。 4.事件驅動的程序的行爲,徹底受外部輸入的事件控制,因此,事件驅動的系統中,存在大量這種程序,並以事件做爲主要的通訊方式。 5.事件驅動的程序,還有一個最大的好處,就是能夠按照必定的順序處理隊列中的事件,而這個順序則是由事件的觸發順序決定的,這一特性每每被用於保證某些過程的原子化。 6.目前windows,linux,nucleus,vxworks都是事件驅動的,只有一些單片機多是非事件驅動的。
前面是用協程實現的IO阻塞自動切換,那麼協程又是怎麼實現的,在原理是是怎麼實現的。如何去實現事件驅動的狀況下IO的自動阻塞的切換,這個學名叫什麼呢? => IO多路複用
好比socketserver,多個客戶端鏈接,單線程下實現併發效果,就叫多路複用。
本文討論的背景是Linux環境下的network IO。
如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。
操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。
爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。
針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。
爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換,這種切換是由操做系統來完成的。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
保存處理機上下文,包括程序計數器和其餘寄存器。
更新PCB信息。
把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
選擇另外一個進程執行,並更新其PCB。
更新內存管理的數據結構。
恢復處理機上下文。
注:總而言之就是很耗資源的
正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的。
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。用戶空間無法直接訪問內核空間的,內核態到用戶態的數據拷貝
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。
在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
####blocking IO from socket import * import select ser = socket(AF_INET, SOCK_STREAM) ser.bind(('192.168.0.106', 8000)) ser.listen(5) #system call while True: con, addr = ser.accept() #return call while 1: msg = con.recv(1024) con.send(msg.upper())
linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:
從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。
注意:
在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否準備好,與阻塞IO不同,」非阻塞將大的整片時間的阻塞分紅N多的小的阻塞, 因此進程不斷地有機會 ‘被’ CPU光顧」。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是能夠作其餘事情的,
也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。
1 ########################################################nonblocking IO 2 import time 3 import socket 4 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 5 sk.bind(('192.168.0.106', 8000)) 6 sk.listen(5) 7 sk.setblocking(False) #設置爲sk爲非阻塞 8 print ('waiting client connection .......') 9 while True: 10 try: 11 connection,address = sk.accept() # 進程主動輪詢,不斷進行system call 12 print("address",address) 13 client_messge = connection.recv(1024) 14 print(str(client_messge,'utf8')) 15 connection.close() 16 except Exception as e: 17 print (e) 18 time.sleep(3)
IO multiplexing這個詞可能有點陌生,可是若是我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式爲event driven IO。咱們都知道,select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。(多說一句。因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
注意1:select函數返回結果中若是有文件可讀了,那麼進程就能夠經過調用accept()或recv()來讓kernel將位於內核中準備到的數據copy到用戶區。
注意2: select的優點在於能夠處理多個鏈接,不適用於單個鏈接
1 ###### IO多路複用 2 from socket import * 3 import select, time 4 5 ser = socket(AF_INET, SOCK_STREAM) 6 ser.bind(('192.168.0.106', 8000)) 7 ser.listen(5) 8 9 inp = [ser,] 10 while True: 11 r, w, e = select.select(inp, [], [], 4) #監聽inp中的元素(能夠選擇是否認時),直至數據從內核態copy到用戶態 12 print(r) 13 for i in r: 14 print() 15 con, addr = i.accept() #若不accept,則select會一直觸發 16 print(con) 17 print(addr)i 18 inp.append(con) 19 print('over')
1 ###### IO多路複用升級 2 from socket import * 3 import select, time 4 5 ser = socket(AF_INET, SOCK_STREAM) 6 ser.bind(('192.168.0.106', 8000)) 7 ser.listen(5) 8 9 inp = [ser,] 10 while True: 11 r, w, e = select.select(inp, [], [], ) 12 for i in r: 13 if i == ser: 14 con, addr = i.accept() 15 print(con) 16 print(addr) 17 inp.append(con) 18 else: 19 try: 20 msg = i.recv(1024) 21 i.send(msg.upper()) 22 except Exception as e: 23 print(e) 24 inp.remove(i) #再也不監聽 25 print('select over at-->', time.ctime())
IO多路複用之selector
1 from socket import * 2 import selectors 3 4 sel = selectors.DefaultSelector() 5 6 ser = socket(AF_INET, SOCK_STREAM) 7 ser.bind(('192.168.0.106', 8000)) 8 ser.listen(5) 9 10 def accept(ser, mask): 11 con, addr = ser.accept() 12 print(ser, mask) 13 con.setblocking(False) 14 sel.register(con, selectors.EVENT_READ, read) 15 16 def read(con, mask): 17 try: 18 msg = con.recv(1024) 19 con.send(msg.upper()) 20 except Exception as e: 21 print('will close-->', con) 22 sel.unregister(con) #再也不監聽con 23 con.close() 24 25 26 sel.register(ser, selectors.EVENT_READ, accept) #監聽ser並將accept綁定 27 28 while True: 29 events = sel.select() #開始監聽ser 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)
用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
全程無阻塞-->異步Io,不然就是同步(以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO同步IO)
定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。
poll
它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
通常也不用它,至關於過渡階段
epoll
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll。被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。windows不支持
沒有最大文件描述符數量的限制。
好比100個鏈接,有兩個活躍了,epoll會告訴用戶這兩個兩個活躍了,直接取就ok了,而select是循環一遍。
(瞭解)epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
因此市面上上見到的所謂的異步IO,好比nginx、Tornado、等,咱們叫它異步IO,其實是IO多路複用。
# 首先咱們來定義流的概念,一個流能夠是文件,socket,pipe等等能夠進行I/O操做的內核對象。 # 無論是文件,仍是套接字,仍是管道,咱們均可以把他們看做流。 # 以後咱們來討論I/O的操做,經過read,咱們能夠從流中讀入數據;經過write,咱們能夠往流寫入數據。如今假 # 定一個情形,咱們須要從流中讀數據,可是流中尚未數據,(典型的例子爲,客戶端要從socket讀如數據,可是 # 服務器尚未把數據傳回來),這時候該怎麼辦? # 阻塞。阻塞是個什麼概念呢?好比某個時候你在等快遞,可是你不知道快遞何時過來,並且你沒有別的事能夠幹 # (或者說接下來的事要等快遞來了才能作);那麼你能夠去睡覺了,由於你知道快遞把貨送來時必定會給你打個電話 # (假定必定能叫醒你)。 # 非阻塞忙輪詢。接着上面等快遞的例子,若是用忙輪詢的方法,那麼你須要知道快遞員的手機號,而後每分鐘給他掛 # 個電話:「你到了沒?」 # 很明顯通常人不會用第二種作法,不只顯很無腦,浪費話費不說,還佔用了快遞員大量的時間。 # 大部分程序也不會用第二種作法,由於第一種方法經濟而簡單,經濟是指消耗不多的CPU時間,若是線程睡眠了, # 就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。 # # 爲了瞭解阻塞是如何進行的,咱們來討論緩衝區,以及內核緩衝區,最終把I/O事件解釋清楚。緩衝區的引入是爲 # 了減小頻繁I/O操做而引發頻繁的系統調用(你知道它很慢的),當你操做一個流時,更多的是以緩衝區爲單位進 # 行操做,這是相對於用戶空間而言。對於內核來講,也須要緩衝區。 # 假設有一個管道,進程A爲管道的寫入方,B爲管道的讀出方。 # 假設一開始內核緩衝區是空的,B做爲讀出方,被阻塞着。而後首先A往管道寫入,這時候內核緩衝區由空的狀態變 # 到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之爲「緩衝區非空」。 # 可是「緩衝區非空」事件通知B後,B卻尚未讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫 # 入的數據會滯留在內核緩衝區中,若是內核也緩衝區滿了,B仍未開始讀數據,最終內核緩衝區會被填滿,這個時候 # 會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,咱們把這個事件定義爲「緩衝區滿」。 # 假設後來B終於開始讀數據了,因而內核的緩衝區空了出來,這時候內核會告訴A,內核緩衝區有空位了,你能夠從 # 長眠中醒來了,繼續寫數據了,咱們把這個事件叫作「緩衝區非滿」 # 也許事件Y1已經通知了A,可是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩衝區空了。這個時候內核就告 # 訴B,你須要阻塞了!,咱們把這個時間定爲「緩衝區空」。 # 這四個情形涵蓋了四個I/O事件,緩衝區滿,緩衝區空,緩衝區非空,緩衝區非滿(注都是說的內核緩衝區,且這四 # 個術語都是我生造的,僅爲解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(若是不能理解「同步」是 # 什麼概念,請學習操做系統的鎖,信號量,條件變量等任務同步方面的相關知識)。 # # 而後咱們來講說阻塞I/O的缺點。可是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。若是想要同時處理多 # 個流,要麼多進程(fork),要麼多線程(pthread_create),很不幸這兩種方法效率都不高。 # 因而再來考慮非阻塞忙輪詢的I/O方式,咱們發現咱們能夠同時處理多個流了(把一個流從阻塞模式切換到非阻塞 # 模式再此不予討論): # while true { # for i in stream[]; { # if i has data # read until unavailable # } # } # 咱們只要不停的把全部流從頭至尾問一遍,又從頭開始。這樣就能夠處理多個流了,但這樣的作法顯然很差,由於 # 若是全部的流都沒有數據,那麼只會白白浪費CPU。這裏要補充一點,阻塞模式下,內核對於I/O事件的處理是阻 # 塞或者喚醒,而非阻塞模式下則把I/O事件交給其餘對象(後文介紹的select以及epoll)處理甚至直接忽略。 # # 爲了不CPU空轉,能夠引進了一個代理(一開始有一位叫作select的代理,後來又有一位叫作poll的代理,不 # 過二者的本質是同樣的)。這個代理比較厲害,能夠同時觀察許多流的I/O事件,在空閒的時候,會把當前線程阻 # 塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,因而咱們的程序就會輪詢一遍全部的流(因而咱們可 # 以把「忙」字去掉了)。代碼長這樣: # while true { # select(streams[]) # for i in streams[] { # if i has data # read until unavailable # } # } # 因而,若是沒有I/O事件產生,咱們的程序就會阻塞在select處。可是依然有個問題,咱們從select那裏僅僅知 # 道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至所有),咱們只能無差異輪詢全部流, # 找出能讀出數據,或者寫入數據的流,對他們進行操做。 # 可是使用select,咱們有O(n)的無差異輪詢複雜度,同時處理的流越多,每一次無差異輪詢時間就越長。再次 # 說了這麼多,終於能好好解釋epoll了 # epoll能夠理解爲event poll,不一樣於忙輪詢和無差異輪詢,epoll之會把哪一個流發生了怎樣的I/O事件通知我 # 們。此時咱們對這些流的操做都是有意義的。 # 在討論epoll的實現細節以前,先把epoll的相關操做列出: # epoll_create 建立一個epoll對象,通常epollfd = epoll_create() # epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增長/刪除某一個流的某一個事件 # 好比 # epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//有緩衝區內有數據時epoll_wait返回 # epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//緩衝區可寫入時epoll_wait返回 # epoll_wait(epollfd,...)等待直到註冊的事件發生 # (注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設置errno=EAGAIN。 # 而epoll只關心緩衝區非滿和緩衝區非空事件)。 # 一個epoll模式的代碼大概的樣子是: # while true { # active_stream[] = epoll_wait(epollfd) # for i in active_stream[] { # read or write till unavailable # } # } # 舉個例子: # select: # 班裏三十個同窗在考試,誰先作完想交卷都要經過按鈕來活動,他按按鈕做爲老師的我桌子上的燈就會變紅. # 一旦燈變紅,我(select)我就能夠知道有人交卷了,可是我並不知道誰交的,因此,我必須跟個傻子似的輪詢 # 地去問:嘿,是你要交卷嗎?而後我就能夠以這種效率極低地方式找到要交卷的學生,而後把它的卷子收上來. # # # epoll: # 此次再有人按按鈕,我這不光燈會亮,上面還會顯示要交卷學生的名字.這樣我就能夠直接去對應學生那收卷就 # 好了.固然,同時能夠有多人交卷.
# 在linux的IO多路複用中有水平觸發,邊緣觸發兩種模式,這兩種模式的區別以下: # # 水平觸發:若是文件描述符已經就緒能夠非阻塞的執行IO操做了,此時會觸發通知.容許在任意時刻重複檢測IO的狀態, # 沒有必要每次描述符就緒後儘量多的執行IO.select,poll就屬於水平觸發. # # 邊緣觸發:若是文件描述符自上次狀態改變後有新的IO活動到來,此時會觸發通知.在收到一個IO事件通知後要儘量 # 多的執行IO操做,由於若是在一次通知中沒有執行完IO那麼就須要等到下一次新的IO活動到來才能獲取到就緒的描述 # 符.信號驅動式IO就屬於邊緣觸發. # # epoll既能夠採用水平觸發,也能夠採用邊緣觸發. # # 你們可能還不能徹底瞭解這兩種模式的區別,咱們能夠舉例說明:一個管道收到了1kb的數據,epoll會當即返回,此時 # 讀了512字節數據,而後再次調用epoll.這時若是是水平觸發的,epoll會當即返回,由於有數據準備好了.若是是邊 # 緣觸發的不會當即返回,由於此時雖然有數據可讀可是已經觸發了一次通知,在此次通知到如今尚未新的數據到來, # 直到有新的數據到來epoll纔會返回,此時老的數據和新的數據均可以讀取到(固然是須要此次你儘量的多讀取). # 下面咱們還從電子的角度來解釋一下: # # 水平觸發:也就是隻有高電平(1)或低電平(0)時才觸發通知,只要在這兩種狀態就能獲得通知.上面提到的只要 # 有數據可讀(描述符就緒)那麼水平觸發的epoll就當即返回. # # 邊緣觸發:只有電平發生變化(高電平到低電平,或者低電平到高電平)的時候才觸發通知.上面提到即便有數據 # 可讀,可是沒有新的IO活動到來,epoll也不會當即返回.
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 sk.setsockopt 5 sk.bind(('127.0.0.1',6667)) 6 sk.listen(5) 7 sk.setblocking(False) 8 while True: 9 try: 10 print ('waiting client connection .......') 11 connection,address = sk.accept() # 進程主動輪詢 12 print("+++",address) 13 client_messge = connection.recv(1024) 14 print(str(client_messge,'utf8')) 15 connection.close() 16 except Exception as e: 17 print (e) 18 time.sleep(4) 19 20 #############################client 21 22 import time 23 import socket 24 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 25 26 while True: 27 sk.connect(('127.0.0.1',6667)) 28 print("hello") 29 sk.sendall(bytes("hello","utf8")) 30 time.sleep(2) 31 break
優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在同時執行)。
缺點:任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。
在非阻塞實例中,輪詢的主語是進程,而「後臺」 可能有多個任務在同時進行,人們就想到了循環查詢多個任務的完成狀態,只要有任何一個任務完成,就去處理它。不過,這個監聽的重任經過調用select等函數交給了內核去作。IO多路複用有兩個特別的系統調用select、poll、epoll函數。select調用是內核級別的,select輪詢相對非阻塞的輪詢的區別在於—前者能夠等待多個socket,能實現同時對多個IO端口進行監聽,當其中任何一個socket的數據準好了,就能返回進行可讀,而後進程再進行recvfrom系統調用,將數據由內核拷貝到用戶進程,固然這個過程是阻塞的。
爲何不調用accept,會反覆print?
由於select是水平觸發
import socket import select sk=socket.socket() sk.bind(("127.0.0.1",9904)) sk.listen(5) while True: r,w,e=select.select([sk,],[],[],5) for i in r: # conn,add=i.accept() #print(conn) print("hello") print('>>>>>>') #*************************client.py import socket sk=socket.socket() sk.connect(("127.0.0.1",9904)) while 1: inp=input(">>").strip() sk.send(inp.encode("utf8")) data=sk.recv(1024) print(data.decode("utf8"))
1 #***********************server.py 2 import socket 3 import select 4 sk=socket.socket() 5 sk.bind(("127.0.0.1",8801)) 6 sk.listen(5) 7 inputs=[sk,] 8 while True: 9 r,w,e=select.select(inputs,[],[],5) 10 print(len(r)) 11 12 for obj in r: 13 if obj==sk: 14 conn,add=obj.accept() 15 print(conn) 16 inputs.append(conn) 17 else: 18 data_byte=obj.recv(1024) 19 print(str(data_byte,'utf8')) 20 inp=input('回答%s號客戶>>>'%inputs.index(obj)) 21 obj.sendall(bytes(inp,'utf8')) 22 23 print('>>',r) 24 25 #***********************client.py 26 27 import socket 28 sk=socket.socket() 29 sk.connect(('127.0.0.1',8801)) 30 31 while True: 32 inp=input(">>>>") 33 sk.sendall(bytes(inp,"utf8")) 34 data=sk.recv(1024) 35 print(str(data,'utf8'))
文件描述符其實就是我們平時說的句柄,只不過文件描述符是linux中的概念。注意,咱們的accept或recv調用時即向系統發出recvfrom請求
(1) 若是內核緩衝區沒有數據--->等待--->數據到了內核緩衝區,轉到用戶進程緩衝區;
(2) 若是先用select監聽到某個文件描述符對應的內核緩衝區有了數據,當咱們再調用accept或recv時,直接將數據轉到用戶緩衝區。
如何在某一個client端退出後,不影響server端和其它客戶端正常交流
linux
if not data_byte: inputs.remove(obj) continue
win
try: data_byte=obj.recv(1024) print(str(data_byte,'utf8')) inp=input('回答%s號客戶>>>'%inputs.index(obj)) obj.sendall(bytes(inp,'utf8')) except Exception: inputs.remove(obj)
1 #_*_coding:utf-8_*_ 2 __author__ = 'Alex Li' 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 # Create a TCP/IP socket 10 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 11 server.setblocking(False) 12 13 # Bind the socket to the port 14 server_address = ('localhost', 10000) 15 print(sys.stderr, 'starting up on %s port %s' % server_address) 16 server.bind(server_address) 17 18 # Listen for incoming connections 19 server.listen(5) 20 21 # Sockets from which we expect to read 22 inputs = [ server ] 23 24 # Sockets to which we expect to write 25 outputs = [ ] 26 27 message_queues = {} 28 while inputs: 29 30 # Wait for at least one of the sockets to be ready for processing 31 print( '\nwaiting for the next event') 32 readable, writable, exceptional = select.select(inputs, outputs, inputs) 33 # Handle inputs 34 for s in readable: 35 36 if s is server: 37 # A "readable" server socket is ready to accept a connection 38 connection, client_address = s.accept() 39 print('new connection from', client_address) 40 connection.setblocking(False) 41 inputs.append(connection) 42 43 # Give the connection a queue for data we want to send 44 message_queues[connection] = queue.Queue() 45 else: 46 data = s.recv(1024) 47 if data: 48 # A readable client socket has data 49 print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) ) 50 message_queues[s].put(data) 51 # Add output channel for response 52 if s not in outputs: 53 outputs.append(s) 54 else: 55 # Interpret empty result as closed connection 56 print('closing', client_address, 'after reading no data') 57 # Stop listening for input on the connection 58 if s in outputs: 59 outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉 60 inputs.remove(s) #inputs中也刪除掉 61 s.close() #把這個鏈接關閉掉 62 63 # Remove message queue 64 del message_queues[s] 65 # Handle outputs 66 for s in writable: 67 try: 68 next_msg = message_queues[s].get_nowait() 69 except queue.Empty: 70 # No messages waiting so stop checking for writability. 71 print('output queue for', s.getpeername(), 'is empty') 72 outputs.remove(s) 73 else: 74 print( 'sending "%s" to %s' % (next_msg, s.getpeername())) 75 s.send(next_msg) 76 # Handle "exceptional conditions" 77 for s in exceptional: 78 print('handling exceptional condition for', s.getpeername() ) 79 # Stop listening for input on the connection 80 inputs.remove(s) 81 if s in outputs: 82 outputs.remove(s) 83 s.close() 84 85 # Remove message queue 86 del message_queues[s]
1 # select 模擬一個socket server,注意socket必須在非阻塞狀況下才能實現IO多路複用。 2 # 接下來經過例子瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的。 3 #server端 4 5 6 import select 7 import socket 8 import queue 9 10 server = socket.socket() 11 server.bind(('localhost',9000)) 12 server.listen(1000) 13 14 server.setblocking(False) # 設置成非阻塞模式,accept和recv都非阻塞 15 # 這裏若是直接 server.accept() ,若是沒有鏈接會報錯,因此有數據才調他們 16 # BlockIOError:[WinError 10035] 沒法當即完成一個非阻塞性套接字操做。 17 msg_dic = {} 18 inputs = [server,] # 交給內核、select檢測的列表。 19 # 必須有一個值,讓select檢測,不然報錯提供無效參數。 20 # 沒有其餘鏈接以前,本身就是個socket,本身就是個鏈接,檢測本身。活動了說明有連接 21 outputs = [] # 你往裏面放什麼,下一次就出來了 22 23 while True: 24 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 定義檢測 25 #新來鏈接 檢測列表 異常(斷開) 26 # 異常的也是inputs是: 檢測那些鏈接的存在異常 27 print(readable,writeable,exceptional) 28 for r in readable: 29 if r is server: # 有數據,表明來了一個新鏈接 30 conn, addr = server.accept() 31 print("來了個新鏈接",addr) 32 inputs.append(conn) # 把鏈接加到檢測列表裏,若是這個鏈接活動了,就說明數據來了 33 # inputs = [server.conn] # 【conn】只返回活動的鏈接,但怎麼肯定是誰活動了 34 # 若是server活動,則來了新鏈接,conn活動則來數據 35 msg_dic[conn] = queue.Queue() # 初始化一個隊列,後面存要返回給這個客戶端的數據 36 else: 37 try : 38 data = r.recv(1024) # 注意這裏是r,而不是conn,多個鏈接的狀況 39 print("收到數據",data) 40 # r.send(data) # 不能直接發,若是客戶端不收,數據就沒了 41 msg_dic[r].put(data) # 往裏面放數據 42 outputs.append(r) # 放入返回的鏈接隊列裏 43 except ConnectionResetError as e: 44 print("客戶端斷開了",r) 45 if r in outputs: 46 outputs.remove(r) #清理已斷開的鏈接 47 inputs.remove(r) #清理已斷開的鏈接 48 del msg_dic[r] ##清理已斷開的鏈接 49 50 for w in writeable: # 要返回給客戶端的鏈接列表 51 data_to_client = msg_dic[w].get() # 在字典裏取數據 52 w.send(data_to_client) # 返回給客戶端 53 outputs.remove(w) # 刪除這個數據,確保下次循環的時候不返回這個已經處理完的鏈接了。 54 55 for e in exceptional: # 若是鏈接斷開,刪除鏈接相關數據 56 if e in outputs: 57 outputs.remove(e) 58 inputs.remove(e) 59 del msg_dic[e] 60 61 62 #*************************client 63 import socket 64 client = socket.socket() 65 66 client.connect(('localhost', 9000)) 67 68 while True: 69 cmd = input('>>> ').strip() 70 if len(cmd) == 0 : continue 71 client.send(cmd.encode('utf-8')) 72 data = client.recv(1024) 73 print(data.decode()) 74 75 client.close()
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
只須要發送一次系統調用,並等待系統return OK指令
在等待的過程當中不能作其餘任務,cpu沒有利用上
####blocking IO from socket import * import select ser = socket(AF_INET, SOCK_STREAM) ser.bind(('192.168.0.106', 8000)) ser.listen(5) #system call while True: con, addr = ser.accept() #return call while 1: msg = con.recv(1024) con.send(msg.upper())
定時發送系統調用,當系統return OK時再操做
兩次發送期間能夠利用cpu作其餘任務
缺點:系統調用過多;不能及時處理數據
1 ########################################################nonblocking IO 2 import time 3 import socket 4 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 5 sk.bind(('192.168.0.106', 8000)) 6 sk.listen(5) 7 sk.setblocking(False) #設置爲sk爲非阻塞 8 print ('waiting client connection .......') 9 while True: 10 try: 11 connection,address = sk.accept() # 進程主動輪詢,不斷進行system call 12 print("address",address) 13 client_messge = connection.recv(1024) 14 print(str(client_messge,'utf8')) 15 connection.close() 16 except Exception as e: 17 print (e) 18 time.sleep(3)
兩次系統調用
優點:能夠監聽多個連接;能夠利用單線程下實現併發
實現方法:select 或 poll 或eopll
1 ###### IO多路複用 2 from socket import * 3 import select, time 4 5 ser = socket(AF_INET, SOCK_STREAM) 6 ser.bind(('192.168.0.106', 8000)) 7 ser.listen(5) 8 9 inp = [ser,] 10 while True: 11 r, w, e = select.select(inp, [], [], 4) #監聽inp中的元素(能夠選擇是否認時),直至數據從內核態copy到用戶態 12 print(r) 13 for i in r: 14 print() 15 con, addr = i.accept() #若不accept,則select會一直觸發 16 print(con) 17 print(addr)i 18 inp.append(con) 19 print('over')
1 ###### IO多路複用升級 2 from socket import * 3 import select, time 4 5 ser = socket(AF_INET, SOCK_STREAM) 6 ser.bind(('192.168.0.106', 8000)) 7 ser.listen(5) 8 9 inp = [ser,] 10 while True: 11 r, w, e = select.select(inp, [], [], ) 12 for i in r: 13 if i == ser: 14 con, addr = i.accept() 15 print(con) 16 print(addr) 17 inp.append(con) 18 else: 19 try: 20 msg = i.recv(1024) 21 i.send(msg.upper()) 22 except Exception as e: 23 print(e) 24 inp.remove(i) #再也不監聽 25 print('select over at-->', time.ctime())
selector
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 try: 14 data = conn.recv(1000) # Should be ready 15 if not data: 16 raise Exception 17 print('echoing', repr(data), 'to', conn) 18 conn.send(data) # Hope it won't block 19 except Exception as e: 20 print('closing', conn) 21 sel.unregister(conn) 22 conn.close() 23 24 sock = socket.socket() 25 sock.bind(('localhost', 8090)) 26 sock.listen(100) 27 sock.setblocking(False) 28 29 sel.register(sock, selectors.EVENT_READ, accept) 30 print("server.....") 31 32 while True: 33 events = sel.select()#[sock,,conn2] 34 for key, mask in events: 35 callback = key.data 36 callback(key.fileobj, mask) 37 38 import socket 39 import select 40 sk=socket.socket() 41 sk.bind(("127.0.0.1",9904)) 42 sk.listen(5) 43 44 while True: 45 r,w,e=select.select([sk,sk],[],[],5) 46 r=[sk,] 47 for i in r: 48 conn,add=i.accept() 49 print(conn) 50 print("hello") 51 52 print('>>>>>>')
全程無阻塞-->異步Io,不然就是同步(上面三種都是同步IO)
https://www.cnblogs.com/yuanchenqi/articles/6248025.html
https://www.cnblogs.com/linhaifeng/articles/6817679.html#_label1