select、poll、epoll三者的區別 python
select linux
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組(在linux中一切事物皆文件,塊設備,socket鏈接等。),當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位(變成ready),使得進程能夠得到這些文件描述符從而進行後續的讀寫操做(select會不斷監視網絡接口的某個目錄下有多少文件描述符變成ready狀態【在網絡接口中,過來一個鏈接就會創建一個'文件'】,變成ready狀態後,select就能夠操做這個文件描述符了)。數組
【socketserver是經過多線程來處理多個請求,每一個鏈接過來分配一個線程來處理,可是select是單進程的,一個進程執行代碼確定就是串行的,可是如今就要經過一個進程來實現併發的效果,一個進程下只有一個主線程,也就說說用一個線程實現併發的效果。爲何要用一個進程實現多併發而不採用多線程實現多併發呢?緩存
==========答:由於一個進程實現多併發比多線程是實現多併發的效率還要高,由於啓動多線程會有不少的開銷,並且CPU要不斷的檢查每一個線程的狀態,肯定哪一個線程是否能夠執行。這個對系統來講也是有壓力的,用單進程的話就能夠避免這種開銷和給系統帶來的壓力,網絡
那麼單進程是如何實現多併發的呢???多線程
========答:很巧妙的使用了生產者和消費者的模式(異步),生產者和消費者能夠實現非阻塞,一個socketserver經過select接收多個鏈接過來(以前的socket一個進程只能接收一個鏈接,當接收新的鏈接的時候產生阻塞,由於這個socket進程要先和客戶端進行通訊,兩者是彼此互相等待的【客戶端發一條消息,服務端收到,客戶端等着返回....服務端等着接收.........】一直在阻塞着,這個時候若是再來一個鏈接,要等以前的那個鏈接斷了,這個才能夠連進來。-----------也就是說用基本的socket實現多進程是阻塞的。爲了解決這個問題採用每來一個鏈接產生一個線程,是不阻塞了,可是當線程數量過多的時候,對於cpu來講開銷和壓力是比較大的。)對於單個socket來講,阻塞的時候大部分的時候都是在等待IO操做(網絡操做也屬於IO操做)。爲了不這種狀況,就出現了異步=============客戶端發起一個鏈接,會在服務端註冊一個文件句柄,服務端會不斷輪詢這些文件句柄的列表,主進程和客戶端創建鏈接而沒有啓動線程,這個時候主進程和客戶端進行交互,其餘的客戶端是沒法鏈接主進程的,爲了實現主進程既能和已鏈接的客戶端收發消息,又能和新的客戶端創建鏈接,就把輪詢變的很是快(死循環)去刷客戶端鏈接進來的文件句柄的列表,只要客戶端發消息了,服務端讀取了消息以後,有另外一個列表去接收給客戶端返回的消息,也不斷的去刷這個列表,刷出來後返回給客戶端,這樣和客戶端的此次通訊就完成了,可是跟客戶端的鏈接尚未斷,可是就進入了下一次的輪詢。】併發
select目前幾乎在全部的平臺上支持,良好跨平臺性。app
調用select的函數爲readable,writable,exceptional = select.select(rlist, wlist, xlist[, timeout])
,前三個參數都分別是三個列表,數組中的對象均爲waitable object
:均是整數的文件描述符(file descriptor)或者一個擁有返回文件描述符方法fileno()
的對象;
異步
rlist
: 等待讀就緒的listwlist
: 等待寫就緒的listerrlist
: 等待「異常」的list
select方法用來監視文件描述符,若是文件描述符發生變化,則獲取該描述符。
二、當 rlist
序列中的描述符發生可讀時(accetp和read),則獲取發生變化的描述符並添加到 readable
序列中
三、當 wlist
序列中含有描述符時,則將該序列中全部的描述符添加到 writable
序列中
四、當 errlist
序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 exceptional
序列中
五、當 超時時間 未設置,則select會一直阻塞,直到監聽的描述符發生變化
當 超時時間 =
1
時,那麼若是監聽的句柄均無任何變化,則select會阻塞
1
秒,以後返回三個空列表,若是監聽的描述符(fd)有變化,則直接執行。
file
對象(好比
sys.stdin
,或者會被
open()
和
os.open()
返回的object),socket object將會返回
socket.socket()
。也能夠自定義類,只要有一個合適的
fileno()
的方法(須要真實返回一個文件描述符,而不是一個隨機的整數)。
select 示例:socket
Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器
1 #coding:UTF8 2 3 import select 4 import socket 5 import sys 6 import Queue 7 8 #建立一個TCP/IP 進程 9 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 10 server.setblocking(0) 11 12 #鏈接地址和端口 13 server_address = ('localhost',10000) 14 print >>sys.stderr,'starting up on %s prot %s' % server_address 15 server.bind(server_address) 16 17 #最大容許連接數 18 server.listen(5) 19 20 inputs = [ server ] 21 outputs = [] 22 23 message_queues = {} 24 25 while inputs: 26 print >>sys.stderr,'\nwaiting for the next event' 27 readable,writable,exceptional = select.select(inputs,outputs,inputs) 28 29 # Handle inputs 30 for s in readable: 31 32 if s is server: 33 # A "readable" server socket is ready to accept a connection 34 connection, client_address = s.accept() 35 print >>sys.stderr, 'new connection from', client_address 36 #connection.setblocking(0) 37 inputs.append(connection) 38 39 # Give the connection a queue for data we want to send 40 message_queues[connection] = Queue.Queue() 41 42 else: 43 data = s.recv(1024) 44 if data: 45 # A readable client socket has data 46 print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) 47 message_queues[s].put(data) #這個s至關於connection 48 # Add output channel for response 49 if s not in outputs: 50 outputs.append(s) 51 52 else: 53 # Interpret empty result as closed connection 54 print >>sys.stderr, 'closing', client_address, 'after reading no data' 55 # Stop listening for input on the connection 56 if s in outputs: 57 outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉 58 inputs.remove(s) #inputs中也刪除掉 59 s.close() #把這個鏈接關閉掉 60 61 # Remove message queue 62 del message_queues[s] 63 64 # Handle outputs 65 for s in writable: 66 try: 67 next_msg = message_queues[s].get_nowait() 68 except Queue.Empty: 69 # No messages waiting so stop checking for writability. 70 print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty' 71 outputs.remove(s) 72 else: 73 print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername()) 74 s.send(next_msg.upper()) 75 # Handle "exceptional conditions" 76 for s in exceptional: 77 print >>sys.stderr, 'handling exceptional condition for', s.getpeername() 78 # Stop listening for input on the connection 79 inputs.remove(s) 80 if s in outputs: 81 outputs.remove(s) 82 s.close() 83 84 # Remove message queue 85 del message_queues[s]
代碼解析:
select()方法接收並監控3個通訊列表, 第一個是全部的輸入的data,就是指外部發過來的數據,第2個是監控和接收全部要發出去的data(outgoing data),第3個監控錯誤信息,接下來咱們須要建立2個列表來包含輸入和輸出信息來傳給select().
# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write outputs = [ ]
全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。
# Outgoing message queues (socket:Queue)
message_queues = {}
The main portion of the server program loops, calling select() to block and wait for network activity.
下面是此程序的主循環,調用select()時會阻塞和等待直到新的鏈接和數據進來
while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)
當你把inputs,outputs,exceptional(這裏跟inputs共用)傳給select()後,它返回3個新的list,咱們上面將他們分別賦值爲readable,writable,exceptional, 全部在readable list中的socket鏈接表明有數據可接收(recv),全部在writable list中的存放着你能夠對其進行發送(send)操做的socket鏈接,當鏈接通訊出現error時會把error寫到exceptional列表中。
Readable list 中的socket 能夠有3種可能狀態,第一種是若是這個socket是main "server" socket,它負責監聽客戶端的鏈接,若是這個main server socket出如今readable裏,那表明這是server端已經ready來接收一個新的鏈接進來了,爲了讓這個main server能同時處理多個鏈接,在下面的代碼裏,咱們把這個main server的socket設置爲非阻塞模式。
第二種狀況是這個socket是已經創建了的鏈接,它把數據發了過來,這個時候你就能夠經過recv()來接收它發過來的數據,而後把接收到的數據放到queue裏,這樣你就能夠把接收到的數據再傳回給客戶端了。
第三種狀況就是這個客戶端已經斷開了,因此你再經過recv()接收到的數據就爲空了,因此這個時候你就能夠把這個跟客戶端的鏈接關閉了。
對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏有數據,就把這個數據取出來再發回給這個客戶端,不然就把這個鏈接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態
最後,若是在跟某個socket鏈接通訊過程當中出了錯誤,就把這個鏈接對象在inputs\outputs\message_queue中都刪除,再把鏈接關閉掉
1 #coding:UTF8 2 3 import socket 4 import sys 5 6 messages = [ 'This is the message. ', 7 'It will be sent ', 8 'in parts.', 9 ] 10 server_address = ('localhost', 10003) 11 12 # Create a TCP/IP socket 13 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 14 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 15 ] 16 17 # Connect the socket to the port where the server is listening 18 print >>sys.stderr, 'connecting to %s port %s' % server_address 19 for s in socks: 20 s.connect(server_address) 21 22 for message in messages: 23 24 # Send messages on both sockets 25 for s in socks: 26 print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message) 27 s.send(message) 28 29 # Read responses on both sockets 30 for s in socks: 31 data = s.recv(1024) 32 print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data) 33 if not data: 34 print >>sys.stderr, 'closing socket', s.getsockname()
客戶端程序展現瞭如何經過select()對socket進行管理並與多個鏈接同時進行交互,經過循環經過每一個socket鏈接給server發送和接收數據。
server: starting up on localhost prot 10000 waiting for the next event new connection from ('127.0.0.1', 54812) waiting for the next event new connection from ('127.0.0.1', 54813) received "This is the message. " from ('127.0.0.1', 54812) waiting for the next event received "This is the message. " from ('127.0.0.1', 54813) sending "This is the message. " to ('127.0.0.1', 54812) waiting for the next event output queue for ('127.0.0.1', 54812) is empty sending "This is the message. " to ('127.0.0.1', 54813) waiting for the next event output queue for ('127.0.0.1', 54813) is empty waiting for the next event received "It will be sent " from ('127.0.0.1', 54812) received "It will be sent " from ('127.0.0.1', 54813) waiting for the next event sending "It will be sent " to ('127.0.0.1', 54812) sending "It will be sent " to ('127.0.0.1', 54813) waiting for the next event output queue for ('127.0.0.1', 54812) is empty output queue for ('127.0.0.1', 54813) is empty waiting for the next event received "in parts." from ('127.0.0.1', 54812) received "in parts." from ('127.0.0.1', 54813) waiting for the next event sending "in parts." to ('127.0.0.1', 54812) sending "in parts." to ('127.0.0.1', 54813) waiting for the next event output queue for ('127.0.0.1', 54812) is empty output queue for ('127.0.0.1', 54813) is empty waiting for the next event closing ('127.0.0.1', 54813) after reading no data closing ('127.0.0.1', 54813) after reading no data waiting for the next event client: connecting to localhost port 10000 ('127.0.0.1', 54812): sending "This is the message. " ('127.0.0.1', 54813): sending "This is the message. " ('127.0.0.1', 54812): received "THIS IS THE MESSAGE. " ('127.0.0.1', 54813): received "THIS IS THE MESSAGE. " ('127.0.0.1', 54812): sending "It will be sent " ('127.0.0.1', 54813): sending "It will be sent " ('127.0.0.1', 54812): received "IT WILL BE SENT " ('127.0.0.1', 54813): received "IT WILL BE SENT " ('127.0.0.1', 54812): sending "in parts." ('127.0.0.1', 54813): sending "in parts." ('127.0.0.1', 54812): received "IN PARTS." ('127.0.0.1', 54813): received "IN PARTS."
原文:http://pymotw.com/2/select/
poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll() 的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
select.poll()
,返回一個poll的對象,支持註冊和註銷文件描述符。
poll.register(fd[, eventmask])
註冊一個文件描述符,註冊後,能夠經過poll()
方法來檢查是否有對應的I/O事件發生。fd
能夠是i 個整數,或者有返回整數的fileno()
方法對象。若是File對象實現了fileno(),也能夠看成參數使用。
eventmask
是一個你想去檢查的事件類型,它能夠是常量POLLIN
, POLLPRI
和 POLLOUT
的組合。若是缺省,默認會去檢查全部的3種事件類型。
事件常量 | 意義 |
---|---|
POLLIN | 有數據讀取 |
POLLPRT | 有數據緊急讀取 |
POLLOUT | 準備輸出:輸出不會阻塞 |
POLLERR | 某些錯誤狀況出現 |
POLLHUP | 掛起 |
POLLNVAL | 無效請求:描述沒法打開 |
poll.modify(fd, eventmask)
修改一個已經存在的fd,和poll.register(fd, eventmask)
有相同的做用。若是去嘗試修改一個未經註冊的fd,會引發一個errno
爲ENOENT的IOError
。poll.unregister(fd)
從poll對象中註銷一個fd。嘗試去註銷一個未經註冊的fd,會引發KeyError
。poll.poll([timeout])
去檢測已經註冊了的文件描述符。會返回一個可能爲空的list,list中包含着(fd, event)
這樣的二元組。 fd
是文件描述符, event
是文件描述符對應的事件。若是返回的是一個空的list,則說明超時了且沒有文件描述符有事件發生。timeout
的單位是milliseconds,若是設置了timeout
,系統將會等待對應的時間。若是timeout
缺省或者是None
,這個方法將會阻塞直到對應的poll對象有一個事件發生。#coding: utf-8 import select, socket response = b"hello world" serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serversocket.bind(('localhost', 10000)) serversocket.listen(1) serversocket.setblocking(0) # poll = select.poll() poll.register(serversocket.fileno(), select.POLLIN) connections = {} while True: for fd, event in poll.poll(): if event == select.POLLIN: if fd == serversocket.fileno(): con, addr = serversocket.accept() poll.register(con.fileno(), select.POLLIN) connections[con.fileno()] = con else: con = connections[fd] data = con.recv(1024) if data: poll.modify(con.fileno(), select.POLLOUT) elif event == select.POLLOUT: con = connections[fd] con.send(response) poll.unregister(con.fileno()) con.close()
epoll
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明 就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了 這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描 述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調 機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
select.epoll([sizehint=-1])
返回一個epoll對象。
eventmask
事件常量 | 意義 |
---|---|
EPOLLIN | 讀就緒 |
EPOLLOUT | 寫就緒 |
EPOLLPRI | 有數據緊急讀取 |
EPOLLERR | assoc. fd有錯誤狀況發生 |
EPOLLHUP | assoc. fd發生掛起 |
EPOLLRT | 設置邊緣觸發(ET)(默認的是水平觸發) |
EPOLLONESHOT | 設置爲 one-short 行爲,一個事件(event)被拉出後,對應的fd在內部被禁用 |
EPOLLRDNORM | 和 EPOLLIN 相等 |
EPOLLRDBAND | 優先讀取的數據帶(data band) |
EPOLLWRNORM | 和 EPOLLOUT 相等 |
EPOLLWRBAND | 優先寫的數據帶(data band) |
EPOLLMSG | 忽視 |
epoll.close()
關閉epoll對象的文件描述符。epoll.fileno
返回control fd的文件描述符number。epoll.fromfd(fd)
用給予的fd來建立一個epoll對象。epoll.register(fd[, eventmask])
在epoll對象中註冊一個文件描述符。(若是文件描述符已經存在,將會引發一個IOError
)epoll.modify(fd, eventmask)
修改一個已經註冊的文件描述符。epoll.unregister(fd)
註銷一個文件描述符。epoll.poll(timeout=-1[, maxevnets=-1])
等待事件,timeout(float)的單位是秒(second)。1 #coding:Utf8 2 import socket, select 3 4 EOL1 = b'\n\n' 5 EOL2 = b'\n\r\n' 6 response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n' 7 response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n' 8 response += b'Hello, world!' 9 10 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 11 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 12 serversocket.bind(('localhost', 10000)) 13 serversocket.listen(1) 14 serversocket.setblocking(0) 15 16 epoll = select.epoll() 17 epoll.register(serversocket.fileno(), select.EPOLLIN) 18 19 try: 20 connections = {}; requests = {}; responses = {} 21 while True: 22 events = epoll.poll(1) 23 for fileno, event in events: 24 if fileno == serversocket.fileno(): 25 connection, address = serversocket.accept() 26 connection.setblocking(0) 27 epoll.register(connection.fileno(), select.EPOLLIN) 28 connections[connection.fileno()] = connection 29 requests[connection.fileno()] = b'' 30 responses[connection.fileno()] = response 31 elif event & select.EPOLLIN: 32 requests[fileno] += connections[fileno].recv(1024) 33 if EOL1 in requests[fileno] or EOL2 in requests[fileno]: 34 epoll.modify(fileno, select.EPOLLOUT) 35 print('-'*40 + '\n' + requests[fileno].decode()[:-2]) 36 elif event & select.EPOLLOUT: 37 byteswritten = connections[fileno].send(responses[fileno]) 38 responses[fileno] = responses[fileno][byteswritten:] 39 if len(responses[fileno]) == 0: 40 epoll.modify(fileno, 0) 41 connections[fileno].shutdown(socket.SHUT_RDWR) 42 elif event & select.EPOLLHUP: 43 epoll.unregister(fileno) 44 connections[fileno].close() 45 del connections[fileno] 46 finally: 47 epoll.unregister(serversocket.fileno()) 48 epoll.close() 49 serversocket.close()