python網絡編程——IO多路複用之epoll

一、內核EPOLL模型講解

    此部分參考http://blog.csdn.net/mango_song/article/details/42643971博文並整理html

    首先咱們來定義流的概念,一個流能夠是文件,socket,pipe等能夠進行I/O操做的內核對象。不論是文件,仍是套接字(socket),仍是管道(pipe),咱們均可以把他們看做流。python

    以後咱們來討論I/O操做,經過read,咱們能夠從流中讀入數據;經過write,咱們能夠往流中寫入數據。如今假定1種情形,咱們須要從流中讀數據,可是流中尚未數據,(典型的例子爲,客戶端要從socket讀數據,可是服務器端尚未把數據傳回來),這時候該怎麼辦?編程

    阻塞:阻塞是個什麼概念呢?好比某個時候你在等快遞,可是你還不知道快遞何時過來,並且你也沒有別的事能夠幹(或者說接下來的事要等快遞來了才能作);那麼你能夠去睡覺了,由於你知道快遞把貨送來時必定會給你打電話(假定必定能叫醒你)。服務器

    非阻塞忙輪詢:接着上面等快遞的例子,若是用忙輪詢的方法,那麼你須要知道快遞員的手機號,而後每分鐘給他打個電話:「你到了沒?」網絡

    很明顯通常人不會用第二種作法,不只顯得無腦,浪費話費不說,還佔用了快遞員大量的時間。多線程

    大部分程序也不會用第二種作法,由於第一種方法經濟而簡單,經濟是指消耗不多的CPU時間,若是線程睡眠了,就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片架構

    爲了瞭解阻塞是如何進行的,咱們來討論緩衝區,以及內核緩衝區,最終把I/O事件解釋清楚。緩衝區的引入是爲了減小頻繁I/O操做而引發頻繁的系統調用(你知道它很慢的)當你操做一個流時,更多的是以緩衝區爲單位進行操做,這是相對於用戶空間而言。對於內核來講,也須要緩衝區。併發

    假設有一個管道,進程A爲管道的寫入方,B爲管道的讀出方。假設一開始內核緩衝區是空的,B做爲讀出方,被阻塞着。而後首先A往管道寫入,這時候內核緩衝區由空的狀態變到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之爲「緩衝區非空」。可是「緩衝區非空」事件通知B後,B卻尚未讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫入的數據會滯留在內核緩衝區中,若是內核也緩衝區滿了,B仍未開始讀數據,最終內核緩衝區會被填滿,這個時候會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,咱們把這個事件定義爲「緩衝區滿」。假設後來B終於開始讀數據了,因而內核的緩衝區空了出來,這時候內核會告訴A,內核緩衝區有空位了,你能夠從長眠中醒來了,繼續寫數據了,咱們把這個事件叫作「緩衝區非滿」。也許事件Y1已經通知了A,可是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩衝區空了。這個時候內核就告訴B,你須要阻塞了!,咱們把這個時間定爲「緩衝區空」。socket

    這四種情形涵蓋了四個I/O事件,內核緩衝區滿,內核緩衝區空,內核緩衝區非空,內核緩衝區非滿。這四個I/O事件是進行阻塞同步的根本。(若是不能理解「同步」是什麼概念,請學習操做系統的鎖,信號量,條件變量等任務同步方面的相關知識)。ide

    而後咱們來講說阻塞I/O的缺點。可是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。若是想要同時處理多個流,要麼多進程(fork),要麼多線程(pthread_create),很不幸這兩種方法效率都不高。因而再來考慮非阻塞忙輪詢的I/O方式,咱們發現能夠同時處理多個流(把一個流從阻塞模式切換到非阻塞模式再此不予討論):

1 while true {  
2      for i in stream[]; {  
3            if i has data  
4            read until unavailable  
5         }  
6 }  
View Code

    咱們只要不停的把全部流從頭至尾問一遍,又從頭開始。這樣就能夠處理多個流了,但這樣的作法顯然很差,由於若是全部的流都沒有數據,那麼只會白白浪費CPU。這裏要補充一點,阻塞模式下,內核對於I/O事件的處理是阻塞或者喚醒,而非阻塞模式下則把I/O事件交給其餘對象(後文介紹的select以及epoll)處理甚至直接忽略。

    爲了不CPU空轉,能夠引進一個代理(一開始有一位叫作select的代理,後來又有一位叫作poll的代理,不過二者的本質是同樣的)。這個代理比較厲害,能夠同時觀察許多流的I/O事件,在空閒的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,因而咱們的程序就會輪詢一遍全部的流(因而咱們能夠把「忙」字去掉了)。代碼長這樣:

