python學習----IO模型

一.IO模型介紹

本文討論的背景是Linux環境下的network IO. 本文最重要的參考文獻是Richard Stevens的"UNIX® Network Programming Volume 1,python

Third Edition: The Sockets Networking ", 6.2節"I/O Models ",Stevens在這節中詳細說明了各類IO的特色和區別.linux

Stevens在文章中一共比較了五種IO Model:程序員

blocking IO 阻塞IOweb

nonblocking IO 非阻塞IO數據庫

IO multiplexing IO多路複用編程

signal driven IO 信號驅動IOwindows

asynchronous IO 異步IO數組

因爲signal driven IO (信號驅動IO) 在實際中並不經常使用, 因此主要介紹其他四種IO Model緩存

再說一下IO發生時涉及的對象和步驟。對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的tomcat

process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:

#1)等待數據準備 (Waiting for the data to be ready)
#2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

 補充:

#一、輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,
則會經歷wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常
#二、輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會
阻塞在原地,若是設置爲非阻塞,則會拋出異常
#三、接收外來連接:accept,與輸入操做相似 #四、發起外出連接:connect,與輸出操做相似

 回顧:

同步:提交一個任務以後要等待這個任務執行完畢

異步:只管提交任務,不等待這個任務執行完畢就能夠去作其餘的事情

阻塞:recv、recvfrom、accept,線程階段  運行狀態-->阻塞狀態-->就緒

非阻塞:沒有阻塞狀態

二. 阻塞IO(blocking IO)

在Linux中, 默認狀況下全部的socket都是blocking, 一個典型的讀操做流程大概以下圖:(recvfrom和tcp裏面的recv在這些IO模型裏面是同樣的)

上面的圖形分析:兩個階段的阻塞

  當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到

