【python】-- IO多路複用(select、poll、epoll)介紹及實現

IO多路複用(select、poll、epoll)介紹及select、epoll的實現

IO多路複用中包括 select、pool、epoll,這些都屬於同步,還不屬於異步python

1、IO多路複用介紹

一、selectwindows

select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。數組

  select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。網絡

  select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。數據結構

  另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。app

二、poll異步

poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。socket

  poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。函數

  另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。oop

三、epoll

直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

  epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

  epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

  另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

四、sellect、poll、epoll三者的區別

 

2、select IO多路複用

Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,可是這樣會形成效率的下降

一、select語法:

select(rlist, wlist, xlist, timeout=None)

select()方法接收並監控3個通訊列表, 第一個rlist監控全部要進來的輸入數據,第二個wlist是監控全部要發出去的輸出數據,第三個監控異常錯誤數據,第四個設置指定等待時間,若是想當即返回,設爲null便可,最後須要建立2個列表來包含輸入和輸出信息來傳給select(),讓select方法經過內核去監控,而後生成三個實例。

#創建兩個列表,好比想讓內核去檢測50個鏈接,須要傳給它一個列表,就是這個inputs(列表中裏面存放的是須要被內核監控的連接),而後交給select,就至關於交給內核了
inputs = [server,] #輸入列表,監控全部輸入數據

outputs = []  #輸入列表,監控全部輸出數據

#把兩個列表傳給select方法經過內核去監控,生成三個實例
readable,writeable,exceptional = select.select(inputs,outputs,inputs)  # 這裏select方法的第三個參數一樣傳入input列表是由於,input列表中存放着全部的連接,好比以前放入的50被監控連接中有5個斷了,出現了異常,就會輸入到exceptional裏面,但這5連接自己是放在inputs列表中

二、select服務端代碼實例:

import select,socket,queue
server = socket.socket()
server.bind(("localhost",9000))
server.listen(1000)
server.setblocking(False) #設置爲非阻塞
msg_dic = dict() #定義一個隊列字典
inputs = [server,]  #因爲設置成非阻塞模式,accept和recive都不阻塞了,沒有值就會報錯,所以最開始須要最開始須要監控服務端自己,等待客戶端鏈接
outputs = [] 
while True:
    #exceptional表示若是inputs列表中出現異常,會輸出到這個exceptional中
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)#若是沒有任何客戶端鏈接,就會阻塞在這裏 
    for r in readable:# 沒有個r表明一個socket連接
        if r is server:  #若是這個socket是server的話,就說明是是新客戶端鏈接了
            conn,addr = r.accept() #新鏈接進來了,接受這個鏈接,生成這個客戶端實例
            print("來了一個新鏈接",addr)
            inputs.append(conn)#爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
            #就會被交給select去監聽
            msg_dic[conn] = queue.Queue() #初始化一個隊列,後面存要返回給這個客戶端的數據
        else: #若是不是server,就說明是以前創建的客戶端來數據了
            data = r.recv(1024)
            print("收到數據:",data)
            msg_dic[r].put(data)#收到的數據先放到queue裏,一會返回給客戶端
            outputs.append(r)#爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
            # r.send(data)
            # print("send done....")
    for w in writeable:  #要返回給客戶端的連接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)   #返回給客戶端的源數據
        outputs.remove(w)  #確保下次循環的時候writeable,不返回這個已經處理完的這個鏈接了
 
    for e in exceptional:  #處理異常的鏈接
        if e in outputs:   #由於e不必定在outputs,因此先要判斷
            outputs.remove(e)
        inputs.remove(e)   #刪除inputs中異常鏈接
        del msg_dic[e]   #刪除此鏈接對應的隊列

 

3、epoll IO多路複用

epoll的方式,這種效率更高,可是這種方式在Windows下不支持,在Linux是支持的,selectors模塊就是默認使用就是epoll,可是若是在windows系統上使用selectors模塊,就會找不到epoll,從而使用select。

一、selectors語法:

#定義一個對象
sel = selectors.DefaultSelector()

#註冊一個事件
sel.register(server,selectors.EVENT_READ,accept) #註冊事件,只要來一個鏈接就調accept這個函數,就至關於以前select的用法,sel.register(server,selectors.EVENT_READ,accept) ==  inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是同樣的。

 

 

 二、selectors代碼實例:

import selectors,socket
 
sel = selectors.DefaultSelector()
 
def accept(sock,mask):
    "接收客戶端信息實例"
    conn,addr = sock.accept()
    print("accepted",conn,'from',addr)
    conn.setblocking(False)
    sel.register(conn,selectors.EVENT_READ,read)  #新鏈接註冊read回調函數
 
def read(conn,mask):
    "接收客戶端的數據"
    data = conn.recv(1024)
    if data:
        print("echoing",repr(data),'to',conn)
        conn.send(data)
    else:
        print("closing",conn)
        sel.unregister(conn)
        conn.close()
 
server = socket.socket()
server.bind(('localhost',9999))
server.listen(500)
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept)  #註冊事件,只要來一個鏈接就調accept這個函數,
#sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,]
 
while True:
    events = sel.select()  #這個select,看起來是select,有可能調用的是epoll,看你操做系統是Windows的仍是Linux的
                           #默認阻塞,有活動鏈接就返回活動鏈接列表
    print("事件:",events)
    for key,mask in events:
        callback = key.data #至關於調accept了
        callback(key.fileobj,mask)  #key.fileobj=文件句柄

 

打印服務端:

 [(SelectorKey(fileobj=<socket.socket fd=436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222)>, fd=436, events=1, data=<function accept at 0x0000022296063E18>), 1)]
accepted <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> from ('127.0.0.1', 50281)
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'adas' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'HA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'asdHA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>

 

 這樣就容易明白:callback = key.data #第一次調用的是accept,第二次調用的是read    callback(key.fileobj,mask)  #key.fileobj=文件句柄

 

客戶端代碼:

在Linux端,selectors模塊才能是epoll

import socket,sys
 
messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)
 
# 建立100個 TCP/IP socket實例
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
 
# 鏈接服務端
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # 發送消息至服務端
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)
 
    # 從服務端接收消息
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
相關文章
相關標籤/搜索