1 while true {  
2       select(streams[])  
3       for i in streams[] {  
4             if i has data  
5             read until unavailable  
6        }  
7 }  
View Code

    因而,若是沒有I/O事件產生,咱們的程序就會阻塞在select處。可是依然有個問題,咱們從select那裏僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至所有),咱們只能無差異輪詢全部流,找出能讀出數據,或者寫入數據的流,對他們進行操做。

    可是使用select,咱們有O(n)的無差異輪詢複雜度,同時處理的流越多,每一次無差異輪詢時間就越長。再次說了這麼多,終於能好好解釋epoll了。
    epoll能夠理解爲event poll,不一樣於忙輪詢和無差異輪詢,epoll只會把哪一個流發生了怎樣的I/O事件通知咱們。此時咱們對這些流的操做都是有意義的(複雜度下降到了O(1))。
    在討論epoll的實現細節以前,先把epoll的相關操做列出:
1 epoll_create建立一個epoll對象,通常epollfd = epoll_create()  
2 epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增長/刪除某一個流的某一個事件  
3  好比  
4 epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//註冊緩衝區非空事件,即有數據流入  
5 epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//註冊緩衝區非滿事件,即流能夠被寫入  
6 epoll_wait(epollfd,...)等待直到註冊的事件發生  
7 (注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設置errno=EAGAIN。而epoll只關心緩衝區非滿和緩衝區非空事件)。  
View Code

 一個epoll模式的代碼大概的樣子是:

View Code

2 python中的epoll

   從以上可知,epoll是對select、poll模型的改進,提升了網絡編程的性能,普遍應用於大規模併發請求的C/S架構中。

  一、觸發方式:

     邊緣觸發/水平觸發,只適用於Unix/Linux操做系統

   二、原理圖

  三、通常步驟

  1. Create an epoll object——建立1個epoll對象
  2. Tell the epoll object to monitor specific events on specific sockets——告訴epoll對象,在指定的socket上監聽指定的事件
  3. Ask the epoll object which sockets may have had the specified event since the last query——詢問epoll對象,從上次查詢以來,哪些socket發生了哪些指定的事件
  4. Perform some action on those sockets——在這些socket上執行一些操做
  5. Tell the epoll object to modify the list of sockets and/or events to monitor——告訴epoll對象,修改socket列表和(或)事件,並監控
  6. Repeat steps 3 through 5 until finished——重複步驟3-5,直到完成
  7. Destroy the epoll object——銷燬epoll對象

  四、相關用法

import select 導入select模塊

epoll = select.epoll() 建立一個epoll對象

epoll.register(文件句柄,事件類型) 註冊要監控的文件句柄和事件

事件類型:

  select.EPOLLIN    可讀事件

  select.EPOLLOUT   可寫事件

  select.EPOLLERR   錯誤事件

  select.EPOLLHUP   客戶端斷開事件

epoll.unregister(文件句柄)   銷燬文件句柄

epoll.poll(timeout)  當文件句柄發生變化,則會以列表的形式主動報告給用戶進程,timeout

                     爲超時時間,默認爲-1,即一直等待直到文件句柄發生變化,若是指定爲1

                     那麼epoll每1秒彙報一次當前文件句柄的變化狀況,若是無變化則返回空

epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)

epoll.modfiy(fineno,event) fineno爲文件描述符 event爲事件類型  做用是修改文件描述符所對應的事件

epoll.fromfd(fileno) 從1個指定的文件描述符建立1個epoll對象

epoll.close()   關閉epoll對象的控制文件描述符

   5 實例:客戶端發送數據 服務端將接收的數據返回給客戶端

 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import socket
 5 import select
 6 import Queue
 7 
 8 #建立socket對象
 9 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10 #設置IP地址複用
