Python網絡編程中的select 和 poll I/O複用的簡單使用

首先列一下,sellect、poll、epoll三者的區別 
select 
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。python

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。windows

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。數組

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。網絡

poll 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。數據結構

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。app

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。socket

epoll 
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。ide

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。函數

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。性能

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

使用 select : 
在python中,select函數是一個對底層操做系統的直接訪問的接口。它用來監控sockets、files和pipes,等待IO完成(Waiting for I/O completion)。當有可讀、可寫或是異常事件產生時,select能夠很容易的監控到。 
select.select(rlist, wlist, xlist[, timeout]) 傳遞三個參數,一個爲輸入而觀察的文件對象列表,一個爲輸出而觀察的文件對象列表和一個觀察錯誤異常的文件列表。第四個是一個可選參數,表示超時秒數。其返回3個tuple,每一個tuple都是一個準備好的對象列表,它和前邊的參數是同樣的順序。下面,主要結合代碼,簡單說說select的使用。 
Server端程序: 
一、該程序主要是利用socket進行通訊,接收客戶端發送過來的數據,而後再發還給客戶端。 
二、首先創建一個TCP/IP socket,並將其設爲非阻塞,而後進行bind和listen。 
三、經過select函數獲取到三種文件列表,分別對每一個列表的每一個元素進行輪詢,對不一樣socket進行不一樣的處理,最外層循環直到inputs列表爲空爲止 
四、當設置timeout參數時,若是發生了超時,select函數會返回三個空列表。 
代碼以下(代碼中已經有很詳細的註釋,這裏就不過多解釋了):

'''
Created on 2012-1-6
The echo server example from the socket section can be extanded to watche for more than
one connection at a time by using select() .The new version starts out by creating a nonblocking
TCP/IP socket and configuring it to listen on an address
@author : xiaojay
'''
import  select
import  socket
import  Queue
 
#create a socket
server =  socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking( False )
#set option reused
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1 )
 
server_address =  ( '192.168.1.102' , 10001 )
server.bind(server_address)
 
server.listen( 10 )
 
#sockets from which we except to read
inputs =  [server]
 
#sockets from which we expect to write
outputs =  []
 
#Outgoing message queues (socket:Queue)
message_queues =  {}
 
#A optional parameter for select is TIMEOUT
timeout =  20
 
while  inputs:
     print  "waiting for next event"
     readable , writable , exceptional =  select.select(inputs, outputs, inputs, timeout)
 
     # When timeout reached , select return three empty lists
     if  not  (readable or  writable or  exceptional) :
         print  "Time out ! "
         break ;   
     for  s in  readable :
         if  s is  server:
             # A "readable" socket is ready to accept a connection
             connection, client_address =  s.accept()
             print  "    connection from " , client_address
             connection.setblocking( 0 )
             inputs.append(connection)
             message_queues[connection] =  Queue.Queue()
         else :
             data =  s.recv( 1024 )
             if  data :
                 print  " received "  , data , "from " ,s.getpeername()
                 message_queues[s].put(data)
                 # Add output channel for response   
                 if  s not  in  outputs:
                     outputs.append(s)
             else :
                 #Interpret empty result as closed connection
                 print  "  closing" , client_address
                 if  s in  outputs :
                     outputs.remove(s)
                 inputs.remove(s)
                 s.close()
                 #remove message queue
                 del  message_queues[s]
     for  s in  writable:
         try :
             next_msg =  message_queues[s].get_nowait()
         except  Queue.Empty:
             print  " "  , s.getpeername() , 'queue empty'
             outputs.remove(s)
         else :
             print  " sending "  , next_msg , " to " , s.getpeername()
             s.send(next_msg)
     
     for  s in  exceptional:
         print  " exception condition on " , s.getpeername()
         #stop listening for input on the connection
         inputs.remove(s)
         if  s in  outputs:
             outputs.remove(s)
         s.close()
         #Remove message queue
         del  message_queues[s]

Client端程序: 
Client端建立多個socket進行server連接,用於觀察使用select函數的server端如何進行處理。 
代碼以下(代碼中已經有很詳細的註釋,這裏就不過多解釋了)

