IO多路複用

同步與異步:python

同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。程序員

異步狀況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行,函數返回的時候經過狀態、通知、事件等方式通知進程任務完成。web

 

阻塞與非阻塞:編程

阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。
對於同步調用來講,不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。windows

#舉例: 數組

#1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果爲止,但並未阻塞住(即使是被搶走cpu的執行權限,那也是處於就緒態); 服務器

#2. 阻塞調用:當socket工做在阻塞模式的時候,若是沒有數據的狀況下調用recv函數,則當前線程就會被掛起,直到有數據爲止。網絡

非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。多線程

阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能知足的時候就將進程掛起,而非阻塞則不會阻塞當前進程併發

 

IO發生時涉及的對象和步驟。對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:

#1)等待數據準備 (Waiting for the data to be ready)
#2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

  

阻塞IO(blocking IO)

 

 當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。

    而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

 因此,blocking IO的特色就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。

    幾乎全部的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,使用這些接口能夠很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的

    對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。

 

非阻塞IO(non-blocking IO)

 

從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而用戶就能夠在本次到下次再發起read詢問的時間間隔內作其餘事情,或者直接再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),而後返回。

    也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

    因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。

可是非阻塞IO模型毫不被推薦。

    咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。

    可是也難掩其缺點:

#1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況
#2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

      此外,在這個方案中recv()更多的是起到檢測「操做是否完成」的做用,實際操做系統提供了更爲高效的檢測「操做是否完成「做用的接口,例如select()多路複用模式,能夠一次檢測多個鏈接是否活躍。

# import time
# import socket
# sk = socket.socket()
# sk.bind(('127.0.0.1',9000))
# sk.setblocking(False)   # 設置當前的socket server爲一個非阻塞IO模型
# sk.listen()
# conn_l = []
# del_l = []
# while True:
#     try:
#         conn,addr = sk.accept()
#         conn_l.append(conn)   # [conn1,conn2]
#     except BlockingIOError:
#         for conn in conn_l:   # [conn1,conn2]
#             try:
#                 conn.send(b'hello')
#                 print(conn.recv(1024))
#             except (NameError,BlockingIOError):pass
#             except ConnectionResetError:
#                 conn.close()
#                 del_l.append(conn)
#         for del_conn in del_l:
#             conn_l.remove(del_conn)
#         del_l.clear()

  

 

多路複用IO(IO multiplexing)

  當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
    這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於這裏須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

    強調:

    1. 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。

    2. 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

    結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接 

import select  #  模塊
import socket
# 用來操做操做系統中的select(IO多路複用)機制
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.setblocking(False)
sk.listen()

r_lst = [sk,]
print(sk)
while True:
    r_l,_,_ = select.select(r_lst,[],[])  # r_lst = [sk,conn1,conn2,conn3]
    for item in r_l:
        if item is sk:
            conn, addr = sk.accept()
            r_lst.append(conn)
        else:
            try:
                print(item.recv(1024))
                item.send(b'hello')
            except ConnectionResetError:
                item.close()
                r_lst.remove(item)

  

 

selectors模塊

IO複用:爲了解釋這個名詞,首先來理解下複用這個概念,複用也就是共用的意思,這樣理解仍是有些抽象,爲此,我們來理解下複用在通訊領域的使用,在通訊領域中爲了充分利用網絡鏈接的物理介質,每每在同一條網絡鏈路上採用時分複用或頻分複用的技術使其在同一鏈路上傳輸多路信號,到這裏咱們就基本上理解了複用的含義,即公用某個「介質」來儘量多的作同一類(性質)的事,那IO複用的「介質」是什麼呢?爲此咱們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,所以爲了解決大量客戶端訪問的問題,引入了IO複用技術,即:一個進程能夠同時對多個客戶請求進行服務。也就是說IO複用的「介質」是進程(準確的說複用的是select和poll,由於進程也是靠調用select和poll來實現的),複用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是併發的可是IO所需的讀寫數據多數狀況下是沒有準備好的,所以就能夠利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據能夠進行讀寫了,進程就來對這樣的IO進行服務。

  

理解完IO複用後,咱們在來看下實現IO複用中的三個API(select、poll和epoll)的區別和聯繫

select,poll,epoll都是IO多路複用的機制,I/O多路複用就是經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知應用程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型以下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一個參數nfds爲fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制爲__FD_SETSIZE(1024),位數組的每一位表明其對應的描述符是否須要被檢查。第二三四參數表示須要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關注的事件,因此每次調用select前都須要從新初始化fdset。timeout參數爲超時時間,該結構會被內核修改,其值爲超時剩餘的時間。

 select的調用步驟以下:

(1)使用copy_from_user從用戶空間拷貝fdset到內核空間

(2)註冊回調函數__pollwait

(3)遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll 來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout 指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

總結下select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

(3)select支持的文件描述符數量過小了,默認是1024

 

2.  poll與select不一樣,經過一個pollfd數組向內核傳遞須要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只須要被初始化一次。

 poll的實現機制與select相似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,而後對pollfd中的每一個描述符進行poll,相比處理fdset來講,poll效率更高。poll返回後,須要對pollfd中的每一個元素檢查其revents值,來得指事件是否發生。

 

3.直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll 和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是注 冊要監聽的事件類型;epoll_wait則是等待事件的產生。

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝 一次。

  對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

總結:

(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的 時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間,這就是回調機制帶來的性能提高。

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省很多的開銷。

select,poll,epoll
select,poll,epoll

這三種IO多路複用模型在不一樣的平臺有着不一樣的支持,而epoll在windows下就不支持,好在咱們有selectors模塊,幫咱們默認選擇當前平臺下最合適的

#服務端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口爲非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept

while True:
    events=sel.select() #檢測全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

基於selectors模塊實現聊天
基於selectors模塊實現聊天
相關文章
相關標籤/搜索