11 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
12 #ip地址和端口號
13 server_address = ("127.0.0.1", 8888)
14 #綁定IP地址
15 serversocket.bind(server_address)
16 #監聽,並設置最大鏈接數
17 serversocket.listen(10)
18 print  "服務器啓動成功,監聽IP:" , server_address
19 #服務端設置非阻塞
20 serversocket.setblocking(False)  
21 #超時時間
22 timeout = 10
23 #建立epoll事件對象,後續要監控的事件添加到其中
24 epoll = select.epoll()
25 #註冊服務器監聽fd到等待讀事件集合
26 epoll.register(serversocket.fileno(), select.EPOLLIN)
27 #保存鏈接客戶端消息的字典,格式爲{}
28 message_queues = {}
29 #文件句柄到所對應對象的字典,格式爲{句柄:對象}
30 fd_to_socket = {serversocket.fileno():serversocket,}
31 
32 while True:
33   print "等待活動鏈接......"
34   #輪詢註冊的事件集合,返回值爲[(文件句柄,對應的事件),(...),....]
35   events = epoll.poll(timeout)
36   if not events:
37      print "epoll超時無活動鏈接,從新輪詢......"
38      continue
39   print "" , len(events), "個新事件,開始處理......"
40   
41   for fd, event in events:
42      socket = fd_to_socket[fd]
43      #若是活動socket爲當前服務器socket,表示有新鏈接
44      if socket == serversocket:
45             connection, address = serversocket.accept()
46             print "新鏈接:" , address
47             #新鏈接socket設置爲非阻塞
48             connection.setblocking(False)
49             #註冊新鏈接fd到待讀事件集合
50             epoll.register(connection.fileno(), select.EPOLLIN)
51             #把新鏈接的文件句柄以及對象保存到字典
52             fd_to_socket[connection.fileno()] = connection
53             #以新鏈接的對象爲鍵值,值存儲在隊列中,保存每一個鏈接的信息
54             message_queues[connection]  = Queue.Queue()
55      #關閉事件
56      elif event & select.EPOLLHUP:
57         print 'client close'
58         #在epoll中註銷客戶端的文件句柄
59         epoll.unregister(fd)
60         #關閉客戶端的文件句柄
61         fd_to_socket[fd].close()
62         #在字典中刪除與已關閉客戶端相關的信息
63         del fd_to_socket[fd]
64      #可讀事件
65      elif event & select.EPOLLIN:
66         #接收數據
67         data = socket.recv(1024)
68         if data:
69            print "收到數據:" , data , "客戶端:" , socket.getpeername()
70            #將數據放入對應客戶端的字典
71            message_queues[socket].put(data)
72            #修改讀取到消息的鏈接到等待寫事件集合(即對應客戶端收到消息後,再將其fd修改並加入寫事件集合)
73            epoll.modify(fd, select.EPOLLOUT)
74      #可寫事件
75      elif event & select.EPOLLOUT:
76         try:
77            #從字典中獲取對應客戶端的信息
78            msg = message_queues[socket].get_nowait()
79         except Queue.Empty:
80            print socket.getpeername() , " queue empty"
81            #修改文件句柄爲讀事件
82            epoll.modify(fd, select.EPOLLIN)
83         else :
84            print "發送數據:" , data , "客戶端:" , socket.getpeername()
85            #發送數據
86            socket.send(msg)
87 
88 #在epoll中註銷服務端文件句柄
89 epoll.unregister(serversocket.fileno())
90 #關閉epoll
91 epoll.close()
92 #關閉服務器socket
93 serversocket.close()
服務端代碼
 1 #!/usr/bin/env python
 2 #-*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 #建立客戶端socket對象
 7 clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 8 #服務端IP地址和端口號元組
 9 server_address = ('127.0.0.1',8888)
10 #客戶端鏈接指定的IP地址和端口號
11 clientsocket.connect(server_address)
12 
13 while True:
14     #輸入數據
15     data = raw_input('please input:')
16     #客戶端發送數據
17     clientsocket.sendall(data)
18     #客戶端接收數據
19     server_data = clientsocket.recv(1024)
20     print '客戶端收到的數據:'server_data
21     #關閉客戶端socket
22     clientsocket.close() 
客戶端代碼

 

參考資料:

      http://blog.csdn.net/mango_song/article/details/42643971

      http://www.cnblogs.com/Alanpy/articles/5125986.html

      http://scotdoyle.com/python-epoll-howto.html

      http://www.haiyun.me/archives/1056.html

相關文章
相關標籤/搜索