首先列一下,sellect、poll、epoll三者的區別
select
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。python
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。windows
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。數組
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。網絡
poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。數據結構
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。app
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。socket
epoll
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。ide
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。函數
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。性能
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
使用 select :
在python中,select函數是一個對底層操做系統的直接訪問的接口。它用來監控sockets、files和pipes,等待IO完成(Waiting for I/O completion)。當有可讀、可寫或是異常事件產生時,select能夠很容易的監控到。
select.select(rlist, wlist, xlist[, timeout]) 傳遞三個參數,一個爲輸入而觀察的文件對象列表,一個爲輸出而觀察的文件對象列表和一個觀察錯誤異常的文件列表。第四個是一個可選參數,表示超時秒數。其返回3個tuple,每一個tuple都是一個準備好的對象列表,它和前邊的參數是同樣的順序。下面,主要結合代碼,簡單說說select的使用。
Server端程序:
一、該程序主要是利用socket進行通訊,接收客戶端發送過來的數據,而後再發還給客戶端。
二、首先創建一個TCP/IP socket,並將其設爲非阻塞,而後進行bind和listen。
三、經過select函數獲取到三種文件列表,分別對每一個列表的每一個元素進行輪詢,對不一樣socket進行不一樣的處理,最外層循環直到inputs列表爲空爲止
四、當設置timeout參數時,若是發生了超時,select函數會返回三個空列表。
代碼以下(代碼中已經有很詳細的註釋,這裏就不過多解釋了):
'''
Created on 2012-1-6
The echo server example from the socket section can be extanded to watche for more than
one connection at a time by using select() .The new version starts out by creating a nonblocking
TCP/IP socket and configuring it to listen on an address
@author : xiaojay
'''
import
select
import
socket
import
Queue
#create a socket
server
=
socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(
False
)
#set option reused
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR ,
1
)
server_address
=
(
'192.168.1.102'
,
10001
)
server.bind(server_address)
server.listen(
10
)
#sockets from which we except to read
inputs
=
[server]
#sockets from which we expect to write
outputs
=
[]
#Outgoing message queues (socket:Queue)
message_queues
=
{}
#A optional parameter for select is TIMEOUT
timeout
=
20
while
inputs:
print
"waiting for next event"
readable , writable , exceptional
=
select.select(inputs, outputs, inputs, timeout)
# When timeout reached , select return three empty lists
if
not
(readable
or
writable
or
exceptional) :
print
"Time out ! "
break
;
for
s
in
readable :
if
s
is
server:
# A "readable" socket is ready to accept a connection
connection, client_address
=
s.accept()
print
" connection from "
, client_address
connection.setblocking(
0
)
inputs.append(connection)
message_queues[connection]
=
Queue.Queue()
else
:
data
=
s.recv(
1024
)
if
data :
print
" received "
, data ,
"from "
,s.getpeername()
message_queues[s].put(data)
# Add output channel for response
if
s
not
in
outputs:
outputs.append(s)
else
:
#Interpret empty result as closed connection
print
" closing"
, client_address
if
s
in
outputs :
outputs.remove(s)
inputs.remove(s)
s.close()
#remove message queue
del
message_queues[s]
for
s
in
writable:
try
:
next_msg
=
message_queues[s].get_nowait()
except
Queue.Empty:
print
" "
, s.getpeername() ,
'queue empty'
outputs.remove(s)
else
:
print
" sending "
, next_msg ,
" to "
, s.getpeername()
s.send(next_msg)
for
s
in
exceptional:
print
" exception condition on "
, s.getpeername()
#stop listening for input on the connection
inputs.remove(s)
if
s
in
outputs:
outputs.remove(s)
s.close()
#Remove message queue
del
message_queues[s]
Client端程序:
Client端建立多個socket進行server連接,用於觀察使用select函數的server端如何進行處理。
代碼以下(代碼中已經有很詳細的註釋,這裏就不過多解釋了):
'''
Created on 2012-1-5
The example client program uses some sockets to demonstrate how the server
with select() manages multiple connections at the same time . The client
starts by connecting each TCP/IP socket to the server
@author : peter
'''
import
socket
messages
=
[
"This is the message"
,
"It will be sent"
,
"in parts "
]
print
"Connect to the server"
server_address
=
(
"192.168.1.102"
,
10001
)
#Create a TCP/IP sock
socks
=
[]
for
i
in
range
(
10
):
socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
for
s
in
socks:
s.connect(server_address)
counter
=
0
for
message
in
messages :
#Sending message from different sockets
for
s
in
socks:
counter
+
=
1
print
" %s sending %s"
%
(s.getpeername(),message
+
" version "
+
str
(counter))
s.send(message
+
" version "
+
str
(counter))
#Read responses on both sockets
for
s
in
socks:
data
=
s.recv(
1024
)
print
" %s received %s"
%
(s.getpeername(),data)
if
not
data:
print
"closing socket "
,s.getpeername()
s.close()
使用Poll:
Server端:
'''
Created on 2012-1-6
The poll function provides similar features to select() , but the underlying implementation is more efficient.
But poll() is not supported under windows .
@author : xiaojay
'''
import
socket
import
select
import
Queue
# Create a TCP/IP socket, and then bind and listen
server
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(
False
)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
1
)
server_address
=
(
"192.168.1.102"
,
10001
)
print
"Starting up on %s port %s"
%
server_address
server.bind(server_address)
server.listen(
5
)
message_queues
=
{}
#The timeout value is represented in milliseconds, instead of seconds.
timeout
=
1000
# Create a limit for the event
READ_ONLY
=
( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE
=
(READ_ONLY|select.POLLOUT)
# Set up the poller
poller
=
select.poll()
poller.register(server,READ_ONLY)
#Map file descriptors to socket objects
fd_to_socket
=
{server.fileno():server,}
while
True
:
print
"Waiting for the next event"
events
=
poller.poll(timeout)
print
"*"
*
20
print
len
(events)
print
events
print
"*"
*
20
for
fd ,flag
in
events:
s
=
fd_to_socket[fd]
if
flag & (select.POLLIN | select.POLLPRI) :
if
s
is
server :
# A readable socket is ready to accept a connection
connection , client_address
=
s.accept()
print
" Connection "
, client_address
connection.setblocking(
False
)
fd_to_socket[connection.fileno()]
=
connection
poller.register(connection,READ_ONLY)
#Give the connection a queue to send data
message_queues[connection]
=
Queue.Queue()
else
:
data
=
s.recv(
1024
)
if
data:
# A readable client socket has data
print
" received %s from %s "
%
(data, s.getpeername())
message_queues[s].put(data)
poller.modify(s,READ_WRITE)
else
:
# Close the connection
print
" closing"
, s.getpeername()
# Stop listening for input on the connection
poller.unregister(s)
s.close()
del
message_queues[s]
elif
flag & select.POLLHUP :
#A client that "hang up" , to be closed.
print
" Closing "
, s.getpeername() ,
"(HUP)"
poller.unregister(s)
s.close()
elif
flag & select.POLLOUT :
#Socket is ready to send data , if there is any to send
try
:
next_msg
=
message_queues[s].get_nowait()
except
Queue.Empty:
# No messages waiting so stop checking
print
s.getpeername() ,
" queue empty"
poller.modify(s,READ_ONLY)
else
:
print
" sending %s to %s"
%
(next_msg , s.getpeername())
s.send(next_msg)
elif
flag & select.POLLERR:
#Any events with POLLERR cause the server to close the socket
print
" exception on"
, s.getpeername()
poller.unregister(s)
s.close()
del
message_queues[s]