Select\Poll\Epoll異步IO與事件驅動

事件驅動與異步IO

事件驅動編程是一種編程規範,這裏程序的執行流由外部事件來規定。它的特色是包含一個事件循環,但外部事件發生時使用回調機制來觸發響應的處理。另外兩種常見的編程規範是(單線程)同步以及多線程編程。python

 

 

在單線程同步模型中,任務按照順序執行。若是某個任務由於IO而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷出來的。若是任務之間並無相互依賴的關係,但仍然須要相互等待的話這就使得程序沒必要要的下降了運行速度。程序員

 

在多線程版本中,2個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序必須寫代碼來保護共享資源,防止其餘被多線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制加鎖,可重入函數,線程局部存儲或其餘機制來處理線程安全問題。編程

 

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理IO或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當IO操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件來到時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關係線程安全問題。數組

 

事件驅動模型適用的環境:緩存

  • 程序中有許多任務,並且任務之間不存在因果聯繫
  • 任務之間高度獨立(由於它們不須要相互通訊,或着等待彼此)
  • 在等待事件到來時,某些任務會阻塞

 

 

Select \ Poll \ Epoll 異步IO

三者的區別:

select

select最先於1983年出如今4.2BSD中,它經過一個select() 系統調用來監控多個文件描述符的數組,當select() 返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。安全

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今來看,這也是它所剩很少的優勢之一。服務器

select的缺點在於單個進程可以監視的文件描述符的數量存在有最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。網絡

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定開銷。數據結構

 

poll

poll在1986年誕生於System V Release 3,它和select在本質上沒有太大差異,可是poll沒有最大文件描述符數量的限制。多線程

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll() 將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發。

 

epoll

直到Linux2.6纔出現了由內核直接支持的實現方式,就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux 2.6下性能最好的多路IO就緒通知方式。

epoll能夠同時支持水平觸發和邊緣觸發(只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是沒有采起行動,那麼它就不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要高一些,可是代碼實現至關複雜。

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式,在select、poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用類是callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

 

 

python select

python的select() 方法直接調用操做系統的IO接口,它監控sockets,open files,and pipes(全部帶fileno()方法的文件句柄)什麼時候變成 readable 和 writeable 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過python的解釋器。

下面經過echo server例子瞭解select是如何經過單進程實現同時處理多個非阻塞的scoket鏈接的

import select
import socket
import sys
import Queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)

slect()方法接收並監控3個通訊列表,第一個是全部的輸入的data,就是指外部發過來的數據,第2個是監控和接收全部要法術的data(outgoing data),第3個監控錯誤信息,接下來須要建立2個列表來包含輸入和輸出信息來傳遞給select()

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to which we expect to write
outputs = [ ]

全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。

# Outgoing message queues (socket:Queue)
message_queues = {}

下面是此程序的主循環,調用select()時會阻塞和等待直到新的鏈接和數據進來

while inputs:

    # Wait for at least one of the sockets to be ready for processing
    print >>sys.stderr, '\nwaiting for the next event'
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

當你把 inputs,outputs,exceptional(這裏跟inputs共用)傳給select()後,它返回3個新的list,咱們上面將它們分別賦值爲 readable , writable ,execptional,全部在readable list中的socket鏈接表明有數據可接收(recv),全部在 writable list中的存放着能夠對其進行發送(send)操做的socket 鏈接,當鏈接通訊出現error時會把error寫到 exceptional 列表中。

 

readable list中的socket能夠有3種可能狀態

第一種是若是這個socket是 main 「server」 socket,它負責監聽客戶端的鏈接,若是 main server socket 出如今readable裏,表明這個server端已經ready來接收一個新的鏈接進來了,爲了讓這個 main server 能同時處理多個鏈接,在下面的代碼裏,咱們把這個main server 的socket設置爲非阻塞模式。

# Handle inputs
for s in readable:
 
    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print >>sys.stderr, 'new connection from', client_address
        connection.setblocking(0)
        inputs.append(connection)
 
        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()

第二中狀況是這個socket是已經創建的鏈接,它把數據發了過來,這個時候你就能夠經過recv() 來接收它發過來的數據,而後把接收到的數據放到queue裏,這樣就能夠把接收到的數據再傳回給客戶端了。

else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)

第三種狀況就是這個客戶端已經斷開了,因此再經過recv() 接收到的數據就是爲空了,因此這個時候就能夠把這個跟客戶端的鏈接關閉了。

 

else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
    inputs.remove(s)    #inputs中也刪除掉
    s.close()           #把這個鏈接關閉掉
 
    # Remove message queue
    del message_queues[s]

對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏面的數據,就把這個數據取出來再發回給這個客戶端,是否就把這個鏈接從 output list 中移除,這樣下一次循環select()調用時檢測到 outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態。

# Handle outputs
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
        outputs.remove(s)
    else:
        print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
        s.send(next_msg)

組後,若是在跟某個socket鏈接通訊過程當中出現了錯誤,就把這個鏈接對象在 inputs、outputs、message_queue中都刪除,再把鏈接關閉掉

# Handle "exceptional conditions"
for s in exceptional:
    print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]

 

 

最後服務器端的完整代碼:

import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
                inputs.remove(s)    #inputs中也刪除掉
                s.close()           #把這個鏈接關閉掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]

 

客戶端完整代碼:

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()
            s.close()
相關文章
相關標籤/搜索