python之IO多路複用(二)——select、poll、epoll詳解


  select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制使一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。python

  select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的linux

  異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。windows


  sellect、poll、epoll三者的區別 :數組

  select:
網絡

  目前支持幾乎全部的平臺數據結構

  默認單個進程可以監視的文件描述符的數量存在最大限制,在linux上默認只支持1024個socket併發

    能夠經過修改宏定義或從新編譯內核(修改系統最大支持的端口數)的方式提高這一限制app

  內核準備好數據後通知用戶有數據了,但不告訴用戶是哪一個鏈接有數據,用戶只能經過輪詢的方式來獲取數據異步

    假定select讓內核監視100個socket鏈接,當有1個鏈接有數據後,內核就通知用戶100個鏈接中有數據了socket

    可是不告訴用戶是哪一個鏈接有數據了,此時用戶只能經過輪詢的方式一個個去檢查而後獲取數據

    這裏是假定有100個socket鏈接,那麼若是有上萬個,上十萬個呢?

    那你就得輪詢上萬次,上十萬次,而你所取的結果僅僅就那麼1個。這樣就會浪費不少沒用的開銷

  只支持水平觸發

  每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

  同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也會很大

  poll:

  與select沒有本質上的差異,僅僅是沒有了最大文件描述符數量的限制

  只支持水平觸發

  只是一個過渡版本,不多用

  epoll:

  Linux2.6纔出現的epoll,具有了select和poll的一切優勢,公認爲性能最好的多路IO就緒通知方法

  沒有最大文件描述符數量的限制

  同時支持水平觸發和邊緣觸發

  不支持windows平臺

  內核準備好數據之後會通知用戶哪一個鏈接有數據了

  IO效率不隨fd數目增長而線性降低

  使用mmap加速內核與用戶空間的消息傳遞


  水平觸發與邊緣觸發:

  水平觸發:將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用epoll時將再次報告這些文件描述符,這種方式稱爲水平觸發

  邊緣觸發:只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發

  理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。


  select和epoll的特色

  select:

  select經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。

  因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

  epoll:

  epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

  另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。


  select