'''
Created on 2012-1-5
The example client program uses some sockets to demonstrate how the server
with select() manages multiple connections at the same time . The client
starts by connecting each TCP/IP socket to the server
@author : peter
'''
 
import  socket
 
messages =  [ "This is the message"  ,
             "It will be sent"  ,
             "in parts " ]
 
print  "Connect to the server"
 
server_address =  ( "192.168.1.102" , 10001 )
 
#Create a TCP/IP sock
 
socks =  []
 
for  i in  range ( 10 ):
     socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
 
for  s in  socks:
     s.connect(server_address)
 
counter =  0
for  message in  messages :
     #Sending message from different sockets
     for  s in  socks:
         counter + = 1
         print  "  %s sending %s"  %  (s.getpeername(),message + " version " + str (counter))
         s.send(message + " version " + str (counter))
     #Read responses on both sockets
     for  s in  socks:
         data =  s.recv( 1024 )
         print  " %s received %s"  %  (s.getpeername(),data)
         if  not  data:
             print  "closing socket " ,s.getpeername()
             s.close()

使用Poll:

Server端:

'''
Created on 2012-1-6
The poll function provides similar features to select() , but the underlying implementation is more efficient.
But poll() is not supported under windows .
@author : xiaojay
'''
import  socket
import  select
import  Queue
 
# Create a TCP/IP socket, and then bind and listen
server =  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( False )
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
server_address =  ( "192.168.1.102" , 10001 )
 
print   "Starting up on %s port %s"  %  server_address
server.bind(server_address)
server.listen( 5 )
message_queues =  {}
#The timeout value is represented in milliseconds, instead of seconds.
timeout =  1000
# Create a limit for the event
READ_ONLY =  ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE =  (READ_ONLY|select.POLLOUT)
# Set up the poller
poller =  select.poll()
poller.register(server,READ_ONLY)
#Map file descriptors to socket objects
fd_to_socket =  {server.fileno():server,}
while  True :
     print  "Waiting for the next event"
     events =  poller.poll(timeout)
     print  "*" * 20
     print  len (events)
     print  events
     print  "*" * 20
     for  fd ,flag in   events:
         s =  fd_to_socket[fd]
         if  flag & (select.POLLIN | select.POLLPRI) :
             if  s is  server :
                 # A readable socket is ready to accept a connection
                 connection , client_address =  s.accept()
                 print  " Connection "  , client_address
                 connection.setblocking( False )
                 
                 fd_to_socket[connection.fileno()] =  connection
                 poller.register(connection,READ_ONLY)
                 
                 #Give the connection a queue to send data
                 message_queues[connection]  =  Queue.Queue()
             else  :
                 data =  s.recv( 1024 )
                 if  data:
                     # A readable client socket has data
                     print  "  received %s from %s "  %  (data, s.getpeername())
                     message_queues[s].put(data)
                     poller.modify(s,READ_WRITE)
                 else  :
                     # Close the connection
                     print  "  closing"  , s.getpeername()
                     # Stop listening for input on the connection
                     poller.unregister(s)
                     s.close()
                     del  message_queues[s]
         elif  flag & select.POLLHUP :
             #A client that "hang up" , to be closed.
             print  " Closing " , s.getpeername() , "(HUP)"
             poller.unregister(s)
             s.close()
         elif  flag & select.POLLOUT :
             #Socket is ready to send data , if there is any to send
             try :
                 next_msg =  message_queues[s].get_nowait()
             except  Queue.Empty:
                 # No messages waiting so stop checking
                 print  s.getpeername() , " queue empty"
                 poller.modify(s,READ_ONLY)
             else  :
                 print  " sending %s to %s"  %  (next_msg , s.getpeername())
                 s.send(next_msg)
         elif  flag & select.POLLERR:
             #Any events with POLLERR cause the server to close the socket
             print  "  exception on"  , s.getpeername()
             poller.unregister(s)
             s.close()
             del  message_queues[s]
相關文章
相關標籤/搜索