Python併發編程之IO模型

IO模型介紹

本文討論的背景是Linux環境下的network IO,對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:linux

  • 等待數據準備 (Waiting for the data to be ready)
  • 將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

1、補充:

  • 輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,則會經歷wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常
  • 輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會阻塞在原地,若是設置爲非阻塞,則會拋出異常
  • 接收外來連接:accept,與輸入操做相似
  • 發起外出連接:connect,與輸出操做相似
  • 阻塞型接口:是指系統調用(通常是IO接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用得到結果或者超時出錯時才返回。

阻塞IO(blocking IO)

  • blocking IO的特色就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。
  • 會一直block住對應的進程,直到操做完成。
  • 除非特別指定,幾乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的
    1. 一個簡單的解決方案:
      在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。
    2. 該方案的問題是:
      開啓多進程或都線程的方式,在遇到要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進程自己也更容易進入假死狀態。
    3. 改進方案:
      考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,如websphere、tomcat和各類數據庫等。
    4. 改進後方案其實也存在着問題:
      「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。
    5. 對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸.

非阻塞IO(non-blocking IO)

  • 在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。
  • 若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。
  • 用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而用戶就能夠在本次到下次再發起read詢問的時間間隔內作其餘事情,或者直接再次發送read操做。
  • 一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據從kerne拷貝到了用戶內存(這一階段仍然是阻塞的),而後返回。
  • non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

1、缺點

  • 非阻塞IO模型不被推薦
  • 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況
  • 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。
  • 在這個方案中recv()更多的是起到檢測「操做是否完成」的做用,實際操做系統提供了更爲高效的檢測「操做是否完成「做用的接口,例如select()多路複用模式,能夠一次檢測多個鏈接是否活躍。

2、輪詢

  • 輪詢:非阻塞的recvform系統調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。
  • 輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False)
r_list=[]
w_list={}

while 1:
    try:
        conn,addr=server.accept()
        r_list.append(conn)
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!!
        # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果
        print('在作其餘的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[]
        for conn in r_list:
            try:
                data=conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字發送內容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操做
        for conn in del_rlist:
            r_list.remove(conn)

        for conn in del_wlist:
            w_list.pop(conn)



#客戶端
import socket
import os

client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))

非阻塞IO示例
非阻塞IO

多路複用IO(IO multiplexing)

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。web

  • IO multiplexing也就是select/epoll,也有稱這種IO方式爲事件驅動IO(event driven IO)
  • select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。
  • 基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於這裏須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。數據庫

1、注意

  • 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。
  • 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
  • 結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接。

2、select監聽fd變化的過程分析:

  • 用戶進程建立socket對象,拷貝監聽的fd到內核空間,每個fd會對應一張系統文件表,內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到;
  • 用戶進程再發送系統調用,好比(accept)將內核空間的數據copy到用戶空間,同時做爲接受數據端內核空間的數據清除,這樣從新監聽時fd再有新的數據又能夠響應到了(發送端由於基於TCP協議因此須要收到應答後纔會清除)。

3、優勢:

  • 相比其餘模型,使用select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,不消耗太多 CPU,同時可以爲多客戶端提供服務。
  • 若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。

4、缺點:

  • select()接口並非實現「事件驅動」的最好選擇,由於當須要探測的句柄值較大時,select()接口自己須要消耗大量時間去輪詢各個句柄
  • 不少操做系統提供了更爲高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll...
  • 若是須要實現更高效的服務器程序,相似epoll這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的epoll接口有很大差別,因此使用相似於epoll的接口實現具備較好跨平臺能力的服務器會比較困難。
  • 該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。
#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,]
wlist=[]
wdata={}

while True:
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn,addr=sock.accept()
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock]=data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

#客戶端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()

select網絡IO模型
select網絡IO模型

5、select,poll,epoll

  • select,poll,epoll三種IO多路複用模型在不一樣的平臺有着不一樣的支持,而epoll在windows下就不支持,好在咱們有selectors模塊,幫咱們默認選擇當前平臺下最合適的
IO複用:複用也就是共用的意思,在通訊領域中爲了充分利用網絡鏈接的物理介質,每每在同一條網絡鏈路上採用時分複用或頻分複用的技術使其在同一鏈路上傳輸多路信號,即公用某個「介質」來儘量多的作同一類(性質)的事,那IO複用的「介質」是什麼呢?爲此咱們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,所以爲了解決大量客戶端訪問的問題,引入了IO複用技術,即:一個進程能夠同時對多個客戶請求進行服務。也就是說IO複用的「介質」是進程(準確的說複用的是select和poll,由於進程也是靠調用select和poll來實現的),複用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是併發的可是IO所需的讀寫數據多數狀況下是沒有準備好的,所以就能夠利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據能夠進行讀寫了,進程就來對這樣的IO進行服務。

  

理解完IO複用後,咱們在來看下實現IO複用中的三個API(select、poll和epoll)的區別和聯繫

select,poll,epoll都是IO多路複用的機制,I/O多路複用就是經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知應用程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型以下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一個參數nfds爲fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制爲__FD_SETSIZE(1024),位數組的每一位表明其對應的描述符是否須要被檢查。第二三四參數表示須要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關注的事件,因此每次調用select前都須要從新初始化fdset。timeout參數爲超時時間,該結構會被內核修改,其值爲超時剩餘的時間。

 select的調用步驟以下:

(1)使用copy_from_user從用戶空間拷貝fdset到內核空間

(2)註冊回調函數__pollwait

(3)遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll 來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout 指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

總結下select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

(3)select支持的文件描述符數量過小了,默認是1024

 

2.  poll與select不一樣,經過一個pollfd數組向內核傳遞須要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只須要被初始化一次。

 poll的實現機制與select相似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,而後對pollfd中的每一個描述符進行poll,相比處理fdset來講,poll效率更高。poll返回後,須要對pollfd中的每一個元素檢查其revents值,來得指事件是否發生。

 

3.直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll 和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是注 冊要監聽的事件類型;epoll_wait則是等待事件的產生。

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝 一次。

  對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

總結:

(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的 時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間,這就是回調機制帶來的性能提高。

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省很多的開銷。

select,poll,epoll
select,poll,epoll
#服務端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口爲非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept

while True:
    events=sel.select() #檢測全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
selectors

異步IO(Asynchronous I/O)

  • Linux下的asynchronous IO其實用得很少,從內核2.6版本纔開始引入。
  • 當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。編程

總結

  • 調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。
  • non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。
  • asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。
相關文章
相關標籤/搜索