select(rlist, wlist, xlist, timeout=None)

  select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。

  調用後select函數會阻塞,直到有描述符就緒(有數據可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。


  poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

  不一樣於select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

  pollfd結構包含了要監視的event和發生的event,再也不使用select「參數-值」傳遞的方式。

  同時,pollfd並無最大數量限制(可是數量過大後性能也是會降低)。 

  和select函數同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。


  從上面看,select和poll都須要在返回後,經過遍歷文件描述符來獲取已經就緒的socket。

  事實上,同時鏈接的大量客戶端在一時刻可能只有不多的處於就緒狀態,所以隨着監視的描述符數量的增加,其效率也會線性降低。


  epoll

  epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。

  epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

  epoll操做過程

  epoll操做過程須要三個接口,分別以下:

int epoll_create(int size);//建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  1. int epoll_create(int size);

  建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。

  當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。


  2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  函數是對指定描述符fd執行op操做。

  epfd:是epoll_create()的返回值。

  op:表示op操做,用三個宏來表示:

    添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。

    分別添加、刪除和修改對fd的監聽事件。

  fd:是須要監聽的fd(文件描述符)

  epoll_event:是告訴內核須要監聽什麼事


  3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  等待epfd上的io事件,最多返回maxevents個事件。

  參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。


  一個簡單的select多併發socket服務端代碼以下:

#!/usr/bin/python
#Author:sean

import select
import socket
import queue

server = socket.socket()
HOST = 'localhost'
PORT = 8080
print("start up %s on port: %s",% (HOST,PORT))
server.bind((HOST,PORT))
server.listen()

server.setblocking(False)   #不阻塞

msg_dic_queue = {}    #這是一個隊列字典,存放要返回給客戶端的數據

inputs = [server]   #inputs裏存放要讓內核監測的鏈接,這裏的server是指監測server自己的鏈接狀態
#inputs = [server,conn]
outputs = []    #outputs裏存放要返回給客戶端的數據鏈接對象

while True:
    print("waiting for next connect...")
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)   #若是沒有任何fd就緒,程序就會一直阻塞在這裏
    # print(readable,writeable,exceptional)
    for r in readable:  #處理活躍的鏈接,每一個r就是一個socket鏈接對象
        if r is server: #表明來了一個新鏈接
            conn,client_addr = server.accept()
            print("arrived a new connect: ",client_addr)
            conn.setblocking(False)
            inputs.append(conn) #由於這個新創建的鏈接還沒發數據來,如今就接收的話,程序就報異常了
            #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn
            msg_dic_queue[conn] = queue.Queue()   #初始化一個隊列,後面存要返回給客戶端的數據
        else:   #r不是server的話就表明是一個與客戶端創建的文件描述符了
            #客戶端的數據過來了,在這裏接收
            data = r.recv(1024)
            if data:
                print("received data from [%s]: "% r.getpeername()[0],data)
                msg_dic_queue[r].put(data)  #收到的數據先放到隊列字典裏,以後再返回給客戶端
                if r not in outputs:
                    outputs.append(r)   #放入返回的鏈接隊列裏。爲了避免影響處理與其它客戶端的鏈接,這裏不馬上返回數據給客戶端
            else:   #若是收不到data就表明客戶端已經斷開了
                print("Client is disconnect",r)
                if r in outputs:
                    outputs.remove(r)   #清理已斷開的鏈接
                inputs.remove(r)
                del msg_dic_queue[r]

    for w in writeable: #處理要返回給客戶端的鏈接列表
        try:
            next_msg = msg_dic_queue[w].get_nowait()
        except queue.Empty:
            print("client [%s]"% w.getpeername()[0],"queue is empty...")
            outputs.remove(w)   #確保下次循環時writeable不返回已經處理完的鏈接
        else:
            print("sending message to [%s]"% w.getpeername()[0],next_msg)
            w.send(next_msg)    #返回給客戶端源數據

    for e in exceptional:   #處理異常鏈接
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic_queue[e]

  select多併發socket客戶端代碼以下:

#!/usr/bin/python
#Author:sean

import socket

msgs = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8080

# Create a few TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(500) ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % (SERVER_ADDRESS,SERVER_PORT))
for s in socks:
    s.connect((SERVER_ADDRESS,SERVER_PORT))

for message in msgs:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )


  epoll多併發socket服務端代碼以下:

#!/usr/bin/python
#Author:sean

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 建立 TCP socket 做爲監聽 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 設置 SO_REUSEADDR 選項
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上
        listen_fd.bind(('', 8008))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 設置 listen 的 backlog 數
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 建立 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若爲監聽 fd 被激活
            if fd == listen_fd.fileno():
                # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 將鏈接 socket 設置爲 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 將 conn 和 addr 信息分別保存起來
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可讀 事件激活
                datas = ''
                while True:
                    try:
                        # 從激活 fd 上 recv 10 字節數據
                        data = connections[fd].recv(10)
                        # 若當前沒有接收到數據,而且以前的累計數據也沒有
                        if not data and not datas:
                            # 從 epoll 句柄中移除該 鏈接 fd
                            epoll_fd.unregister(fd)
                            # server 側主動關閉該 鏈接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 將接收到的數據拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況
                        # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 將已接收數據保存起來
                            datalist[fd] = datas
                            # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出錯處理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可寫 事件激活
                sendLen = 0
                # 經過 while 循環確保將 buf 中的數據所有發送出去
                while True:
                    # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在所有發送完畢後退出 while 循環
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其餘 epoll 事件不進行處理
                continue

  代碼好多?想用更簡單的代碼來實現一樣的效果?請往這兒走

相關文章
相關標籤/搜索