達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。

  而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

  因此,blocking IO的特色就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。

  幾乎全部的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,使用這些接口能夠很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的。以下圖

  ps:所謂阻塞型接口是指系統調用(通常是IO接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用得到結果或者超時出錯時才返回.

實際上,除非特別指定,幾乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用recv(1024)的同時,線程將被

阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。

一個簡單的解決方案:

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。

 

該方案的問題是:

#開啓多進程或都線程的方式,在遇到要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進
程自己也更容易進入假死狀態。

 

改進方案:

#不少程序員可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。
「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,
如websphere、tomcat和各類數據庫等。

 

改進後方案其實也存在着問題:

#「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外
界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。

 

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,

多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。

三. 非阻塞IO(nonblocking IO)

Linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

  從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,

它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而用戶就能夠在本次到下

次再發起read詢問的時間間隔內作其餘事情,或者直接再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就

將數據拷貝到了用戶內存(這一階段仍然是阻塞的),而後返回。

  也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,可

以乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據

準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

 因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。

# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False) #設置不阻塞
r_list=[]  #用來存儲全部來請求server端的conn鏈接
w_list={}  #用來存儲全部已經有了請求數據的conn的請求數據

while 1:
    try:
        conn,addr=server.accept() #不阻塞,會報錯
        r_list.append(conn)  #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!!
        # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果
        print('在作其餘的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[] #用來存儲刪除的conn鏈接
        for conn in r_list:
            try:
                data=conn.recv(1024) #不阻塞,會報錯
                if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字發送內容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操做
        for conn in del_rlist:
            r_list.remove(conn)
        #del_rlist.clear() #清空列表中保存的已經刪除的內容
        for conn in del_wlist:
            w_list.pop(conn)
        #del_wlist.clear()

#客戶端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))


##多線程的客戶端請求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(20):
#     threading.Thread(target=func).start()
非阻塞IO示例

 

雖然咱們上面的代碼經過設置非阻塞,規避了IO操做,可是非阻塞IO模型毫不被推薦。

nonblocking IO模型的優勢:

可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。

缺點:

#1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況
#2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

  此外,在這個方案中recv()更多的是起到檢測「操做是否完成」的做用,實際操做系統提供了更爲高效的檢測「操做是否完成「做用的接口,例如select()多路複用模式,

能夠一次檢測多個鏈接是否活躍。

四. 多路複用IO(IO multiplexing)

 IO multiplexing這種IO方式有些地方也被稱爲事件驅動IO(event driven IO).

基本原理: select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

它的流程如圖:

解釋圖:

  當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。

這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

   這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於它不只阻塞了還多須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了

一個系統調用(recvfrom),當只有一個鏈接請求的時候,這個模型還不如阻塞IO效率高。可是,用select的優點在於它能夠同時處理多個connection,而阻塞IO那裏不能,

我無論阻塞不阻塞,你全部的鏈接包括recv等操做,我都幫你監聽着(以什麼形式監聽的呢?先不要考慮,下面會講的~~),其中任何一個有變更(有連接,有數據),

我就告訴你用戶,那麼你就能夠去調用這個數據了,這就是他的NB之處。這個IO多路複用模型機制是操做系統幫咱們提供的,在windows上有這麼個機制叫作select,

那麼若是咱們想經過本身寫代碼來控制這個機制或者本身寫這麼個機制,咱們可使用python中的select模塊來完成上面這一系列代理的行爲。在一切皆文件的unix下,

這些能夠接收數據的對象或者鏈接,都叫作文件描述符fd

強調:

1. 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll

的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。

2. 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這

個函數block,而不是被socket IO給block。

python中的select模塊:

import select

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

參數: 可接受四個參數(前三個必須)
    rlist: wait until ready for reading  #等待讀的對象,你須要監聽的須要獲取數據的對象列表
    wlist: wait until ready for writing  #等待寫的對象,你須要寫一些內容的時候,input等等,也就是說我會循環他看看是否有須要發送的消息,
   若是有我取出這個對象的消息併發送出去,通常用不到,這裏咱們也給一個[]。
xlist: wait for an 「exceptional condition」 #等待異常的對象,一些額外的狀況,通常用不到,可是必須傳,那麼咱們就給他一個[]。 timeout: 超時時間 當超時時間 = n(正整數)時,那麼若是監聽的句柄均無任何變化,則select會阻塞n秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。   返回值:三個列表與上面的三個參數列表是對應的   select方法用來監視文件描述符(當文件描述符條件不知足時,select會阻塞),當某個文件描述符狀態改變後,會返回三個列表 1、當參數1 序列中的fd知足「可讀」條件時,則獲取發生變化的fd並添加到fd_r_list中 2、當參數2 序列中含有fd時,則將該序列中全部的fd添加到 fd_w_list中 3、當參數3 序列中的fd發生錯誤時,則將該發生錯誤的fd添加到 fd_e_list中 四、當超時時間爲空,則select會一直阻塞,直到監聽的句柄發生變化

 

結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接  

#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設置爲非阻塞
server.setblocking(False)

# 初始化將服務端socket對象加入監聽列表,後面還要動態添加一些conn鏈接對象,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {}  #存放客戶端發送過來的消息

wlist=[]  #等待寫對象
wdata={}  #存放要返回給客戶端的消息

print('預備!監聽!!!')
count = 0 #寫着計數用的,爲了看實驗效果用的,沒用
while True:
    # 開始 select 監聽,對rlist中的服務端server進行監聽,select函數阻塞進程,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手信號,從而變得可讀,知足select函數的「可讀」條件),被觸發的(有動靜的)套接字(服務器套接字)返回給了rl這個返回值裏面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print('%s 次數>>'%(count),wl)
    count = count + 1
    # 對rl進行循環判斷是否有客戶端鏈接進來,當有客戶端鏈接進來時select將觸發
    for sock in rl:
        # 判斷當前觸發的是否是socket對象, 當觸發的對象是socket對象時,說明有新客戶端accept鏈接進來了
        if sock == server:
            # 接收客戶端的鏈接, 獲取客戶端對象和客戶端地址信息
            conn,addr=sock.accept()
            #把新的客戶端鏈接加入到監聽列表中,當客戶端的鏈接有接收消息的時候,select將被觸發,會知道這個鏈接有動靜,有消息,那麼返回給rl這個返回值列表裏面。
            rlist.append(conn)
        else:
            # 因爲客戶端鏈接進來時socket接收客戶端鏈接請求,將客戶端鏈接加入到了監聽列表中(rlist),客戶端發送消息的時候這個鏈接將觸發
            # 因此判斷是不是客戶端鏈接對象觸發
            try:
                data=sock.recv(1024)
                #沒有數據的時候,咱們將這個鏈接關閉掉,並從監聽列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                #將接受到的客戶端的消息保存下來
                rdata[sock] = data.decode()

                #將客戶端鏈接對象和這個對象接收到的消息加工成返回消息,並添加到wdata這個字典裏面
                wdata[sock]=data.upper()
                #須要給這個客戶端回覆消息的時候,咱們將這個鏈接添加到wlist寫監聽列表中
                wlist.append(sock)
            #若是這個鏈接出錯了,客戶端暴力斷開了(注意,我尚未接收他的消息,或者接收他的消息的過程當中出錯了)
            except Exception:
                #關閉這個鏈接
                sock.close()
                #在監聽列表中將他移除,由於無論什麼緣由,它畢竟是斷開了,不必再監聽它了
                rlist.remove(sock)
    # 若是如今沒有客戶端請求鏈接,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否須要發送消息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #將一次select監聽列表中有接收數據的conn對象所接收到的消息打印一下
    # for k,v in rdata.items():
    #     print(k,'發來的消息是:',v)
    # #清空接收到的消息
    # rdata.clear()
代碼示例server端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
代碼示例client端

 

select監聽fd變化的過程分析:

#用戶進程建立socket對象,拷貝監聽的fd到內核空間,每個fd會對應一張系統文件表,內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到;
#用戶進程再發送系統調用,好比(accept)將內核空間的數據copy到用戶空間,同時做爲接受數據端內核空間的數據清除,這樣從新監聽時fd再有新的數
據又能夠響應到了(發送端由於基於TCP協議因此須要收到應答後纔會清除)。

 

該模型的優勢:

相比其餘模型,使用select()的事件驅動模型只用單線程(進程)執行, 佔用資源少, 不消耗太多CUP, 同時可以爲多客戶端提供服務. 

該模型的缺點:

首先select()接口並非實現"事件驅動"的最好選擇. 由於當須要探測的句柄值較大時, select()接口自己須要消耗大量時間去輪詢

各個句柄. 不少操做系統提供了更爲高效的服務器程序, 相似epoll這樣的接口更被推薦.如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。

若是須要實現更高效的服務器程序,相似epoll這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的epoll接口有很大差別,因此使用相似於epoll的接口實現具

有較好跨平臺能力的服務器會比較困難。其次,該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。

 

select作得事情和第二階段的阻塞沒有關係,就是從內核態將數據拷貝到用戶態的阻塞,始終幫你作得監聽的工做,幫你節省了一些第一階段阻塞的時間。

   IO多路複用的機制:

    select機制: Windows、Linux

    poll機制    : Linux    #和lselect監聽機制同樣,可是對監聽列表裏面的數量沒有限制,select默認限制是1024個,可是他們兩個都是操做系統輪詢每個

    被監聽的文件描述符(若是數量很大,其實效率不太好),看是否有可讀操做。

    epoll機制  : Linux    #它的監聽機制和上面兩個不一樣,他給每個監聽的對象綁定了一個回調函數,你這個對象有消息,那麼觸發回調函數給用戶,用戶

    就進行系統調用來拷貝數據,並非輪詢監聽全部的被監聽對象,這樣的效率高不少。

五. 異步IO(asynchronous IO)

Linux下的asynchronous IO其實用得很少,從內核2.6版本纔開始引入。先看一下它的流程:

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,

因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,

告訴它read操做完成了.

因爲python在copy數據這個階段沒有提供操縱操做系統的接口,因此用python無法實現這套異步IO機制,其餘幾個IO模型都沒有解決

第二階段的阻塞(用戶態和內核態之間copy數據),可是C語言是能夠實現的,由於C語言是最接近底層的.

六. IO模型比較分析

blocking IO 和nonblocking IO的區別:

調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

synchronous IO和asynchronous IO的區別:

A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked; 

二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,四個IO模型能夠分爲兩大類,以前所述的blocking IO,non-blocking IO,

IO multiplexing都屬於synchronous IO這一類,而 asynchronous I/O後一類 。

各個IO model 的比較:

 

 通過上面的介紹,會發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主

動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做

交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

七. selectors模塊

IO複用:爲了解釋這個名詞,首先來理解下複用這個概念,複用也就是共用的意思,這樣理解仍是有些抽象,爲此,我們來理解下複用在通訊領域的使用,在通訊領域中爲了充分利用網絡鏈接的物理介質,每每在同一條網絡鏈路上採用時分複用或頻分複用的技術使其在同一鏈路上傳輸多路信號,到這裏咱們就基本上理解了複用的含義,即公用某個「介質」來儘量多的作同一類(性質)的事,那IO複用的「介質」是什麼呢?爲此咱們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,所以爲了解決大量客戶端訪問的問題,引入了IO複用技術,即:一個進程能夠同時對多個客戶請求進行服務。也就是說IO複用的「介質」是進程(準確的說複用的是select和poll,由於進程也是靠調用select和poll來實現的),複用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是併發的可是IO所需的讀寫數據多數狀況下是沒有準備好的,所以就能夠利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據能夠進行讀寫了,進程就來對這樣的IO進行服務。

  

理解完IO複用後,咱們在來看下實現IO複用中的三個API(select、poll和epoll)的區別和聯繫

select,poll,epoll都是IO多路複用的機制,I/O多路複用就是經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知應用程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型以下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一個參數nfds爲fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制爲__FD_SETSIZE(1024),位數組的每一位表明其對應的描述符是否須要被檢查。第二三四參數表示須要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關注的事件,因此每次調用select前都須要從新初始化fdset。timeout參數爲超時時間,該結構會被內核修改,其值爲超時剩餘的時間。

 select的調用步驟以下:

(1)使用copy_from_user從用戶空間拷貝fdset到內核空間

(2)註冊回調函數__pollwait

(3)遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll 來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout 指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

總結下select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

(3)select支持的文件描述符數量過小了,默認是1024

 

2.  poll與select不一樣,經過一個pollfd數組向內核傳遞須要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只須要被初始化一次。

 poll的實現機制與select相似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,而後對pollfd中的每一個描述符進行poll,相比處理fdset來講,poll效率更高。poll返回後,須要對pollfd中的每一個元素檢查其revents值,來得指事件是否發生。

 

3.直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll 和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是注 冊要監聽的事件類型;epoll_wait則是等待事件的產生。

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝 一次。

  對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

總結:

(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的 時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間,這就是回調機制帶來的性能提高。

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省很多的開銷。
select,poll,epoll

 這三種IO多路複用模型在不一樣的平臺有着不一樣的支持,而epoll在windows下就不支持,好在咱們有selectors模塊,幫咱們默認選擇當前平臺下最合適的,咱們只須要寫監聽

誰,而後怎麼發送消息接收消息,可是具體怎麼監聽的,選擇的是select仍是poll仍是epoll,這是selector幫咱們自動選擇的。

#服務端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口爲非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept

while True:
    events=sel.select() #檢測全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
selector代碼示例
#!/usr/bin/env python  
import select  
import socket  

response = b''  

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
serversocket.bind(('0.0.0.0', 8080))  
serversocket.listen(1)  
# 由於socket默認是阻塞的,因此須要使用非阻塞(異步)模式。  
serversocket.setblocking(0)  

# 建立一個epoll對象  
epoll = select.epoll()  
# 在服務端socket上面註冊對讀event的關注。一個讀event隨時會觸發服務端socket去接收一個socket鏈接  
epoll.register(serversocket.fileno(), select.EPOLLIN)  

try:  
    # 字典connections映射文件描述符(整數)到其相應的網絡鏈接對象  
    connections = {}  
    requests = {}  
    responses = {}  
    while True:  
        # 查詢epoll對象,看是否有任何關注的event被觸發。參數「1」表示,咱們會等待1秒來看是否有event發生。  
        # 若是有任何咱們感興趣的event發生在此次查詢以前,這個查詢就會帶着這些event的列表當即返回  
        events = epoll.poll(1)  
        # event做爲一個序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個整數。  
        for fileno, event in events:  
            # 若是是服務端產生event,表示有一個新的鏈接進來  
            if fileno == serversocket.fileno():  
                connection, address = serversocket.accept()  
                print('client connected:', address)  
                # 設置新的socket爲非阻塞模式  
                connection.setblocking(0)  
                # 爲新的socket註冊對讀(EPOLLIN)event的關注  
                epoll.register(connection.fileno(), select.EPOLLIN)  
                connections[connection.fileno()] = connection  
                # 初始化接收的數據  
                requests[connection.fileno()] = b''  

            # 若是發生一個讀event,就讀取從客戶端發送過來的新數據  
            elif event & select.EPOLLIN:  
                print("------recvdata---------")  
                # 接收客戶端發送過來的數據  
                requests[fileno] += connections[fileno].recv(1024)  
                # 若是客戶端退出,關閉客戶端鏈接,取消全部的讀和寫監聽  
                if not requests[fileno]:  
                    connections[fileno].close()  
                    # 刪除connections字典中的監聽對象  
                    del connections[fileno]  
                    # 刪除接收數據字典對應的句柄對象  
                    del requests[connections[fileno]]  
                    print(connections, requests)  
                    epoll.modify(fileno, 0)  
                else:  
                    # 一旦完成請求已收到,就註銷對讀event的關注,註冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回覆數據給客戶端  
                    epoll.modify(fileno, select.EPOLLOUT)  
                    # 打印完整的請求,證實雖然與客戶端的通訊是交錯進行的,但數據能夠做爲一個總體來組裝和處理  
                    print('-' * 40 + '\n' + requests[fileno].decode())  

            # 若是一個寫event在一個客戶端socket上面發生,它會接受新的數據以便發送到客戶端  
            elif event & select.EPOLLOUT:  
                print("-------send data---------")  
                # 每次發送一部分響應數據,直到完整的響應數據都已經發送給操做系統等待傳輸給客戶端  
                byteswritten = connections[fileno].send(requests[fileno])  
                requests[fileno] = requests[fileno][byteswritten:]  
                if len(requests[fileno]) == 0:  
                    # 一旦完整的響應數據發送完成,就再也不關注寫event  
                    epoll.modify(fileno, select.EPOLLIN)  

            # HUP(掛起)event代表客戶端socket已經斷開(即關閉),因此服務端也須要關閉。  
            # 沒有必要註冊對HUP event的關注。在socket上面,它們老是會被epoll對象註冊  
            elif event & select.EPOLLHUP:  
                print("end hup------")  
                # 註銷對此socket鏈接的關注  
                epoll.unregister(fileno)  
                # 關閉socket鏈接  
                connections[fileno].close()  
                del connections[fileno]  
finally:  
    # 打開的socket鏈接不須要關閉,由於Python會在程序結束的時候關閉。這裏顯式關閉是一個好的代碼習慣  
    epoll.unregister(serversocket.fileno())  
    epoll.close()  
    serversocket.close()  

---------------------

本文來自 richard1ybb 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/richard1ybb/article/details/74573200?utm_source=copy 
epoll實現代碼的示例(能夠不用看,做爲了解)
相關文章
相關標籤/搜索