select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制使一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。python
select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的linux
異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。windows
sellect、poll、epoll三者的區別 :數組
select:
網絡
目前支持幾乎全部的平臺數據結構
默認單個進程可以監視的文件描述符的數量存在最大限制,在linux上默認只支持1024個socket併發
能夠經過修改宏定義或從新編譯內核(修改系統最大支持的端口數)的方式提高這一限制app
內核準備好數據後通知用戶有數據了,但不告訴用戶是哪一個鏈接有數據,用戶只能經過輪詢的方式來獲取數據異步
假定select讓內核監視100個socket鏈接,當有1個鏈接有數據後,內核就通知用戶100個鏈接中有數據了socket
可是不告訴用戶是哪一個鏈接有數據了,此時用戶只能經過輪詢的方式一個個去檢查而後獲取數據
這裏是假定有100個socket鏈接,那麼若是有上萬個,上十萬個呢?
那你就得輪詢上萬次,上十萬次,而你所取的結果僅僅就那麼1個。這樣就會浪費不少沒用的開銷
只支持水平觸發
每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大
同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也會很大
poll:
與select沒有本質上的差異,僅僅是沒有了最大文件描述符數量的限制
只支持水平觸發
只是一個過渡版本,不多用
epoll:
Linux2.6纔出現的epoll,具有了select和poll的一切優勢,公認爲性能最好的多路IO就緒通知方法
沒有最大文件描述符數量的限制
同時支持水平觸發和邊緣觸發
不支持windows平臺
內核準備好數據之後會通知用戶哪一個鏈接有數據了
IO效率不隨fd數目增長而線性降低
使用mmap加速內核與用戶空間的消息傳遞
水平觸發與邊緣觸發:
水平觸發:將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用epoll時將再次報告這些文件描述符,這種方式稱爲水平觸發
邊緣觸發:只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發
理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
select和epoll的特色:
select:
select經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
epoll:
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
select
select(rlist, wlist, xlist, timeout=None)
select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。
調用後select函數會阻塞,直到有描述符就緒(有數據可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。
poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
不一樣於select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現。
struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ };
pollfd結構包含了要監視的event和發生的event,再也不使用select「參數-值」傳遞的方式。
同時,pollfd並無最大數量限制(可是數量過大後性能也是會降低)。
和select函數同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。
從上面看,select和poll都須要在返回後,經過遍歷文件描述符來獲取已經就緒的socket。
事實上,同時鏈接的大量客戶端在一時刻可能只有不多的處於就緒狀態,所以隨着監視的描述符數量的增加,其效率也會線性降低。
epoll
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。
epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
epoll操做過程
epoll操做過程須要三個接口,分別以下:
int epoll_create(int size);//建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1. int epoll_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操做。
epfd:是epoll_create()的返回值。
op:表示op操做,用三個宏來表示:
添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。
分別添加、刪除和修改對fd的監聽事件。
fd:是須要監聽的fd(文件描述符)
epoll_event:是告訴內核須要監聽什麼事
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
一個簡單的select多併發socket服務端代碼以下:
#!/usr/bin/python #Author:sean import select import socket import queue server = socket.socket() HOST = 'localhost' PORT = 8080 print("start up %s on port: %s",% (HOST,PORT)) server.bind((HOST,PORT)) server.listen() server.setblocking(False) #不阻塞 msg_dic_queue = {} #這是一個隊列字典,存放要返回給客戶端的數據 inputs = [server] #inputs裏存放要讓內核監測的鏈接,這裏的server是指監測server自己的鏈接狀態 #inputs = [server,conn] outputs = [] #outputs裏存放要返回給客戶端的數據鏈接對象 while True: print("waiting for next connect...") readable,writeable,exceptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,程序就會一直阻塞在這裏 # print(readable,writeable,exceptional) for r in readable: #處理活躍的鏈接,每一個r就是一個socket鏈接對象 if r is server: #表明來了一個新鏈接 conn,client_addr = server.accept() print("arrived a new connect: ",client_addr) conn.setblocking(False) inputs.append(conn) #由於這個新創建的鏈接還沒發數據來,如今就接收的話,程序就報異常了 #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn msg_dic_queue[conn] = queue.Queue() #初始化一個隊列,後面存要返回給客戶端的數據 else: #r不是server的話就表明是一個與客戶端創建的文件描述符了 #客戶端的數據過來了,在這裏接收 data = r.recv(1024) if data: print("received data from [%s]: "% r.getpeername()[0],data) msg_dic_queue[r].put(data) #收到的數據先放到隊列字典裏,以後再返回給客戶端 if r not in outputs: outputs.append(r) #放入返回的鏈接隊列裏。爲了避免影響處理與其它客戶端的鏈接,這裏不馬上返回數據給客戶端 else: #若是收不到data就表明客戶端已經斷開了 print("Client is disconnect",r) if r in outputs: outputs.remove(r) #清理已斷開的鏈接 inputs.remove(r) del msg_dic_queue[r] for w in writeable: #處理要返回給客戶端的鏈接列表 try: next_msg = msg_dic_queue[w].get_nowait() except queue.Empty: print("client [%s]"% w.getpeername()[0],"queue is empty...") outputs.remove(w) #確保下次循環時writeable不返回已經處理完的鏈接 else: print("sending message to [%s]"% w.getpeername()[0],next_msg) w.send(next_msg) #返回給客戶端源數據 for e in exceptional: #處理異常鏈接 if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dic_queue[e]
select多併發socket客戶端代碼以下:
#!/usr/bin/python #Author:sean import socket msgs = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = 'localhost' SERVER_PORT = 8080 # Create a few TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(500) ] # Connect the socket to the port where the server is listening print('connecting to %s port %s' % (SERVER_ADDRESS,SERVER_PORT)) for s in socks: s.connect((SERVER_ADDRESS,SERVER_PORT)) for message in msgs: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
epoll多併發socket服務端代碼以下:
#!/usr/bin/python #Author:sean import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做爲監聽 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 設置 SO_REUSEADDR 選項 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上 listen_fd.bind(('', 8008)) except socket.error as msg: logger.error("bind failed") try: # 設置 listen 的 backlog 數 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若爲監聽 fd 被激活 if fd == listen_fd.fileno(): # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 將鏈接 socket 設置爲 非阻塞 conn.setblocking(0) # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 將 conn 和 addr 信息分別保存起來 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可讀 事件激活 datas = '' while True: try: # 從激活 fd 上 recv 10 字節數據 data = connections[fd].recv(10) # 若當前沒有接收到數據,而且以前的累計數據也沒有 if not data and not datas: # 從 epoll 句柄中移除該 鏈接 fd epoll_fd.unregister(fd) # server 側主動關閉該 鏈接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 將接收到的數據拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況 # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 將已接收數據保存起來 datalist[fd] = datas # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出錯處理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可寫 事件激活 sendLen = 0 # 經過 while 循環確保將 buf 中的數據所有發送出去 while True: # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有發送完畢後退出 while 循環 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其餘 epoll 事件不進行處理 continue
代碼好多?想用更簡單的代碼來實現一樣的效果?請往這兒走